链路耗时分析器

链路耗时分析器

介绍

我们开发的功能的时候,整个链路可能涉及多个模块,为统计每个阶段的处理耗时,从而做出针对性优化,特此推出链路耗时分析器

效果

下图是飞书机器人发出的消息

链路耗时分析器 1

开始使用

你只需要在你要记录时间节点的位置发送 http 请求,如下:

1
2
3
4
5
6
7
curl --location '<http://host:5001/record_time>' \\
--header 'Content-Type: application/json' \\
--data '{
"module_name":"send",
"event_name":"2",
"timestamp":"1697790122.937543"
}'

注意你的 Json 格式

module_name:只能取(语音模块,意图分析模块,IoT 后端模块,HA 模块,send)

event_name:是唯一的,如果两次请求的 event_name 相同,则后一次的时间戳会覆盖前一次的时间戳

timestamp:你本地的毫秒级时间戳!!!!

1
2
3
4
5
{
"module_name":"xx 模块",//模块名
"event_name":"xxx",//事件名,随便取,唯一,重复会覆盖
"timestamp":"1697790122.937543" //模块本机的时间戳
}

内部实现

服务会收集所有请求的内容(module_name 和 module_description)和服务端本机时间戳,保存在一个字典中。等到 HA 也给他发出 http 请求后,把字典发送到飞书机器人,并清空字典等待下次请求开始。

使用样例

