
"""Link latency analyzer.

A small Flask service that records timestamped events posted by several
modules (voice, intent analysis, IoT backend, HA, Unity) and, on request,
reports the elapsed time between consecutive events of each module to a
Feishu webhook.
"""
import json
import logging
from decimal import Decimal, getcontext

import requests
from flask import Flask, request, jsonify

# Results of Decimal arithmetic in this process are rounded to 6 significant
# digits.
getcontext().prec = 6

logging.basicConfig(
    filename='app.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('my_logger')

app = Flask(__name__)

# Display names looked up by the leading "<id>_" part of backend event names
# (see parse_data). Keys look like device UUIDs.
iot_device = {
    "585a7a10-7285-492d-bc8d-dabbc8cf8462": "墙壁开关",
    "dfe780a0-be78-479b-baa7-f64d9299c9a4": "空调",
    "50533f17-cb94-4f04-a77c-cd5e5ba6d3f8": "窗帘伴侣左",
    "53c5cad1-fc6b-4e76-984a-528d5d08b6e5": "窗帘伴侣右",
    "d309919d-4e15-45a1-aebd-29dac7847343": "扫地机器人",
    "0ca95625-85e8-4f60-879b-7eb39b8e8a20": "加湿器",
    "65ee7ebd-a8f6-49bf-8ecc-28db1f225405": "空气净化器",
    "3d5c385e-492f-47db-acaf-8eff2e0048f5": "门铃",
}

# NOTE(review): not referenced anywhere in the visible code.
module_start_times = {}

# Expected JSON body of a POST to /record_time:
# {
#     "module_name": "",  # module name; documented values: 语音模块,
#                         # 意图分析模块, IoT 后端模块, HA 模块, send
#     "event_name": "",   # event name
#     "description": "",  # description
#     "timestamp": ""     # e.g. 1697790122.937543, meaning 10-20-2023 16:22
# }

# Per-module stores, each mapping an event name to the timestamp received
# for it. They are filled by record_time and emptied after a report is sent.
unity = {}
voice = {}
intent_analysis = {}
backend = {}
ha = {}
no_module_name = {}
def _clear_all_modules():
    """Empty every per-module event store, logging before and after."""
    logger.log(logging.INFO, "开始清空字典")
    for store in (voice, intent_analysis, backend, ha, no_module_name, unity):
        store.clear()
    logger.log(logging.INFO, "字典清空")


@app.route('/record_time', methods=['POST'])
def record_time():
    """Record one timing event, or run a control command.

    The JSON body is expected to be an object with the optional keys
    ``module_name``, ``event_name``, ``description`` and ``timestamp``.
    Depending on ``module_name`` the request either:

    * stores ``timestamp`` under ``event_name`` in that module's store,
    * ``"clean"``: empties all stores and returns ``{"clean": "success"}``,
    * ``"send"``: calls ``parse_data()`` to report the collected timings and
      then empties all stores,
    * anything else: stores the timestamp in ``no_module_name`` under the
      concatenation of module name and event name.

    Returns:
        A JSON response; ``{"success": true}`` for every path except
        ``"clean"``, or a 400 response when the body is not a JSON object.
    """
    data = request.get_json()
    # A body such as `null` or `[...]` parses fine but has no .get(); reject
    # it explicitly instead of failing with an AttributeError (HTTP 500).
    if not isinstance(data, dict):
        return jsonify(success=False, error="request body must be a JSON object"), 400

    module_name = data.get('module_name', 'Unknown Module')
    event_name = data.get('event_name', 'No event_name')
    description = data.get('description', 'No description')
    timestamp = data.get('timestamp', '0')
    # A missing or blank timestamp is recorded as '0' so later numeric
    # sorting does not fail on it.
    if timestamp is None or timestamp == '':
        timestamp = '0'

    received = f"收到 {module_name} 消息:{event_name},描述:{description},时间戳:{timestamp}"
    print(received)
    logger.log(logging.INFO, received)

    module_stores = {
        "语音模块": voice,
        "意图分析模块": intent_analysis,
        "unity 模块": unity,
        "IoT 后端模块": backend,
        "HA 模块": ha,
    }
    # Only look up string names: an unhashable value would raise on lookup.
    store = module_stores.get(module_name) if isinstance(module_name, str) else None

    if store is not None:
        store[event_name] = timestamp
    elif module_name == "clean":
        _clear_all_modules()
        return jsonify(clean="success")
    elif module_name == "send":
        logger.log(logging.INFO, "开始处理数据")
        parse_data()
        _clear_all_modules()
    else:
        # Formatting instead of `+` keeps non-string names from raising.
        no_module_name[f"{module_name}{event_name}"] = timestamp

    return jsonify(success=True)
def _sorted_by_timestamp(events):
    """Return a copy of *events* ordered by ascending numeric timestamp."""
    return dict(sorted(events.items(), key=lambda item: float(item[1])))


def _backend_label(event_key):
    """Replace the part of *event_key* before the first '_' with its
    ``iot_device`` display name (when known), keeping an ``_end`` marker."""
    prefix = event_key.split('_')[0]
    suffix = "_end" if "_end" in event_key else ""
    return iot_device.get(prefix, prefix) + suffix


def parse_data():
    """Send one timing report per module that has something to report.

    Each per-module store is sorted by timestamp, turned into a text report
    by ``calculate_time_difference`` and posted with ``send_post_request``
    under a module-specific heading. Backend event names are first rewritten
    with ``iot_device`` display names.

    A module is skipped when its store is empty or when
    ``calculate_time_difference`` returns ``None`` (fewer than two events).
    Previously only the backend module had the ``None`` check, so any other
    module holding a single event posted a message ending in "None".

    Raises:
        ValueError: if a stored timestamp cannot be converted by ``float``.
    """
    # Sort everything up front so a bad timestamp fails before any message
    # has been sent.
    sorted_backend = _sorted_by_timestamp(backend)
    sections = [
        ("**语音模块** \n", _sorted_by_timestamp(voice)),
        ("**意图分析模块** \n", _sorted_by_timestamp(intent_analysis)),
        ("**后端模块** \n", {_backend_label(key): value
                          for key, value in sorted_backend.items()}),
        ("**HA 模块** \n", _sorted_by_timestamp(ha)),
        ("**未命名模块** \n", _sorted_by_timestamp(no_module_name)),
        ("**Unity 模块** \n", _sorted_by_timestamp(unity)),
    ]

    for heading, events in sections:
        if not events:
            continue
        report = calculate_time_difference(events)
        if report is None:
            continue
        send_post_request(f"{heading}{report}")
def calculate_time_difference(sorted_dict):
    """Describe the elapsed time between consecutive events.

    Args:
        sorted_dict: Mapping of event name to timestamp, already ordered by
            timestamp. Each value must be accepted by ``Decimal``.

    Returns:
        A multi-line string with one line per consecutive pair of events
        followed by a total line, or ``None`` when there are fewer than two
        events. Pairs in which either event is named ``"wakeup"`` are listed
        but left out of the total.

    Raises:
        decimal.InvalidOperation: if a timestamp string is not a number.

    NOTE(review): each difference is labelled as milliseconds and the total
    is divided by 1000 to give seconds, so timestamps are treated as
    milliseconds here. Confirm that senders really use milliseconds.
    """
    keys = list(sorted_dict)
    total_time = Decimal(0)
    time_differences = []
    for key1, key2 in zip(keys, keys[1:]):
        time_difference = Decimal(sorted_dict[key2]) - Decimal(sorted_dict[key1])
        time_differences.append(f"{key1} 到 {key2} 用时 {str(time_difference)} 毫秒")
        if key1 != "wakeup" and key2 != "wakeup":
            total_time += time_difference

    if not time_differences:
        return None

    # Fixed-point formatting avoids scientific notation for tiny totals.
    total_seconds = format(total_time / 1000, 'f')
    # Strip trailing zeros only after a decimal point; stripping them from a
    # whole number would turn "10" into "1".
    if '.' in total_seconds:
        total_seconds = total_seconds.rstrip('0').rstrip('.')
    return "\n".join(time_differences) + "\n总时间:" + total_seconds + " 秒"
def send_post_request(message, timeout=10):
    """Post *message* as a text message to the Feishu bot webhook.

    Args:
        message: Text to send; it is formatted into the payload as a string.
        timeout: Seconds to wait for the webhook before giving up. Without a
            timeout ``requests`` waits indefinitely, which would block the
            calling request handler on a stalled connection.

    Raises:
        requests.RequestException: on connection failure or timeout. It is
            not caught here, so the caller sees that delivery failed.
    """
    # NOTE(review): the webhook token is embedded in source control; consider
    # loading the URL from configuration and rotating this token.
    url = "https://open.feishu.cn/open-apis/bot/v2/hook/ed0fb5e2-cea4-4473-8b99-9e5bc3662b00"
    headers = {"Content-Type": "application/json"}
    payload = {
        "msg_type": "text",
        "content": {"text": f"{message}"},
    }

    response = requests.post(
        url,
        headers=headers,
        data=json.dumps(payload),
        timeout=timeout,
    )

    if response.status_code == 200:
        print(response.text)
        print("POST 请求成功")
    else:
        print(f"POST 请求失败,状态码:{response.status_code}")
        print(response.text)
if __name__ == '__main__':
    import os

    # The server listens on every interface. With debug=True that would
    # expose the Werkzeug interactive debugger, which can execute arbitrary
    # code, to anyone who can reach the port. Debug mode is therefore
    # opt-in: set FLASK_DEBUG=1 (or true/yes/on) to enable it.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in (
        "1", "true", "yes", "on",
    )
    app.run(host='0.0.0.0', port=5001, debug=debug_enabled)