2025-03-27 02:34PM
实现方法:
async def send_result(update: Update, result: str):
"""处理并发送结果"""
try:
if not result:
return await send_message(update, "❌ 未找到匹配结果")
# 解析 JSON 结果
try:
results = json.loads(result)
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}, 原始数据: {result}")
return await send_message(update, "⚠️ 搜索结果格式错误")
result_count = len(results)
if result_count == 0:
return await send_message(update, "❌ 未找到匹配结果")
# 发送结果数量提示
await send_message(update, f"🔍 找到 {result_count} 条匹配结果")
# 格式化结果
formatted_results = []
for item in results:
source_data = item['data']
formatted_result = (
f"索引: {item.get('index', 'unknown')}\n"
f"数据: {json.dumps(source_data, ensure_ascii=False, indent=2)}\n"
f"{'-' * 40}"
)
formatted_results.append(formatted_result)
# 生成带时间戳的文件名
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"search_result_{timestamp}.txt"
# 将结果写入临时文件
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as f:
f.write("\n".join(formatted_results))
temp_file = f.name
# 发送文件
with open(temp_file, 'rb') as f:
await send_document(update, f, filename)
# 清理临时文件
try:
os.unlink(temp_file)
except Exception as e:
logger.error(f"删除临时文件出错: {e}")
except Exception as e:
logger.error(f"处理结果时出错: {e}")
await send_message(update, f"⚠️ 处理结果时出错:{str(e)}")
代码解析:
1. 临时文件创建
with tempfile.NamedTemporaryFile(
mode='w',
delete=False,
suffix='.txt',
encoding='utf-8'
) as f:
f.write("\n".join(formatted_results)) # 写入格式化结果
temp_file = f.name # 获取临时文件路径
delete=False
:避免文件自动删除,需手动清理
唯一文件名:通过时间戳search_result_20240101_123456.txt
防重复
2. 文件发送机制
with open(temp_file, 'rb') as f:
await send_document(update, f, filename) # 发送二进制文件流
使用Telegram API的reply_document
方法发送文件
支持多消息类型:原始消息/编辑消息/CallbackQuery
3. 资源清理
os.unlink(temp_file) # 显式删除临时文件
发送完成后立即清理服务器存储
异常处理:捕获删除错误并记录日志
登录
请登录后再发表评论。
评论列表:
目前还没有人发表评论