1
2
3
4
5
6
7
curl --location '<http://host:5001/record_time>' \\
--header 'Content-Type: application/json' \\
--data '{
"module_name":"语音模块",
"event_name":"事件1",
"timestamp":"1697790122.937543"
}'

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""
链路耗时分析器
"""
import json
import requests
from flask import Flask, request, jsonify
import logging
from decimal import Decimal, getcontext

# 设置精度为 3 位小数
getcontext().prec = 6
# 配置日志记录器
logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger('my_logger')
app = Flask(__name__)

iot_device = {
"585a7a10-7285-492d-bc8d-dabbc8cf8462": "墙壁开关",
"dfe780a0-be78-479b-baa7-f64d9299c9a4": "空调",
"50533f17-cb94-4f04-a77c-cd5e5ba6d3f8": "窗帘伴侣左",
"53c5cad1-fc6b-4e76-984a-528d5d08b6e5": "窗帘伴侣右",
"d309919d-4e15-45a1-aebd-29dac7847343": "扫地机器人",
"0ca95625-85e8-4f60-879b-7eb39b8e8a20": "加湿器",
"65ee7ebd-a8f6-49bf-8ecc-28db1f225405": "空气净化器",
"3d5c385e-492f-47db-acaf-8eff2e0048f5": "门铃",
}
# 存储模块的开始时间
module_start_times = {}
"""
{
"module_name":"",//模块名,只能取(语音模块,意图分析模块,IoT 后端模块,HA 模块,send)
"event_name":"",//事件名
"description":"",//描述
"timestamp":"" //1697790122.937543,表示 10-20-2023 16:22
}
"""
unity = {}
voice = {}
intent_analysis = {}
backend = {}
ha = {}
no_module_name = {}


@app.route('/record_time', methods=['POST'])
def record_time():
data = request.get_json()

module_name = data.get('module_name', 'Unknown Module')
event_name = data.get('event_name', 'No event_name') # 获取事件名
description = data.get('description', 'No description') # 获取事件名
timestamp = data.get('timestamp', '0') # 获取模块描述字段
if timestamp is None or timestamp == '':
timestamp = '0' # 设置默认值为'0',或者你想要的任何默认值
print(f"收到 {module_name} 消息:{event_name},描述:{description},时间戳:{timestamp}")
logger.log(logging.INFO, f"收到 {module_name} 消息:{event_name},描述:{description},时间戳:{timestamp}")
# if event_name == "wakeup":
# voice.clear()
# intent_analysis.clear()
# backend.clear()
# ha.clear()
# no_module_name.clear()
# unity.clear()
if module_name == "语音模块":
voice[event_name] = timestamp
elif module_name == "意图分析模块":
intent_analysis[event_name] = timestamp
elif module_name == "unity 模块":
unity[event_name] = timestamp
elif module_name == "IoT 后端模块":
backend[event_name] = timestamp
elif module_name == "HA 模块":
ha[event_name] = timestamp
elif module_name == "clean":
logger.log(logging.INFO, f"开始清空字典")
voice.clear()
intent_analysis.clear()
backend.clear()
ha.clear()
no_module_name.clear()
unity.clear()
logger.log(logging.INFO, f"字典清空")
return jsonify(clean="success") # 返回一个空的 JSON 响应
elif module_name == "send":
# 处理数据。发飞书。
logger.log(logging.INFO, f"开始处理数据")
parse_data()
logger.log(logging.INFO, f"开始清空字典")
voice.clear()
intent_analysis.clear()
backend.clear()
ha.clear()
no_module_name.clear()
unity.clear()
logger.log(logging.INFO, f"字典清空")

else:
no_module_name[module_name + event_name] = timestamp
return jsonify(success=True) # 返回一个空的 JSON 响应


def parse_data():
# 对每个字典按时间戳进行排序
sorted_unity = dict(sorted(unity.items(), key=lambda item: float(item[1])))
sorted_voice = dict(sorted(voice.items(), key=lambda item: float(item[1])))
sorted_intent_analysis = dict(sorted(intent_analysis.items(), key=lambda item: float(item[1])))
sorted_backend = dict(sorted(backend.items(), key=lambda item: float(item[1])))
sorted_ha = dict(sorted(ha.items(), key=lambda item: float(item[1])))
sorted_no_module_name = dict(sorted(no_module_name.items(), key=lambda item: float(item[1])))

if sorted_voice:
voice_message = calculate_time_difference(sorted_voice)
voice_message = f"**语音模块** \n{voice_message}"
send_post_request(voice_message)
if sorted_intent_analysis:
intent_analysis_message = calculate_time_difference(sorted_intent_analysis)
intent_analysis_message = f"**意图分析模块** \n{intent_analysis_message}"
send_post_request(intent_analysis_message)
if sorted_backend:
# 遍历 sorted_backend 字典
updated_sorted_backend = {
iot_device.get(event_key.split('_')[0], event_key.split('_')[0]) + (
"_end" if "_end" in event_key else ""): value
for event_key, value in sorted_backend.items()
}
backend_message = calculate_time_difference(updated_sorted_backend)
if backend_message is not None:
backend_message = f"**后端模块** \n{backend_message}"
send_post_request(backend_message)
if sorted_ha:
ha_message = calculate_time_difference(sorted_ha)
ha_message = f"**HA 模块** \n{ha_message}"
send_post_request(ha_message)
if sorted_no_module_name:
no_module_name_message = calculate_time_difference(sorted_no_module_name)
no_module_name_message = f"**未命名模块** \n{no_module_name_message}"
send_post_request(no_module_name_message)
if sorted_unity:
unity_message = calculate_time_difference(sorted_unity)
unity_message = (f"**Unity 模块** \n{unity_message}")
send_post_request(unity_message)


def calculate_time_difference(sorted_dict):
total_time = 0
keys = list(sorted_dict.keys())
time_differences = []
for i in range(len(keys) - 1):
key1 = keys[i]
key2 = keys[i + 1]
timestamp1 = Decimal(sorted_dict[key1])
timestamp2 = Decimal(sorted_dict[key2])
time_difference = timestamp2 - timestamp1
time_differences.append(f"{key1}{key2} 用时 {str(time_difference)} 毫秒")
if key1 == "wakeup" or key2 == "wakeup":
continue
total_time = total_time + time_difference
if not time_differences:
return None
return "\n".join(time_differences) + "\n总时间:" + str(total_time/1000).rstrip('0').rstrip('.') + " 秒"


def send_post_request(message):
url = "https://open.feishu.cn/open-apis/bot/v2/hook/ed0fb5e2-cea4-4473-8b99-9e5bc3662b00"

# 设置请求头
headers = {
"Content-Type": "application/json"
}

# 构建请求体
payload = {
"msg_type": "text",
"content": {
"text": f"{message}"
}
}

# 将 payload 转换为 JSON 格式
payload_json = json.dumps(payload)

# 发送 POST 请求
response = requests.post(url, headers=headers, data=payload_json)

# 检查响应状态码
if response.status_code == 200:
print(response.text)
print("POST 请求成功")
else:
print(f"POST 请求失败,状态码:{response.status_code}")
print(response.text)


if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001, debug=True)