# rt **Repository Path**: rex-cn-abc/rt ## Basic Information - **Project Name**: rt - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-25 - **Last Updated**: 2025-09-25 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 实时日志 ## 【需求】 1、使用fastapi 模拟一个post请求接口 2、该接口的功能是前端传入仓库https地址,用户名、密码、分支以及本地的存储路径拉取远端仓库代码,要求每一次调用生成单独的日志文件,且过程日志需要实时显示在前端 3、前端使用vue3框架,使用组合式、setup、javascript,要输入仓库https地址,用户名、密码、分支以及本地的存储路径,点击提交按钮后传给后端接口调用,过程日志需要实时显示 --- ## 【实现】 ### **方法1** ```python # main.py # 导入操作系统接口模块,用于文件和目录操作 import os # 导入日志记录模块 import logging # 导入异步IO模块,用于处理异步操作 import asyncio # 从fastapi框架导入核心类和方法 from fastapi import FastAPI, HTTPException # 导入CORS中间件,用于处理跨域请求 from fastapi.middleware.cors import CORSMiddleware # 导入流式响应类,用于实时数据传输 from fastapi.responses import StreamingResponse # 导入数据验证基类 from pydantic import BaseModel # 导入子进程管理模块,用于执行外部命令 import subprocess # 导入UUID生成模块,用于生成唯一标识符 import uuid # 从datetime模块导入datetime类,用于时间处理 from datetime import datetime # 导入多线程模块 import threading # 从queue模块导入队列相关类 from queue import Queue, Empty # 导入类型提示相关模块 from typing import Dict # 导入时间模块 import time # 创建FastAPI应用实例,设置应用标题 app = FastAPI(title="Git Repository Manager") # CORS配置:添加跨域资源共享中间件 app.add_middleware( # 使用CORS中间件类 CORSMiddleware, # 允许所有来源的请求 allow_origins=["*"], # 允许携带凭证(如cookies) allow_credentials=True, # 允许所有HTTP方法 allow_methods=["*"], # 允许所有HTTP头 allow_headers=["*"], ) # 存储日志队列的字典:键为请求ID,值为队列对象 log_queues: Dict[str, Queue] = {} # 定义Git克隆请求的数据模型,使用Pydantic进行数据验证 class GitCloneRequest(BaseModel): # 仓库URL地址,字符串类型,必需字段 repoUrl: str # 用户名,字符串类型,必需字段 username: str # 密码或token,字符串类型,必需字段 password: str # 分支名称,字符串类型,默认值为"main" branch: str = "main" # 本地存储路径,字符串类型,必需字段 localPath: str # 定义执行Git命令的函数 def run_git_command(request_id: str, repoUrl: str, username: str, password: str, branch: str, localPath: str): """执行Git命令并实时记录日志""" # 根据请求ID获取对应的日志队列 log_queue = log_queues[request_id] # 使用try-except块捕获可能出现的异常 try: # 构造带认证的URL:将用户名和密码嵌入到URL中 # 检查URL是否以https://开头 if repoUrl.startswith("https://"): # 替换URL前缀,添加认证信息 authenticated_url = repoUrl.replace( "https://", f"https://{username}:{password}@" ) else: # 如果URL没有协议前缀,直接添加认证信息 authenticated_url = f"https://{username}:{password}@{repoUrl}" # 记录开始操作的日志消息 log_message(log_queue, "开始拉取仓库...") # 记录仓库URL信息(不包含密码) log_message(log_queue, f"仓库URL: {repoUrl}") # 记录分支信息 log_message(log_queue, f"分支: {branch}") # 记录本地路径信息 log_message(log_queue, f"本地路径: {localPath}") # 确保本地目录存在:如果目录不存在则创建 os.makedirs(localPath, exist_ok=True) # 检查是否已经是Git仓库:判断.git目录是否存在 git_dir = os.path.join(localPath, '.git') if os.path.exists(git_dir): # 如果是现有仓库,记录相应日志 log_message(log_queue, "检测到现有Git仓库,尝试拉取最新代码...") # 切换到指定分支并拉取:定义需要执行的命令序列 commands = [ # 获取远程仓库的最新信息 ['git', '-C', localPath, 'fetch', 'origin'], # 切换到指定分支 ['git', '-C', localPath, 'checkout', branch], # 拉取指定分支的最新代码 ['git', '-C', localPath, 'pull', 'origin', branch] ] else: # 如果是新仓库,记录克隆日志 log_message(log_queue, "克隆新仓库...") # 克隆仓库:只执行克隆命令 commands = [ # 克隆指定分支的仓库到本地路径 ['git', 'clone', '-b', branch, authenticated_url, localPath] ] # 遍历所有需要执行的命令 for i, cmd in enumerate(commands): # 记录当前执行的命令 log_message(log_queue, f"执行命令: {' '.join(cmd)}") # 使用subprocess创建子进程执行命令 process = subprocess.Popen( # 命令参数列表 cmd, # 重定向标准输出到管道 stdout=subprocess.PIPE, # 将标准错误重定向到标准输出 stderr=subprocess.STDOUT, # 以文本模式处理输出 text=True, # 设置编码为UTF-8 encoding='utf-8', # 设置编码错误处理方式 errors='replace', # 设置行缓冲,实现实时输出 bufsize=1, # 启用通用换行符支持 universal_newlines=True ) # 实时读取命令输出 while True: # 读取一行输出 output = process.stdout.readline() # 如果输出为空且进程已结束,退出循环 if output == '' and process.poll() is not None: break # 如果有输出内容 if output: # 清理输出中的敏感信息(密码) clean_output = output.replace(password, '***') # 记录清理后的输出内容 log_message(log_queue, clean_output.strip()) # 获取命令的返回码 return_code = process.poll() # 检查命令是否执行成功(返回码0表示成功) if return_code != 0: # 记录执行失败信息 log_message(log_queue, f"命令执行失败,返回码: {return_code}") # 记录警告信息,但继续执行 log_message(log_queue, "⚠️ 命令执行有警告,但继续处理...") # 所有命令执行完成,记录成功日志 log_message(log_queue, "✅ 操作完成!") # 捕获所有异常 except Exception as e: # 构造错误消息 error_msg = f"❌ 错误: {str(e)}" # 记录错误信息 log_message(log_queue, error_msg) # 无论是否发生异常都会执行的代码块 finally: # 在队列中放入结束标记 log_queue.put("EOF") # 结束标记 # 定义日志消息记录函数 def log_message(log_queue: Queue, message: str): """记录带时间戳的消息""" # 获取当前时间并格式化为时分秒 timestamp = datetime.now().strftime('%H:%M:%S') # 构造带时间戳的格式化消息 formatted_message = f"[{timestamp}] {message}\n" # 将消息放入日志队列 log_queue.put(formatted_message) # 同时在控制台输出消息(便于调试) print(formatted_message, end='') # 同时在控制台输出 # 定义克隆仓库的API端点(POST请求) @app.post("/api/clone-repo") async def clone_repository(request: GitCloneRequest): """克隆或更新Git仓库""" try: # 生成唯一的请求ID request_id = str(uuid.uuid4()) # 为当前请求创建新的日志队列 log_queues[request_id] = Queue() # 验证输入数据:检查仓库URL是否为空 if not request.repoUrl: # 抛出HTTP异常,状态码400(客户端错误) raise HTTPException(status_code=400, detail="仓库URL不能为空") # 验证输入数据:检查本地路径是否为空 if not request.localPath: raise HTTPException(status_code=400, detail="本地路径不能为空") # 创建单独的日志文件:定义日志目录 log_dir = "logs" # 创建日志目录(如果不存在) os.makedirs(log_dir, exist_ok=True) # 构造日志文件名(包含请求ID) log_filename = f"{log_dir}/git_operation_{request_id}.log" # 在后台线程中执行Git操作:创建新线程 thread = threading.Thread( # 设置线程目标函数 target=run_git_command, # 传递函数参数 args=(request_id, request.repoUrl, request.username, request.password, request.branch, request.localPath) ) # 设置线程为守护线程(主程序退出时自动结束) thread.daemon = True # 启动线程 thread.start() # 返回成功响应 return { "request_id": request_id, "status": "started", "message": "任务已开始执行" } # 捕获处理过程中可能出现的异常 except Exception as e: # 抛出HTTP异常,状态码500(服务器错误) raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") # 定义获取实时日志的API端点(GET请求) @app.get("/api/logs/{request_id}") async def get_logs(request_id: str): """获取实时日志流""" # 检查请求ID是否存在 if request_id not in log_queues: # 如果不存在,返回404错误 raise HTTPException(status_code=404, detail="Request ID not found") # 定义异步生成器函数,用于流式传输日志 async def log_generator(): # 获取对应请求ID的日志队列 log_queue = log_queues[request_id] # 构造日志文件名 log_filename = f"logs/git_operation_{request_id}.log" # 确保日志目录存在 os.makedirs("logs", exist_ok=True) # 打开日志文件进行写入 with open(log_filename, 'w', encoding='utf-8') as log_file: # 记录最后一次活动时间 last_activity = time.time() # 持续循环直到任务完成 while True: try: # 设置较短超时时间以便快速响应(1秒) log_line = log_queue.get(timeout=1.0) # 检查是否是结束标记 if log_line == "EOF": # 发送完成消息 yield f"data: [完成] 操作已完成\n\n" # 退出循环 break # 写入日志文件:将日志行写入文件 log_file.write(log_line) # 刷新文件缓冲区,确保数据写入磁盘 log_file.flush() # 发送到前端 (SSE格式):按照Server-Sent Events格式发送 yield f"data: {log_line}\n\n" # 更新最后活动时间 last_activity = time.time() # 捕获队列为空异常(超时) except Empty: # 检查是否超时(5分钟无活动) if time.time() - last_activity > 300: # 5分钟超时 # 发送超时消息 yield "data: [超时] 连接超时\n\n" # 退出循环 break # 发送心跳保持连接:空消息用于保持连接活跃 yield "data: \n\n" # 空心跳 # 捕获其他异常 except Exception as e: # 发送错误消息 yield f"data: [错误] {str(e)}\n\n" # 退出循环 break # 返回流式响应 return StreamingResponse( # 使用日志生成器 log_generator(), # 设置媒体类型为事件流 media_type="text/event-stream", # 设置响应头 headers={ # 禁止缓存 "Cache-Control": "no-cache", # 保持连接 "Connection": "keep-alive", # 允许所有来源的跨域请求 "Access-Control-Allow-Origin": "*", # 暴露所有头信息 "Access-Control-Expose-Headers": "*", } ) # 定义获取任务状态的API端点 @app.get("/api/status/{request_id}") async def get_status(request_id: str): """获取任务状态""" # 检查请求ID是否存在 if request_id not in log_queues: # 如果不存在,返回未找到状态 return {"status": "not_found"} # 返回处理中状态 return {"status": "processing"} # 定义健康检查API端点 @app.get("/api/health") async def health_check(): """健康检查接口""" # 返回健康状态信息 return {"status": "healthy", "message": "服务正常运行"} # 定义根路径API端点 @app.get("/") async def root(): # 返回简单的欢迎消息 return {"message": "Git Repository Manager API"} # 主程序入口 if __name__ == "__main__": # 导入uvicorn服务器 import uvicorn # 运行FastAPI应用 uvicorn.run( app, host="0.0.0.0", # 监听所有网络接口 port=8000, # 使用8000端口 log_level="info" # 设置日志级别为info ) ``` #### 代码结构总结 这个代码实现了一个完整的Git仓库管理后端服务,主要包含以下功能: 1. **API接口**:提供克隆仓库、获取日志、检查状态等RESTful接口 2. **异步处理**:使用多线程处理耗时的Git操作,避免阻塞主线程 3. **实时日志**:通过Server-Sent Events实现实时日志推送 4. **错误处理**:完善的异常捕获和处理机制 5. **数据验证**:使用Pydantic进行请求数据验证 6. **跨域支持**:配置CORS中间件支持前端跨域访问 代码采用了清晰的分层结构,将业务逻辑、API接口和工具函数分离,便于维护和扩展。 ‍ 以下是逐行添加注释的代码: ```html Git仓库管理器

🚀 Git仓库管理器

克隆或更新远程Git仓库到本地目录

仓库配置

{{ errorMessage }}
{{ successMessage }}

实时日志

等待操作开始...
{{ log }}
``` #### 代码功能总结 这个前端页面实现了一个完整的Git仓库管理界面,主要功能包括: 1. **响应式布局**:使用CSS Grid实现自适应布局,支持移动端 2. **表单处理**:收集用户输入的Git仓库信息 3. **实时日志**:通过Server-Sent Events实时显示后端操作日志 4. **状态管理**:显示处理状态、错误信息和成功消息 5. **用户体验**:包含加载状态、动画效果和友好的交互提示 页面采用了现代化的Vue3组合式API,代码结构清晰,易于维护和扩展。 ‍ ‍ --- ### **方法2** 我来为你抽取一个公共的日志模块,并添加颜色显示功能。 我来为您抽取日志公共模块,创建一个独立的日志管理类。以下是重构后的代码: #### 1. 创建日志管理模块 (log_manager.py) ```python # log_manager.py import os import logging import uuid from datetime import datetime from queue import Queue, Empty from typing import Dict, Generator import time import threading import asyncio class LogManager: """日志管理器,用于处理实时日志和文件日志""" def __init__(self, log_dir: str = "logs"): self.log_dir = log_dir self.log_queues: Dict[str, Queue] = {} self.log_files: Dict[str, str] = {} self.active_connections: Dict[str, bool] = {} self.completed_requests: set = set() self._setup_logging() def _setup_logging(self): """设置日志目录""" try: if not os.path.exists(self.log_dir): os.makedirs(self.log_dir, exist_ok=True) except Exception as e: print(f"设置日志目录失败: {e}") import tempfile self.log_dir = tempfile.gettempdir() def create_request_logger(self, request_id: str = None) -> str: """为每个请求创建独立的日志记录器""" if request_id is None: request_id = str(uuid.uuid4()) # 清理可能存在的旧队列 if request_id in self.log_queues: self._cleanup_request(request_id) self.log_queues[request_id] = Queue() self.active_connections[request_id] = False log_filename = f"git_operation_{request_id}.log" log_filepath = os.path.join(self.log_dir, log_filename) self.log_files[request_id] = log_filepath try: os.makedirs(self.log_dir, exist_ok=True) with open(log_filepath, 'w', encoding='utf-8') as log_file: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') start_info = f"=== Git操作日志 - 请求ID: {request_id} ===\n" start_info += f"=== 开始时间: {timestamp} ===\n\n" log_file.write(start_info) log_file.flush() except Exception as e: print(f"创建日志文件失败: {e}") return request_id def log_message(self, request_id: str, message: str, level: str = "INFO"): """记录带时间戳的日志消息""" if request_id not in self.log_queues: print(f"警告: 请求ID {request_id} 的日志队列不存在") return timestamp = datetime.now().strftime('%H:%M:%S') formatted_message = f"[{timestamp}] [{level}] {message}" # 写入文件日志 file_message = formatted_message + "\n" self._write_to_file(request_id, file_message) # 只有在有活跃连接时才放入实时队列 if self.active_connections.get(request_id, False): try: self.log_queues[request_id].put(formatted_message, timeout=0.1) print(f"已放入队列: {message[:50]}...") # 调试信息 except Exception as e: print(f"放入队列失败: {e}") print(file_message, end='') def _write_to_file(self, request_id: str, message: str): """写入日志文件""" if request_id not in self.log_files: return log_filepath = self.log_files[request_id] try: with open(log_filepath, 'a', encoding='utf-8') as log_file: log_file.write(message) log_file.flush() except Exception as e: print(f"写入日志文件失败: {e}") def mark_complete(self, request_id: str): """标记请求完成 - 确保完成标记被正确发送""" if request_id not in self.log_queues: print(f"警告: 无法标记完成,请求ID {request_id} 不存在") return print(f"标记任务完成: {request_id}") # 写入完成信息到文件 completion_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') completion_msg = f"\n=== 操作完成时间: {completion_time} ===\n" self._write_to_file(request_id, completion_msg) # 标记为已完成 self.completed_requests.add(request_id) # 发送完成标记到队列(确保发送) if self.active_connections.get(request_id, False): try: # 清空队列中可能剩余的消息 self._clear_queue(request_id) # 发送完成标记 self.log_queues[request_id].put("EOF", timeout=1.0) print(f"已发送完成标记到队列: {request_id}") except Exception as e: print(f"发送完成标记失败: {e}") # 如果发送失败,尝试直接关闭连接 self.active_connections[request_id] = False else: print(f"没有活跃连接,无需发送完成标记: {request_id}") # 无论是否有连接,都标记为非活跃 self.active_connections[request_id] = False def _clear_queue(self, request_id: str): """清空队列中的剩余消息""" if request_id not in self.log_queues: return queue = self.log_queues[request_id] cleared_count = 0 try: while True: queue.get_nowait() cleared_count += 1 except Empty: pass if cleared_count > 0: print(f"清空了 {cleared_count} 条剩余消息") async def get_log_generator(self, request_id: str) -> Generator[str, None, None]: """获取实时日志生成器""" print(f"开始处理日志流请求: {request_id}") # 检查请求ID有效性 if not request_id: yield "data: [错误] 无效的请求ID\n\n" return # 检查是否是已完成的任务 if request_id in self.completed_requests: print(f"处理历史任务: {request_id}") yield "data: [信息] 加载历史任务日志\n\n" await asyncio.sleep(0.1) log_content = self.get_log_content(request_id) if log_content != "日志文件不存在": lines = log_content.split('\n') for line in lines: if line.strip(): yield f"data: {line}\n\n" await asyncio.sleep(0.01) yield "data: [完成] 历史日志显示完毕\n\n" else: yield "data: [错误] 找不到该任务的日志文件\n\n" return # 检查是否是活跃任务 if request_id not in self.log_queues: yield "data: [错误] 请求ID未找到或任务已过期\n\n" return # 实时任务处理 self.active_connections[request_id] = True log_queue = self.log_queues[request_id] print(f"开始实时日志流: {request_id}") try: # 发送开始消息 yield "data: [开始] 开始实时日志流\n\n" await asyncio.sleep(0.1) # 实时流处理 last_activity = time.time() eof_received = False while not eof_received: try: # 使用较短的超时时间 log_line = log_queue.get(timeout=1.0) if log_line == "EOF": print(f"收到完成标记: {request_id}") yield "data: [完成] 操作已完成\n\n" eof_received = True break yield f"data: {log_line}\n\n" last_activity = time.time() except Empty: # 检查超时(2分钟无活动) if time.time() - last_activity > 120: yield "data: [超时] 连接超时\n\n" break # 检查任务是否已完成但未收到EOF if request_id in self.completed_requests: print(f"检测到已完成但未收到EOF的任务: {request_id}") yield "data: [完成] 操作已完成(检测到完成状态)\n\n" eof_received = True break # 发送心跳保持连接 yield "data: \n\n" await asyncio.sleep(0.1) except Exception as e: print(f"日志流异常: {e}") yield f"data: [错误] 流处理异常: {str(e)}\n\n" break # 检查连接是否仍然活跃 if not self.active_connections.get(request_id, False): yield "data: [信息] 连接已关闭\n\n" break except Exception as e: print(f"日志生成器异常: {e}") yield f"data: [错误] 生成器异常: {str(e)}\n\n" finally: # 清理连接状态 self.active_connections[request_id] = False print(f"结束日志流: {request_id}") # 如果收到EOF,清理资源 if eof_received: self._cleanup_request(request_id) def get_request_status(self, request_id: str) -> Dict: """获取请求状态""" if request_id in self.completed_requests: return {"status": "completed", "message": "任务已完成"} if request_id not in self.log_queues: log_file = self._get_log_file_path(request_id) if log_file and os.path.exists(log_file): self.completed_requests.add(request_id) return {"status": "completed", "message": "历史任务"} return {"status": "not_found", "message": "请求ID不存在"} if self.active_connections.get(request_id, False): return {"status": "streaming", "message": "实时流传输中"} return {"status": "processing", "message": "任务处理中"} def _get_log_file_path(self, request_id: str) -> str: """获取日志文件路径""" if request_id in self.log_files: return self.log_files[request_id] else: return os.path.join(self.log_dir, f"git_operation_{request_id}.log") def _cleanup_request(self, request_id: str): """清理请求资源""" if request_id in self.log_queues: try: # 清空队列 while True: self.log_queues[request_id].get_nowait() except Empty: pass # 移除引用 if request_id in self.log_queues: del self.log_queues[request_id] if request_id in self.active_connections: del self.active_connections[request_id] if request_id in self.completed_requests: self.completed_requests.remove(request_id) print(f"已清理请求资源: {request_id}") def get_log_content(self, request_id: str) -> str: """获取完整的日志内容""" log_file = self._get_log_file_path(request_id) if log_file and os.path.exists(log_file): try: with open(log_file, 'r', encoding='utf-8') as f: return f.read() except Exception as e: return f"读取日志失败: {str(e)}" else: return "日志文件不存在" def get_all_requests(self) -> Dict[str, Dict]: """获取所有请求的状态信息""" result = {} # 添加当前活跃的请求 for request_id in list(self.log_queues.keys()): result[request_id] = self.get_request_status(request_id) # 扫描日志目录查找历史请求 try: if os.path.exists(self.log_dir): for filename in os.listdir(self.log_dir): if filename.startswith("git_operation_") and filename.endswith(".log"): request_id = filename[14:-4] if request_id not in result: self.completed_requests.add(request_id) result[request_id] = {"status": "completed", "message": "历史任务"} except Exception as e: print(f"扫描日志目录失败: {e}") return result # 创建全局日志管理器实例 log_manager = LogManager() ``` #### 2. 重构后的主程序 (main.py) ```python # main.py import os import asyncio import subprocess import threading from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel from log_manager import log_manager # 创建FastAPI应用实例 app = FastAPI(title="Git Repository Manager") # CORS配置 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 定义Git克隆请求的数据模型 class GitCloneRequest(BaseModel): repoUrl: str username: str password: str branch: str = "main" localPath: str # 在 main.py 中修复 run_git_command 函数 def run_git_command(request_id: str, repoUrl: str, username: str, password: str, branch: str, localPath: str): """执行Git命令并实时记录日志""" try: # 构造带认证的URL if repoUrl.startswith("https://"): authenticated_url = repoUrl.replace( "https://", f"https://{username}:{password}@" ) else: authenticated_url = f"https://{username}:{password}@{repoUrl}" log_manager.log_message(request_id, "开始拉取仓库...") log_manager.log_message(request_id, f"仓库URL: {repoUrl}") log_manager.log_message(request_id, f"分支: {branch}") log_manager.log_message(request_id, f"本地路径: {localPath}") # 确保本地目录存在 os.makedirs(localPath, exist_ok=True) # 检查是否已经是Git仓库 git_dir = os.path.join(localPath, '.git') if os.path.exists(git_dir): log_manager.log_message(request_id, "检测到现有Git仓库,尝试拉取最新代码...") commands = [ ['git', '-C', localPath, 'fetch', 'origin'], ['git', '-C', localPath, 'checkout', branch], ['git', '-C', localPath, 'pull', 'origin', branch] ] else: log_manager.log_message(request_id, "克隆新仓库...") commands = [ ['git', 'clone', '-b', branch, authenticated_url, localPath] ] all_success = True for i, cmd in enumerate(commands): log_manager.log_message(request_id, f"执行命令: {' '.join(cmd)}") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding='utf-8', errors='replace', bufsize=1, universal_newlines=True ) # 实时读取输出 while True: output = process.stdout.readline() if output == '' and process.poll() is not None: break if output: clean_output = output.replace(password, '***') log_manager.log_message(request_id, clean_output.strip()) return_code = process.poll() if return_code != 0: log_manager.log_message(request_id, f"命令执行失败,返回码: {return_code}", "WARNING") all_success = False # 不立即退出,继续执行后续命令 log_manager.log_message(request_id, "继续执行后续操作...", "WARNING") if all_success: log_manager.log_message(request_id, "✅ 操作完成!", "SUCCESS") else: log_manager.log_message(request_id, "⚠️ 操作完成,但有警告", "WARNING") except Exception as e: error_msg = f"❌ 错误: {str(e)}" log_manager.log_message(request_id, error_msg, "ERROR") finally: # 确保标记完成被调用 log_manager.mark_complete(request_id) print(f"Git命令执行完成,已标记请求 {request_id} 为完成状态") @app.post("/api/clone-repo") async def clone_repository(request: GitCloneRequest): """克隆或更新Git仓库""" try: # 验证输入数据 if not request.repoUrl: raise HTTPException(status_code=400, detail="仓库URL不能为空") if not request.localPath: raise HTTPException(status_code=400, detail="本地路径不能为空") # 为每个请求创建独立的日志记录器 request_id = log_manager.create_request_logger() # 在后台线程中执行Git操作 thread = threading.Thread( target=run_git_command, args=(request_id, request.repoUrl, request.username, request.password, request.branch, request.localPath) ) thread.daemon = True thread.start() return { "request_id": request_id, "status": "started", "message": "任务已开始执行", "log_url": f"/api/logs/{request_id}", "log_file": f"logs/git_operation_{request_id}.log" } except Exception as e: raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") @app.get("/api/logs/{request_id}") async def get_logs(request_id: str): """获取实时日志流""" return StreamingResponse( log_manager.get_log_generator(request_id), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Access-Control-Expose-Headers": "*", "X-Accel-Buffering": "no" # 禁用Nginx缓冲 } ) @app.get("/api/status/{request_id}") async def get_status(request_id: str): """获取任务状态""" return log_manager.get_request_status(request_id) @app.get("/api/logs/{request_id}/content") async def get_log_content(request_id: str): """获取完整的日志内容""" content = log_manager.get_log_content(request_id) return { "request_id": request_id, "log_content": content, "has_log": content != "日志文件不存在" } @app.get("/api/requests") async def get_all_requests(): """获取所有请求的状态""" return log_manager.get_all_requests() @app.get("/api/health") async def health_check(): """健康检查接口""" return {"status": "healthy", "message": "服务正常运行"} @app.get("/") async def root(): return {"message": "Git Repository Manager API"} if __name__ == "__main__": import uvicorn uvicorn.run( app, host="0.0.0.0", port=8000, log_level="info" ) ``` #### 3. 主要改进点 1. **模块化设计**: 将日志功能抽离为独立的 `LogManager` 类 2. **统一的日志管理**: 提供统一的接口处理实时日志和文件日志 3. **资源管理**: 自动清理完成的请求资源 4. **扩展性**: 易于添加新的日志功能(如日志级别、日志轮转等) 5. **错误处理**: 完善的异常处理机制 6. **状态管理**: 提供请求状态查询功能 7. **日志检索**: 新增获取完整日志内容的接口 #### 4. 使用示例 前端可以通过以下方式使用: ```html Git仓库管理器

Git仓库管理器

克隆、拉取和监控Git仓库操作

Git仓库配置

历史请求

实时日志

等待操作开始...
``` 这样的设计使得日志管理更加清晰,便于维护和扩展。 ‍ ### **方法3** 我将把实时日志展示从 SSE 改为 WebSockets。以下是完整的实现: 1. WebSocket 连接管理器 (websocket_manager.py) ```python # websocket_manager.py import asyncio import json from typing import Dict, Set from fastapi import WebSocket, WebSocketDisconnect import logging class WebSocketManager: """WebSocket 连接管理器""" def __init__(self): self.active_connections: Dict[str, WebSocket] = {} self.connection_tasks: Dict[str, asyncio.Task] = {} self.logger = logging.getLogger(__name__) async def connect(self, websocket: WebSocket, request_id: str): """接受 WebSocket 连接""" await websocket.accept() self.active_connections[request_id] = websocket self.logger.info(f"WebSocket 连接已建立: {request_id}") # 发送连接确认消息 await self.send_message(request_id, { "type": "connection", "status": "connected", "message": "WebSocket 连接成功" }) def disconnect(self, request_id: str): """断开 WebSocket 连接""" if request_id in self.active_connections: del self.active_connections[request_id] self.logger.info(f"WebSocket 连接已断开: {request_id}") # 取消相关任务 if request_id in self.connection_tasks: self.connection_tasks[request_id].cancel() del self.connection_tasks[request_id] async def send_message(self, request_id: str, message: dict): """发送消息到指定客户端""" if request_id in self.active_connections: try: await self.active_connections[request_id].send_json(message) except Exception as e: self.logger.error(f"发送消息失败 {request_id}: {e}") self.disconnect(request_id) async def broadcast(self, message: dict): """广播消息到所有客户端""" disconnected_ids = [] for request_id, websocket in self.active_connections.items(): try: await websocket.send_json(message) except Exception as e: self.logger.error(f"广播消息失败 {request_id}: {e}") disconnected_ids.append(request_id) # 清理断开连接的客户端 for request_id in disconnected_ids: self.disconnect(request_id) def is_connected(self, request_id: str) -> bool: """检查连接是否活跃""" return request_id in self.active_connections async def handle_client_messages(self, websocket: WebSocket, request_id: str): """处理客户端消息""" try: while True: # 接收客户端消息 data = await websocket.receive_text() try: message = json.loads(data) await self.process_client_message(request_id, message) except json.JSONDecodeError: await self.send_message(request_id, { "type": "error", "message": "无效的 JSON 格式" }) except WebSocketDisconnect: self.logger.info(f"客户端主动断开连接: {request_id}") except Exception as e: self.logger.error(f"处理客户端消息错误 {request_id}: {e}") finally: self.disconnect(request_id) async def process_client_message(self, request_id: str, message: dict): """处理客户端消息""" msg_type = message.get("type") if msg_type == "ping": # 心跳响应 await self.send_message(request_id, { "type": "pong", "timestamp": message.get("timestamp") }) elif msg_type == "control": # 控制命令 command = message.get("command") if command == "pause": await self.send_message(request_id, { "type": "control", "status": "paused", "message": "日志传输已暂停" }) elif command == "resume": await self.send_message(request_id, { "type": "control", "status": "resumed", "message": "日志传输已恢复" }) elif msg_type == "status": # 状态查询 await self.send_message(request_id, { "type": "status", "connected": True, "request_id": request_id }) else: await self.send_message(request_id, { "type": "error", "message": f"未知的消息类型: {msg_type}" }) # 全局 WebSocket 管理器实例 websocket_manager = WebSocketManager() ``` 2. 更新日志管理模块 (log_manager.py) ```python # log_manager.py import os import logging import uuid from datetime import datetime from queue import Queue, Empty from typing import Dict import time import asyncio from websocket_manager import websocket_manager class LogManager: """日志管理器,集成 WebSocket 支持""" def __init__(self, log_dir: str = "logs"): self.log_dir = log_dir self.log_queues: Dict[str, Queue] = {} self.log_files: Dict[str, str] = {} self.completed_requests: set = set() self.logger = logging.getLogger(__name__) self._setup_logging() def _setup_logging(self): """设置日志目录""" try: if not os.path.exists(self.log_dir): os.makedirs(self.log_dir, exist_ok=True) except Exception as e: self.logger.error(f"设置日志目录失败: {e}") import tempfile self.log_dir = tempfile.gettempdir() def create_request_logger(self, request_id: str = None) -> str: """为每个请求创建独立的日志记录器""" if request_id is None: request_id = str(uuid.uuid4()) # 清理可能存在的旧队列 if request_id in self.log_queues: self._cleanup_request(request_id) self.log_queues[request_id] = Queue() log_filename = f"git_operation_{request_id}.log" log_filepath = os.path.join(self.log_dir, log_filename) self.log_files[request_id] = log_filepath try: os.makedirs(self.log_dir, exist_ok=True) with open(log_filepath, 'w', encoding='utf-8') as log_file: timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') start_info = f"=== Git操作日志 - 请求ID: {request_id} ===\n" start_info += f"=== 开始时间: {timestamp} ===\n\n" log_file.write(start_info) log_file.flush() except Exception as e: self.logger.error(f"创建日志文件失败: {e}") return request_id def log_message(self, request_id: str, message: str, level: str = "INFO"): """记录带时间戳的日志消息""" if request_id not in self.log_queues: self.logger.warning(f"请求ID {request_id} 的日志队列不存在") return timestamp = datetime.now().strftime('%H:%M:%S') formatted_message = f"[{timestamp}] [{level}] {message}" # 写入文件日志 file_message = formatted_message + "\n" self._write_to_file(request_id, file_message) # 通过 WebSocket 实时发送 asyncio.create_task(self._send_websocket_message(request_id, { "type": "log", "level": level.lower(), "message": message, "timestamp": timestamp, "full_message": formatted_message })) print(file_message, end='') async def _send_websocket_message(self, request_id: str, message: dict): """通过 WebSocket 发送消息""" if websocket_manager.is_connected(request_id): try: await websocket_manager.send_message(request_id, message) except Exception as e: self.logger.error(f"WebSocket 发送失败 {request_id}: {e}") def _write_to_file(self, request_id: str, message: str): """写入日志文件""" if request_id not in self.log_files: return log_filepath = self.log_files[request_id] try: with open(log_filepath, 'a', encoding='utf-8') as log_file: log_file.write(message) log_file.flush() except Exception as e: self.logger.error(f"写入日志文件失败: {e}") def mark_complete(self, request_id: str): """标记请求完成""" if request_id not in self.log_queues: self.logger.warning(f"无法标记完成,请求ID {request_id} 不存在") return self.logger.info(f"标记任务完成: {request_id}") # 写入完成信息到文件 completion_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') completion_msg = f"\n=== 操作完成时间: {completion_time} ===\n" self._write_to_file(request_id, completion_msg) # 标记为已完成 self.completed_requests.add(request_id) # 通过 WebSocket 发送完成消息 asyncio.create_task(self._send_websocket_message(request_id, { "type": "completion", "status": "completed", "message": "操作已完成", "timestamp": completion_time })) async def stream_historical_logs(self, request_id: str): """流式传输历史日志""" if not websocket_manager.is_connected(request_id): return self.logger.info(f"开始流式传输历史日志: {request_id}") # 发送开始消息 await websocket_manager.send_message(request_id, { "type": "history_start", "message": "开始传输历史日志" }) log_content = self.get_log_content(request_id) if log_content != "日志文件不存在": lines = log_content.split('\n') for line in lines: if line.strip() and websocket_manager.is_connected(request_id): await websocket_manager.send_message(request_id, { "type": "log", "level": "info", "message": line, "timestamp": datetime.now().strftime('%H:%M:%S'), "historical": True }) await asyncio.sleep(0.01) # 避免发送过快 await websocket_manager.send_message(request_id, { "type": "history_complete", "message": "历史日志传输完成" }) else: await websocket_manager.send_message(request_id, { "type": "error", "message": "找不到该任务的日志文件" }) def get_request_status(self, request_id: str) -> Dict: """获取请求状态""" if request_id in self.completed_requests: return {"status": "completed", "message": "任务已完成"} if request_id not in self.log_queues: log_file = self._get_log_file_path(request_id) if log_file and os.path.exists(log_file): self.completed_requests.add(request_id) return {"status": "completed", "message": "历史任务"} return {"status": "not_found", "message": "请求ID不存在"} return {"status": "processing", "message": "任务处理中"} def _get_log_file_path(self, request_id: str) -> str: """获取日志文件路径""" if request_id in self.log_files: return self.log_files[request_id] else: return os.path.join(self.log_dir, f"git_operation_{request_id}.log") def _cleanup_request(self, request_id: str): """清理请求资源""" if request_id in self.log_queues: try: while True: self.log_queues[request_id].get_nowait() except Empty: pass if request_id in self.log_queues: del self.log_queues[request_id] if request_id in self.completed_requests: self.completed_requests.remove(request_id) def get_log_content(self, request_id: str) -> str: """获取完整的日志内容""" log_file = self._get_log_file_path(request_id) if log_file and os.path.exists(log_file): try: with open(log_file, 'r', encoding='utf-8') as f: return f.read() except Exception as e: return f"读取日志失败: {str(e)}" else: return "日志文件不存在" def get_all_requests(self) -> Dict[str, Dict]: """获取所有请求的状态信息""" result = {} # 添加当前活跃的请求 for request_id in list(self.log_queues.keys()): result[request_id] = self.get_request_status(request_id) # 扫描日志目录查找历史请求 try: if os.path.exists(self.log_dir): for filename in os.listdir(self.log_dir): if filename.startswith("git_operation_") and filename.endswith(".log"): request_id = filename[14:-4] if request_id not in result: self.completed_requests.add(request_id) result[request_id] = {"status": "completed", "message": "历史任务"} except Exception as e: self.logger.error(f"扫描日志目录失败: {e}") return result # 创建全局日志管理器实例 log_manager = LogManager() ``` 3. 更新主程序 (main.py) ```python # main.py import os import asyncio import subprocess import threading from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel from log_manager import log_manager from websocket_manager import websocket_manager # 创建FastAPI应用实例 app = FastAPI(title="Git Repository Manager") # CORS配置 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 定义Git克隆请求的数据模型 class GitCloneRequest(BaseModel): repoUrl: str username: str password: str branch: str = "main" localPath: str def run_git_command(request_id: str, repoUrl: str, username: str, password: str, branch: str, localPath: str): """执行Git命令并实时记录日志""" try: # 构造带认证的URL if repoUrl.startswith("https://"): authenticated_url = repoUrl.replace( "https://", f"https://{username}:{password}@" ) else: authenticated_url = f"https://{username}:{password}@{repoUrl}" log_manager.log_message(request_id, "开始拉取仓库...") log_manager.log_message(request_id, f"仓库URL: {repoUrl}") log_manager.log_message(request_id, f"分支: {branch}") log_manager.log_message(request_id, f"本地路径: {localPath}") # 确保本地目录存在 os.makedirs(localPath, exist_ok=True) # 检查是否已经是Git仓库 git_dir = os.path.join(localPath, '.git') if os.path.exists(git_dir): log_manager.log_message(request_id, "检测到现有Git仓库,尝试拉取最新代码...") commands = [ ['git', '-C', localPath, 'fetch', 'origin'], ['git', '-C', localPath, 'checkout', branch], ['git', '-C', localPath, 'pull', 'origin', branch] ] else: log_manager.log_message(request_id, "克隆新仓库...") commands = [ ['git', 'clone', '-b', branch, authenticated_url, localPath] ] all_success = True for i, cmd in enumerate(commands): log_manager.log_message(request_id, f"执行命令: {' '.join(cmd)}") process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding='utf-8', errors='replace', bufsize=1, universal_newlines=True ) # 实时读取输出 while True: output = process.stdout.readline() if output == '' and process.poll() is not None: break if output: clean_output = output.replace(password, '***') log_manager.log_message(request_id, clean_output.strip()) return_code = process.poll() if return_code != 0: log_manager.log_message(request_id, f"命令执行失败,返回码: {return_code}", "WARNING") all_success = False log_manager.log_message(request_id, "继续执行后续操作...", "WARNING") if all_success: log_manager.log_message(request_id, "✅ 操作完成!", "SUCCESS") else: log_manager.log_message(request_id, "⚠️ 操作完成,但有警告", "WARNING") except Exception as e: error_msg = f"❌ 错误: {str(e)}" log_manager.log_message(request_id, error_msg, "ERROR") finally: log_manager.mark_complete(request_id) @app.post("/api/clone-repo") async def clone_repository(request: GitCloneRequest): """克隆或更新Git仓库""" try: # 验证输入数据 if not request.repoUrl: raise HTTPException(status_code=400, detail="仓库URL不能为空") if not request.localPath: raise HTTPException(status_code=400, detail="本地路径不能为空") # 为每个请求创建独立的日志记录器 request_id = log_manager.create_request_logger() # 在后台线程中执行Git操作 thread = threading.Thread( target=run_git_command, args=(request_id, request.repoUrl, request.username, request.password, request.branch, request.localPath) ) thread.daemon = True thread.start() return { "request_id": request_id, "status": "started", "message": "任务已开始执行", "websocket_url": f"ws://localhost:8000/ws/logs/{request_id}" } except Exception as e: raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}") @app.websocket("/ws/logs/{request_id}") async def websocket_logs_endpoint(websocket: WebSocket, request_id: str): """WebSocket 日志端点""" await websocket_manager.connect(websocket, request_id) try: # 如果是历史任务,流式传输日志 if request_id in log_manager.completed_requests: await log_manager.stream_historical_logs(request_id) # 处理客户端消息 await websocket_manager.handle_client_messages(websocket, request_id) except WebSocketDisconnect: websocket_manager.disconnect(request_id) except Exception as e: print(f"WebSocket 错误: {e}") websocket_manager.disconnect(request_id) @app.get("/api/status/{request_id}") async def get_status(request_id: str): """获取任务状态""" return log_manager.get_request_status(request_id) @app.get("/api/logs/{request_id}/content") async def get_log_content(request_id: str): """获取完整的日志内容""" content = log_manager.get_log_content(request_id) return { "request_id": request_id, "log_content": content, "has_log": content != "日志文件不存在" } @app.get("/api/requests") async def get_all_requests(): """获取所有请求的状态""" return log_manager.get_all_requests() @app.get("/api/health") async def health_check(): """健康检查接口""" return {"status": "healthy", "message": "服务正常运行"} @app.get("/") async def root(): return {"message": "Git Repository Manager API"} if __name__ == "__main__": import uvicorn uvicorn.run( app, host="0.0.0.0", port=8000, log_level="info" ) ``` 4. 完整的前端代码 (index.html) ```html Git仓库管理器 - WebSocket版本

Git仓库管理器 - WebSocket版本

使用 WebSocket 实现实时日志监控

Git仓库配置

历史请求

实时日志 未连接

等待 WebSocket 连接...
``` 主要改进内容 1. 完整的 WebSocket 实现:替代了原来的 SSE 方案 2. 双向通信:支持客户端向服务器发送控制命令 3. 心跳机制:保持连接活跃,检测连接状态 4. 更好的错误处理:详细的连接状态监控 5. 历史任务支持:自动识别并流式传输历史日志 6. 控制功能:支持暂停、恢复等控制命令 7. 连接状态显示:实时显示 WebSocket 连接状态 现在系统使用 WebSocket 实现了真正的双向实时通信,提供了更好的用户体验和更强大的功能。 ‍ ‍ ‍