# rt
**Repository Path**: rex-cn-abc/rt
## Basic Information
- **Project Name**: rt
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-09-25
- **Last Updated**: 2025-09-25
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 实时日志
## 【需求】
1、使用fastapi 模拟一个post请求接口
2、该接口的功能是前端传入仓库https地址,用户名、密码、分支以及本地的存储路径拉取远端仓库代码,要求每一次调用生成单独的日志文件,且过程日志需要实时显示在前端
3、前端使用vue3框架,使用组合式、setup、javascript,要输入仓库https地址,用户名、密码、分支以及本地的存储路径,点击提交按钮后传给后端接口调用,过程日志需要实时显示
---
## 【实现】
### **方法1**
```python
# main.py
# 导入操作系统接口模块,用于文件和目录操作
import os
# 导入日志记录模块
import logging
# 导入异步IO模块,用于处理异步操作
import asyncio
# 从fastapi框架导入核心类和方法
from fastapi import FastAPI, HTTPException
# 导入CORS中间件,用于处理跨域请求
from fastapi.middleware.cors import CORSMiddleware
# 导入流式响应类,用于实时数据传输
from fastapi.responses import StreamingResponse
# 导入数据验证基类
from pydantic import BaseModel
# 导入子进程管理模块,用于执行外部命令
import subprocess
# 导入UUID生成模块,用于生成唯一标识符
import uuid
# 从datetime模块导入datetime类,用于时间处理
from datetime import datetime
# 导入多线程模块
import threading
# 从queue模块导入队列相关类
from queue import Queue, Empty
# 导入类型提示相关模块
from typing import Dict
# 导入时间模块
import time
# FastAPI application instance for the Git repository manager API.
app = FastAPI(title="Git Repository Manager")
# CORS: let the separately-served Vue frontend call this API from any origin
# (development-grade setting; tighten allow_origins for production).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],       # any origin
    allow_credentials=True,    # allow cookies/credentials cross-origin
    allow_methods=["*"],       # any HTTP method
    allow_headers=["*"],       # any header
)
# Per-request log queues: request ID -> Queue filled by the git worker thread
# and drained by the SSE endpoint below.
log_queues: Dict[str, Queue] = {}
# 定义Git克隆请求的数据模型,使用Pydantic进行数据验证
class GitCloneRequest(BaseModel):
    """Request payload for cloning or updating a git repository."""
    # HTTPS URL of the remote repository (required).
    repoUrl: str
    # Username for HTTP basic auth against the remote (required).
    username: str
    # Password or access token (required; must never be logged in clear text).
    password: str
    # Branch to check out; defaults to "main".
    branch: str = "main"
    # Local directory the repository is cloned/updated into (required).
    localPath: str
# 定义执行Git命令的函数
def run_git_command(request_id: str, repoUrl: str, username: str, password: str, branch: str, localPath: str):
    """Run the git clone/update job for one request, streaming output into its queue.

    Args:
        request_id: Key into the module-level ``log_queues`` dict.
        repoUrl: HTTPS URL of the remote repository.
        username: Username for HTTP basic auth.
        password: Password/token (masked as ``***`` in every log line).
        branch: Branch to fetch/checkout/pull (existing repo) or clone.
        localPath: Local working directory.

    Always puts the ``"EOF"`` sentinel on the queue in ``finally`` so the SSE
    reader terminates even on failure.
    """
    # Guard against a missing queue instead of letting a KeyError kill the
    # worker thread silently (the original indexed log_queues directly).
    log_queue = log_queues.get(request_id)
    if log_queue is None:
        print(f"日志队列不存在: {request_id}")
        return
    try:
        # Embed the credentials in the URL so git can authenticate
        # non-interactively over HTTPS.
        if repoUrl.startswith("https://"):
            authenticated_url = repoUrl.replace(
                "https://",
                f"https://{username}:{password}@"
            )
        else:
            # No scheme supplied: prepend one together with the credentials.
            authenticated_url = f"https://{username}:{password}@{repoUrl}"
        log_message(log_queue, "开始拉取仓库...")
        # Log the *plain* URL (no credentials).
        log_message(log_queue, f"仓库URL: {repoUrl}")
        log_message(log_queue, f"分支: {branch}")
        log_message(log_queue, f"本地路径: {localPath}")
        # Make sure the target directory exists.
        os.makedirs(localPath, exist_ok=True)
        # An existing .git directory means update-in-place; otherwise clone.
        git_dir = os.path.join(localPath, '.git')
        if os.path.exists(git_dir):
            log_message(log_queue, "检测到现有Git仓库,尝试拉取最新代码...")
            commands = [
                ['git', '-C', localPath, 'fetch', 'origin'],
                ['git', '-C', localPath, 'checkout', branch],
                ['git', '-C', localPath, 'pull', 'origin', branch]
            ]
        else:
            log_message(log_queue, "克隆新仓库...")
            commands = [
                ['git', 'clone', '-b', branch, authenticated_url, localPath]
            ]
        for cmd in commands:
            # BUGFIX: the clone command embeds the credential-bearing URL, so
            # the original line leaked the plaintext password into the log
            # queue, console and log file. Mask it before logging.
            log_message(log_queue, f"执行命令: {' '.join(cmd).replace(password, '***')}")
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # merge stderr into stdout
                text=True,
                encoding='utf-8',
                errors='replace',          # never crash on odd bytes from git
                bufsize=1,                 # line-buffered for real-time output
                universal_newlines=True
            )
            # Pump the process output line by line into the log queue.
            while True:
                output = process.stdout.readline()
                if output == '' and process.poll() is not None:
                    break
                if output:
                    # Mask the password in case git echoes the URL back.
                    clean_output = output.replace(password, '***')
                    log_message(log_queue, clean_output.strip())
            return_code = process.poll()
            if return_code != 0:
                log_message(log_queue, f"命令执行失败,返回码: {return_code}")
                # Deliberately best-effort: keep going with remaining commands.
                log_message(log_queue, "⚠️ 命令执行有警告,但继续处理...")
        log_message(log_queue, "✅ 操作完成!")
    except Exception as e:
        error_msg = f"❌ 错误: {str(e)}"
        log_message(log_queue, error_msg)
    finally:
        # Sentinel consumed by the SSE generator to end the stream.
        log_queue.put("EOF")
# 定义日志消息记录函数
def log_message(log_queue: Queue, message: str):
    """Timestamp *message*, enqueue it for the SSE stream and echo it to stdout."""
    stamped = "[{}] {}\n".format(datetime.now().strftime('%H:%M:%S'), message)
    # The queue entry feeds the client's live log stream.
    log_queue.put(stamped)
    # Console echo for server-side debugging.
    print(stamped, end='')
# 定义克隆仓库的API端点(POST请求)
@app.post("/api/clone-repo")
async def clone_repository(request: GitCloneRequest):
    """Start a clone/update job in a background thread.

    Returns the generated ``request_id`` the client uses to subscribe to
    ``/api/logs/{request_id}``.

    Raises:
        HTTPException: 400 for invalid input, 500 for unexpected errors.
    """
    # Validate input *before* allocating any per-request resources so a bad
    # request never leaves an orphan queue behind.
    if not request.repoUrl:
        raise HTTPException(status_code=400, detail="仓库URL不能为空")
    if not request.localPath:
        raise HTTPException(status_code=400, detail="本地路径不能为空")
    try:
        # Unique ID identifying this job, its queue and its log file.
        request_id = str(uuid.uuid4())
        log_queues[request_id] = Queue()
        # Ensure the log directory exists for the SSE writer.
        os.makedirs("logs", exist_ok=True)
        # Run the blocking git work off the event loop.
        thread = threading.Thread(
            target=run_git_command,
            args=(request_id, request.repoUrl, request.username,
                  request.password, request.branch, request.localPath)
        )
        # Daemon thread: do not keep the server alive on shutdown.
        thread.daemon = True
        thread.start()
        return {
            "request_id": request_id,
            "status": "started",
            "message": "任务已开始执行"
        }
    except HTTPException:
        # BUGFIX: the original bare ``except Exception`` caught its own 400
        # HTTPExceptions and re-raised them as 500s; let them pass through.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
# 定义获取实时日志的API端点(GET请求)
@app.get("/api/logs/{request_id}")
async def get_logs(request_id: str):
    """Stream the request's log lines to the client as Server-Sent Events.

    Also persists every line to ``logs/git_operation_<id>.log`` as it streams.
    Returns 404 if the request ID has no registered queue.
    """
    if request_id not in log_queues:
        raise HTTPException(status_code=404, detail="Request ID not found")

    async def log_generator():
        """Drain the request's queue, writing each line to file and yielding SSE frames."""
        log_queue = log_queues[request_id]
        # One log file per request, named by request ID.
        log_filename = f"logs/git_operation_{request_id}.log"
        os.makedirs("logs", exist_ok=True)
        with open(log_filename, 'w', encoding='utf-8') as log_file:
            # Timestamp of the last real log line, for the idle timeout below.
            last_activity = time.time()
            while True:
                try:
                    # NOTE(review): Queue.get is a *blocking* call inside an
                    # async generator — it can stall the event loop for up to
                    # 1s per iteration; confirm acceptable for this workload.
                    log_line = log_queue.get(timeout=1.0)
                    # "EOF" is the worker thread's end-of-job sentinel.
                    if log_line == "EOF":
                        yield f"data: [完成] 操作已完成\n\n"
                        break
                    # Persist first, then stream.
                    log_file.write(log_line)
                    # Flush so the file is readable while the job runs.
                    log_file.flush()
                    # SSE frame. log_line already ends with '\n', producing an
                    # extra blank line; the event still terminates correctly.
                    yield f"data: {log_line}\n\n"
                    last_activity = time.time()
                except Empty:
                    # No line within 1s: check the 5-minute idle timeout.
                    if time.time() - last_activity > 300:
                        yield "data: [超时] 连接超时\n\n"
                        break
                    # Empty heartbeat frame keeps proxies from dropping us.
                    yield "data: \n\n"
                except Exception as e:
                    yield f"data: [错误] {str(e)}\n\n"
                    break

    return StreamingResponse(
        log_generator(),
        media_type="text/event-stream",
        headers={
            # Disable caching so events reach the client immediately.
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Expose-Headers": "*",
        }
    )
# 定义获取任务状态的API端点
@app.get("/api/status/{request_id}")
async def get_status(request_id: str):
    """Report whether a request ID is known to this server instance."""
    # A registered queue means the job was started and not yet cleaned up.
    known = request_id in log_queues
    return {"status": "processing" if known else "not_found"}
# 定义健康检查API端点
@app.get("/api/health")
async def health_check():
    """Liveness probe: reachable means healthy."""
    return dict(status="healthy", message="服务正常运行")
# 定义根路径API端点
@app.get("/")
async def root():
    """Welcome message at the API root."""
    greeting = "Git Repository Manager API"
    return {"message": greeting}
# 主程序入口
# Dev entry point: run the API with uvicorn when executed directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",    # listen on all interfaces
        port=8000,         # default API port
        log_level="info"   # standard request logging
    )
```
#### 代码结构总结
这个代码实现了一个完整的Git仓库管理后端服务,主要包含以下功能:
1. **API接口**:提供克隆仓库、获取日志、检查状态等RESTful接口
2. **异步处理**:使用多线程处理耗时的Git操作,避免阻塞主线程
3. **实时日志**:通过Server-Sent Events实现实时日志推送
4. **错误处理**:完善的异常捕获和处理机制
5. **数据验证**:使用Pydantic进行请求数据验证
6. **跨域支持**:配置CORS中间件支持前端跨域访问
代码采用了清晰的分层结构,将业务逻辑、API接口和工具函数分离,便于维护和扩展。
以下是逐行添加注释的代码:
```html
Git仓库管理器
```
#### 代码功能总结
这个前端页面实现了一个完整的Git仓库管理界面,主要功能包括:
1. **响应式布局**:使用CSS Grid实现自适应布局,支持移动端
2. **表单处理**:收集用户输入的Git仓库信息
3. **实时日志**:通过Server-Sent Events实时显示后端操作日志
4. **状态管理**:显示处理状态、错误信息和成功消息
5. **用户体验**:包含加载状态、动画效果和友好的交互提示
页面采用了现代化的Vue3组合式API,代码结构清晰,易于维护和扩展。
---
### **方法2**
下面将日志功能抽取为一个公共模块,创建独立的日志管理类。以下是重构后的代码:
#### 1. 创建日志管理模块 (log_manager.py)
```python
# log_manager.py
import os
import logging
import uuid
from datetime import datetime
from queue import Queue, Empty
from typing import Dict, Generator
import time
import threading
import asyncio
class LogManager:
    """Log manager fanning each request's output to a file and — while an SSE
    client is attached — to an in-memory queue consumed by the event stream."""

    def __init__(self, log_dir: str = "logs"):
        self.log_dir = log_dir
        # request_id -> queue of log lines for the live SSE stream
        self.log_queues: Dict[str, Queue] = {}
        # request_id -> absolute/relative path of its log file
        self.log_files: Dict[str, str] = {}
        # request_id -> True while an SSE client is currently streaming
        self.active_connections: Dict[str, bool] = {}
        # request IDs whose git job has finished
        self.completed_requests: set = set()
        self._setup_logging()

    def _setup_logging(self):
        """Create the log directory, falling back to the system temp dir on failure."""
        try:
            if not os.path.exists(self.log_dir):
                os.makedirs(self.log_dir, exist_ok=True)
        except Exception as e:
            print(f"设置日志目录失败: {e}")
            import tempfile
            self.log_dir = tempfile.gettempdir()

    def create_request_logger(self, request_id: str = None) -> str:
        """Create a fresh queue + log file for a request; returns the request ID."""
        if request_id is None:
            request_id = str(uuid.uuid4())
        # Drop any stale state left over from a reused ID.
        if request_id in self.log_queues:
            self._cleanup_request(request_id)
        self.log_queues[request_id] = Queue()
        self.active_connections[request_id] = False
        log_filename = f"git_operation_{request_id}.log"
        log_filepath = os.path.join(self.log_dir, log_filename)
        self.log_files[request_id] = log_filepath
        try:
            os.makedirs(self.log_dir, exist_ok=True)
            # Truncate/create the file and write a header block.
            with open(log_filepath, 'w', encoding='utf-8') as log_file:
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                start_info = f"=== Git操作日志 - 请求ID: {request_id} ===\n"
                start_info += f"=== 开始时间: {timestamp} ===\n\n"
                log_file.write(start_info)
                log_file.flush()
        except Exception as e:
            print(f"创建日志文件失败: {e}")
        return request_id

    def log_message(self, request_id: str, message: str, level: str = "INFO"):
        """Record a timestamped message: always to file, to the queue only if a client is attached."""
        if request_id not in self.log_queues:
            print(f"警告: 请求ID {request_id} 的日志队列不存在")
            return
        timestamp = datetime.now().strftime('%H:%M:%S')
        formatted_message = f"[{timestamp}] [{level}] {message}"
        # File log always gets the line.
        file_message = formatted_message + "\n"
        self._write_to_file(request_id, file_message)
        # Live queue only while a consumer is streaming — lines produced
        # before the client connects are served from the file instead.
        if self.active_connections.get(request_id, False):
            try:
                self.log_queues[request_id].put(formatted_message, timeout=0.1)
                print(f"已放入队列: {message[:50]}...")  # debug trace
            except Exception as e:
                print(f"放入队列失败: {e}")
        print(file_message, end='')

    def _write_to_file(self, request_id: str, message: str):
        """Append *message* to the request's log file (best-effort)."""
        if request_id not in self.log_files:
            return
        log_filepath = self.log_files[request_id]
        try:
            with open(log_filepath, 'a', encoding='utf-8') as log_file:
                log_file.write(message)
                log_file.flush()
        except Exception as e:
            print(f"写入日志文件失败: {e}")

    def mark_complete(self, request_id: str):
        """Mark a request finished and push the EOF sentinel to any live stream."""
        if request_id not in self.log_queues:
            print(f"警告: 无法标记完成,请求ID {request_id} 不存在")
            return
        print(f"标记任务完成: {request_id}")
        # Footer in the file log.
        completion_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        completion_msg = f"\n=== 操作完成时间: {completion_time} ===\n"
        self._write_to_file(request_id, completion_msg)
        self.completed_requests.add(request_id)
        # Only signal the queue when a consumer is actually attached.
        if self.active_connections.get(request_id, False):
            try:
                # NOTE(review): draining the queue here discards any log lines
                # the stream has not read yet (they remain in the file only).
                self._clear_queue(request_id)
                self.log_queues[request_id].put("EOF", timeout=1.0)
                print(f"已发送完成标记到队列: {request_id}")
            except Exception as e:
                print(f"发送完成标记失败: {e}")
                # Could not signal — force the connection closed instead.
                self.active_connections[request_id] = False
        else:
            print(f"没有活跃连接,无需发送完成标记: {request_id}")
        # Mark inactive regardless of whether a client was attached.
        self.active_connections[request_id] = False

    def _clear_queue(self, request_id: str):
        """Drain any remaining messages from the request's queue."""
        if request_id not in self.log_queues:
            return
        queue = self.log_queues[request_id]
        cleared_count = 0
        try:
            while True:
                queue.get_nowait()
                cleared_count += 1
        except Empty:
            pass
        if cleared_count > 0:
            print(f"清空了 {cleared_count} 条剩余消息")

    # NOTE(review): this is an *async generator*, so the annotation should be
    # typing.AsyncGenerator[str, None]; left as-is because AsyncGenerator is
    # not imported at module level.
    async def get_log_generator(self, request_id: str) -> Generator[str, None, None]:
        """Yield SSE frames for *request_id*: replays the file for completed
        jobs, otherwise streams the live queue until EOF/timeout."""
        print(f"开始处理日志流请求: {request_id}")
        if not request_id:
            yield "data: [错误] 无效的请求ID\n\n"
            return
        # Completed job: replay the persisted log file line by line.
        if request_id in self.completed_requests:
            print(f"处理历史任务: {request_id}")
            yield "data: [信息] 加载历史任务日志\n\n"
            await asyncio.sleep(0.1)
            log_content = self.get_log_content(request_id)
            if log_content != "日志文件不存在":
                lines = log_content.split('\n')
                for line in lines:
                    if line.strip():
                        yield f"data: {line}\n\n"
                        # Throttle the replay slightly.
                        await asyncio.sleep(0.01)
                yield "data: [完成] 历史日志显示完毕\n\n"
            else:
                yield "data: [错误] 找不到该任务的日志文件\n\n"
            return
        # Neither completed nor registered: unknown/expired ID.
        if request_id not in self.log_queues:
            yield "data: [错误] 请求ID未找到或任务已过期\n\n"
            return
        # Live job: flip the connection flag so log_message starts queueing.
        self.active_connections[request_id] = True
        log_queue = self.log_queues[request_id]
        print(f"开始实时日志流: {request_id}")
        try:
            yield "data: [开始] 开始实时日志流\n\n"
            await asyncio.sleep(0.1)
            last_activity = time.time()
            eof_received = False
            while not eof_received:
                try:
                    # NOTE(review): blocking Queue.get inside an async
                    # generator can stall the event loop up to 1s — confirm.
                    log_line = log_queue.get(timeout=1.0)
                    if log_line == "EOF":
                        print(f"收到完成标记: {request_id}")
                        yield "data: [完成] 操作已完成\n\n"
                        eof_received = True
                        break
                    yield f"data: {log_line}\n\n"
                    last_activity = time.time()
                except Empty:
                    # 2 minutes with no lines: give up.
                    if time.time() - last_activity > 120:
                        yield "data: [超时] 连接超时\n\n"
                        break
                    # Job finished without an observed EOF (e.g. race with
                    # mark_complete): close out gracefully.
                    if request_id in self.completed_requests:
                        print(f"检测到已完成但未收到EOF的任务: {request_id}")
                        yield "data: [完成] 操作已完成(检测到完成状态)\n\n"
                        eof_received = True
                        break
                    # Heartbeat frame keeps the connection alive.
                    yield "data: \n\n"
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"日志流异常: {e}")
                    yield f"data: [错误] 流处理异常: {str(e)}\n\n"
                    break
                # mark_complete (or a send failure) may have closed us.
                if not self.active_connections.get(request_id, False):
                    yield "data: [信息] 连接已关闭\n\n"
                    break
        except Exception as e:
            print(f"日志生成器异常: {e}")
            yield f"data: [错误] 生成器异常: {str(e)}\n\n"
        finally:
            self.active_connections[request_id] = False
            print(f"结束日志流: {request_id}")
            # Free queue/file bookkeeping once the stream saw EOF.
            if eof_received:
                self._cleanup_request(request_id)

    def get_request_status(self, request_id: str) -> Dict:
        """Return a {status, message} dict for the request (live, completed, historical, or unknown)."""
        if request_id in self.completed_requests:
            return {"status": "completed", "message": "任务已完成"}
        if request_id not in self.log_queues:
            # Unknown in memory — check the disk for a historical log file.
            log_file = self._get_log_file_path(request_id)
            if log_file and os.path.exists(log_file):
                self.completed_requests.add(request_id)
                return {"status": "completed", "message": "历史任务"}
            return {"status": "not_found", "message": "请求ID不存在"}
        if self.active_connections.get(request_id, False):
            return {"status": "streaming", "message": "实时流传输中"}
        return {"status": "processing", "message": "任务处理中"}

    def _get_log_file_path(self, request_id: str) -> str:
        """Resolve the log file path, deriving it from the ID if not tracked."""
        if request_id in self.log_files:
            return self.log_files[request_id]
        else:
            return os.path.join(self.log_dir, f"git_operation_{request_id}.log")

    def _cleanup_request(self, request_id: str):
        """Drop all in-memory state for a request (file on disk is kept)."""
        if request_id in self.log_queues:
            try:
                # Drain the queue first.
                while True:
                    self.log_queues[request_id].get_nowait()
            except Empty:
                pass
        if request_id in self.log_queues:
            del self.log_queues[request_id]
        if request_id in self.active_connections:
            del self.active_connections[request_id]
        # NOTE(review): removing the ID from completed_requests means a later
        # /api/logs call falls back to the on-disk lookup path — confirm intended.
        if request_id in self.completed_requests:
            self.completed_requests.remove(request_id)
        print(f"已清理请求资源: {request_id}")

    def get_log_content(self, request_id: str) -> str:
        """Return the full text of the request's log file, or an error string."""
        log_file = self._get_log_file_path(request_id)
        if log_file and os.path.exists(log_file):
            try:
                with open(log_file, 'r', encoding='utf-8') as f:
                    return f.read()
            except Exception as e:
                return f"读取日志失败: {str(e)}"
        else:
            return "日志文件不存在"

    def get_all_requests(self) -> Dict[str, Dict]:
        """Return status info for every in-memory request plus historical log files on disk."""
        result = {}
        # In-memory (active) requests first.
        for request_id in list(self.log_queues.keys()):
            result[request_id] = self.get_request_status(request_id)
        # Then scan the log directory for historical jobs.
        try:
            if os.path.exists(self.log_dir):
                for filename in os.listdir(self.log_dir):
                    if filename.startswith("git_operation_") and filename.endswith(".log"):
                        # Strip the "git_operation_" prefix and ".log" suffix.
                        request_id = filename[14:-4]
                        if request_id not in result:
                            self.completed_requests.add(request_id)
                            result[request_id] = {"status": "completed", "message": "历史任务"}
        except Exception as e:
            print(f"扫描日志目录失败: {e}")
        return result
# Module-level singleton shared by the FastAPI application.
log_manager = LogManager()
```
#### 2. 重构后的主程序 (main.py)
```python
# main.py
import os
import asyncio
import subprocess
import threading
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from log_manager import log_manager
# FastAPI application instance.
app = FastAPI(title="Git Repository Manager")
# CORS: allow the browser frontend on any origin (development setting).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# 定义Git克隆请求的数据模型
class GitCloneRequest(BaseModel):
    """Request payload for cloning or updating a git repository."""
    repoUrl: str        # HTTPS URL of the remote (required)
    username: str       # basic-auth username (required)
    password: str       # password/token; masked in all logs (required)
    branch: str = "main"  # branch to check out
    localPath: str      # local target directory (required)
# 在 main.py 中修复 run_git_command 函数
def run_git_command(request_id: str, repoUrl: str, username: str, password: str, branch: str, localPath: str):
    """Run the git clone/update job for *request_id*, logging via ``log_manager``.

    Always calls ``log_manager.mark_complete`` in ``finally`` so the client's
    log stream terminates even when the job fails.
    """
    try:
        # Embed the credentials for non-interactive HTTPS auth.
        if repoUrl.startswith("https://"):
            authenticated_url = repoUrl.replace(
                "https://",
                f"https://{username}:{password}@"
            )
        else:
            # No scheme supplied: prepend one with the credentials.
            authenticated_url = f"https://{username}:{password}@{repoUrl}"
        log_manager.log_message(request_id, "开始拉取仓库...")
        # Plain URL only — never the authenticated one.
        log_manager.log_message(request_id, f"仓库URL: {repoUrl}")
        log_manager.log_message(request_id, f"分支: {branch}")
        log_manager.log_message(request_id, f"本地路径: {localPath}")
        os.makedirs(localPath, exist_ok=True)
        # Existing .git directory -> update in place; otherwise fresh clone.
        git_dir = os.path.join(localPath, '.git')
        if os.path.exists(git_dir):
            log_manager.log_message(request_id, "检测到现有Git仓库,尝试拉取最新代码...")
            commands = [
                ['git', '-C', localPath, 'fetch', 'origin'],
                ['git', '-C', localPath, 'checkout', branch],
                ['git', '-C', localPath, 'pull', 'origin', branch]
            ]
        else:
            log_manager.log_message(request_id, "克隆新仓库...")
            commands = [
                ['git', 'clone', '-b', branch, authenticated_url, localPath]
            ]
        all_success = True
        for cmd in commands:
            # BUGFIX: the clone command contains the credential-bearing URL;
            # mask the password before the command line hits any log sink.
            log_manager.log_message(request_id, f"执行命令: {' '.join(cmd).replace(password, '***')}")
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # merge stderr into stdout
                text=True,
                encoding='utf-8',
                errors='replace',
                bufsize=1,                 # line-buffered for real-time output
                universal_newlines=True
            )
            # Pump process output line by line into the log.
            while True:
                output = process.stdout.readline()
                if output == '' and process.poll() is not None:
                    break
                if output:
                    # Mask the password if git echoes the URL back.
                    clean_output = output.replace(password, '***')
                    log_manager.log_message(request_id, clean_output.strip())
            return_code = process.poll()
            if return_code != 0:
                log_manager.log_message(request_id, f"命令执行失败,返回码: {return_code}", "WARNING")
                all_success = False
                # Deliberately continue with the remaining commands.
                log_manager.log_message(request_id, "继续执行后续操作...", "WARNING")
        if all_success:
            log_manager.log_message(request_id, "✅ 操作完成!", "SUCCESS")
        else:
            log_manager.log_message(request_id, "⚠️ 操作完成,但有警告", "WARNING")
    except Exception as e:
        error_msg = f"❌ 错误: {str(e)}"
        log_manager.log_message(request_id, error_msg, "ERROR")
    finally:
        # Guarantee completion is signalled so the client stream can end.
        log_manager.mark_complete(request_id)
        print(f"Git命令执行完成,已标记请求 {request_id} 为完成状态")
@app.post("/api/clone-repo")
async def clone_repository(request: GitCloneRequest):
    """Start a background clone/update job and return its request ID plus log URLs.

    Raises:
        HTTPException: 400 for invalid input, 500 for unexpected errors.
    """
    # Validate before allocating any per-request resources.
    if not request.repoUrl:
        raise HTTPException(status_code=400, detail="仓库URL不能为空")
    if not request.localPath:
        raise HTTPException(status_code=400, detail="本地路径不能为空")
    try:
        # Dedicated logger (queue + log file) for this request.
        request_id = log_manager.create_request_logger()
        # Run the blocking git work off the event loop.
        thread = threading.Thread(
            target=run_git_command,
            args=(request_id, request.repoUrl, request.username,
                  request.password, request.branch, request.localPath)
        )
        thread.daemon = True
        thread.start()
        return {
            "request_id": request_id,
            "status": "started",
            "message": "任务已开始执行",
            "log_url": f"/api/logs/{request_id}",
            "log_file": f"logs/git_operation_{request_id}.log"
        }
    except HTTPException:
        # BUGFIX: don't let the generic handler convert deliberate HTTP
        # errors (400s) into 500s — re-raise them untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
@app.get("/api/logs/{request_id}")
async def get_logs(request_id: str):
    """Stream the request's log as Server-Sent Events via the log manager."""
    # SSE headers: disable caching and proxy buffering so every event
    # reaches the browser immediately.
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Expose-Headers": "*",
        "X-Accel-Buffering": "no"  # 禁用Nginx缓冲
    }
    stream = log_manager.get_log_generator(request_id)
    return StreamingResponse(stream, media_type="text/event-stream", headers=sse_headers)
@app.get("/api/status/{request_id}")
async def get_status(request_id: str):
    """Report the task's status; all bookkeeping lives in the log manager."""
    status_info = log_manager.get_request_status(request_id)
    return status_info
@app.get("/api/logs/{request_id}/content")
async def get_log_content(request_id: str):
    """Return the full persisted log text for a request."""
    text = log_manager.get_log_content(request_id)
    # The manager signals a missing file with this exact sentinel string.
    found = text != "日志文件不存在"
    return {
        "request_id": request_id,
        "log_content": text,
        "has_log": found
    }
@app.get("/api/requests")
async def get_all_requests():
    """List status info for every known (active or historical) request."""
    all_status = log_manager.get_all_requests()
    return all_status
@app.get("/api/health")
async def health_check():
    """Liveness probe: reachable means healthy."""
    return dict(status="healthy", message="服务正常运行")
@app.get("/")
async def root():
    """Welcome message at the API root."""
    greeting = "Git Repository Manager API"
    return {"message": greeting}
# Dev entry point: run the API with uvicorn when executed directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",   # listen on all interfaces
        port=8000,
        log_level="info"
    )
```
#### 3. 主要改进点
1. **模块化设计**: 将日志功能抽离为独立的 `LogManager` 类
2. **统一的日志管理**: 提供统一的接口处理实时日志和文件日志
3. **资源管理**: 自动清理完成的请求资源
4. **扩展性**: 易于添加新的日志功能(如日志级别、日志轮转等)
5. **错误处理**: 完善的异常处理机制
6. **状态管理**: 提供请求状态查询功能
7. **日志检索**: 新增获取完整日志内容的接口
#### 4. 使用示例
前端可以通过以下方式使用:
```html
Git仓库管理器
```
这样的设计使得日志管理更加清晰,便于维护和扩展。
### **方法3**
我将把实时日志展示从 SSE 改为 WebSockets。以下是完整的实现:
1. WebSocket 连接管理器 (websocket_manager.py)
```python
# websocket_manager.py
import asyncio
import json
from typing import Dict, Set
from fastapi import WebSocket, WebSocketDisconnect
import logging
class WebSocketManager:
    """Tracks one WebSocket per request ID and routes messages to/from clients."""

    def __init__(self):
        # request_id -> accepted WebSocket connection
        self.active_connections: Dict[str, WebSocket] = {}
        # request_id -> background task tied to the connection (cancelled on disconnect)
        self.connection_tasks: Dict[str, asyncio.Task] = {}
        self.logger = logging.getLogger(__name__)

    async def connect(self, websocket: WebSocket, request_id: str):
        """Accept the handshake, register the socket, and send a confirmation frame."""
        await websocket.accept()
        self.active_connections[request_id] = websocket
        self.logger.info(f"WebSocket 连接已建立: {request_id}")
        await self.send_message(request_id, {
            "type": "connection",
            "status": "connected",
            "message": "WebSocket 连接成功"
        })

    def disconnect(self, request_id: str):
        """Unregister the socket and cancel any task bound to it (idempotent)."""
        if request_id in self.active_connections:
            del self.active_connections[request_id]
            self.logger.info(f"WebSocket 连接已断开: {request_id}")
        if request_id in self.connection_tasks:
            self.connection_tasks[request_id].cancel()
            del self.connection_tasks[request_id]

    async def send_message(self, request_id: str, message: dict):
        """Send a JSON message to one client; a send failure disconnects it."""
        if request_id in self.active_connections:
            try:
                await self.active_connections[request_id].send_json(message)
            except Exception as e:
                self.logger.error(f"发送消息失败 {request_id}: {e}")
                self.disconnect(request_id)

    async def broadcast(self, message: dict):
        """Send a JSON message to every client, dropping any that fail."""
        disconnected_ids = []
        for request_id, websocket in self.active_connections.items():
            try:
                await websocket.send_json(message)
            except Exception as e:
                self.logger.error(f"广播消息失败 {request_id}: {e}")
                disconnected_ids.append(request_id)
        # Clean up failed clients only after the iteration (dict-safe).
        for request_id in disconnected_ids:
            self.disconnect(request_id)

    def is_connected(self, request_id: str) -> bool:
        """True if the request ID currently has a registered connection."""
        return request_id in self.active_connections

    async def handle_client_messages(self, websocket: WebSocket, request_id: str):
        """Receive-loop for one client: parse JSON frames until disconnect, then clean up."""
        try:
            while True:
                data = await websocket.receive_text()
                try:
                    message = json.loads(data)
                    await self.process_client_message(request_id, message)
                except json.JSONDecodeError:
                    await self.send_message(request_id, {
                        "type": "error",
                        "message": "无效的 JSON 格式"
                    })
        except WebSocketDisconnect:
            self.logger.info(f"客户端主动断开连接: {request_id}")
        except Exception as e:
            self.logger.error(f"处理客户端消息错误 {request_id}: {e}")
        finally:
            # Always unregister, whatever ended the loop.
            self.disconnect(request_id)

    async def process_client_message(self, request_id: str, message: dict):
        """Dispatch one client frame by its "type" field (ping/control/status)."""
        msg_type = message.get("type")
        if msg_type == "ping":
            # Heartbeat: echo the client's timestamp back.
            await self.send_message(request_id, {
                "type": "pong",
                "timestamp": message.get("timestamp")
            })
        elif msg_type == "control":
            # NOTE(review): pause/resume only acknowledge — no transport state
            # is actually changed here; confirm whether that is intended.
            command = message.get("command")
            if command == "pause":
                await self.send_message(request_id, {
                    "type": "control",
                    "status": "paused",
                    "message": "日志传输已暂停"
                })
            elif command == "resume":
                await self.send_message(request_id, {
                    "type": "control",
                    "status": "resumed",
                    "message": "日志传输已恢复"
                })
        elif msg_type == "status":
            await self.send_message(request_id, {
                "type": "status",
                "connected": True,
                "request_id": request_id
            })
        else:
            await self.send_message(request_id, {
                "type": "error",
                "message": f"未知的消息类型: {msg_type}"
            })
# Module-level singleton shared by log_manager and the FastAPI app.
websocket_manager = WebSocketManager()
```
2. 更新日志管理模块 (log_manager.py)
```python
# log_manager.py
import os
import logging
import uuid
from datetime import datetime
from queue import Queue, Empty
from typing import Dict
import time
import asyncio
from websocket_manager import websocket_manager
class LogManager:
    """Log manager that writes each request's log to a file and pushes the
    same lines to the client over WebSocket (via websocket_manager)."""

    def __init__(self, log_dir: str = "logs"):
        self.log_dir = log_dir
        # request_id -> queue (kept for lifecycle tracking in this version)
        self.log_queues: Dict[str, Queue] = {}
        # request_id -> path of its log file
        self.log_files: Dict[str, str] = {}
        # request IDs whose git job has finished
        self.completed_requests: set = set()
        self.logger = logging.getLogger(__name__)
        self._setup_logging()

    def _setup_logging(self):
        """Create the log directory, falling back to the system temp dir on failure."""
        try:
            if not os.path.exists(self.log_dir):
                os.makedirs(self.log_dir, exist_ok=True)
        except Exception as e:
            self.logger.error(f"设置日志目录失败: {e}")
            import tempfile
            self.log_dir = tempfile.gettempdir()

    def create_request_logger(self, request_id: str = None) -> str:
        """Create a fresh queue + header-initialized log file; returns the request ID."""
        if request_id is None:
            request_id = str(uuid.uuid4())
        # Drop stale state for a reused ID.
        if request_id in self.log_queues:
            self._cleanup_request(request_id)
        self.log_queues[request_id] = Queue()
        log_filename = f"git_operation_{request_id}.log"
        log_filepath = os.path.join(self.log_dir, log_filename)
        self.log_files[request_id] = log_filepath
        try:
            os.makedirs(self.log_dir, exist_ok=True)
            with open(log_filepath, 'w', encoding='utf-8') as log_file:
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                start_info = f"=== Git操作日志 - 请求ID: {request_id} ===\n"
                start_info += f"=== 开始时间: {timestamp} ===\n\n"
                log_file.write(start_info)
                log_file.flush()
        except Exception as e:
            self.logger.error(f"创建日志文件失败: {e}")
        return request_id

    def log_message(self, request_id: str, message: str, level: str = "INFO"):
        """Record a timestamped message to file and forward it over WebSocket."""
        if request_id not in self.log_queues:
            self.logger.warning(f"请求ID {request_id} 的日志队列不存在")
            return
        timestamp = datetime.now().strftime('%H:%M:%S')
        formatted_message = f"[{timestamp}] [{level}] {message}"
        # File log always gets the line.
        file_message = formatted_message + "\n"
        self._write_to_file(request_id, file_message)
        # NOTE(review): asyncio.create_task requires a *running event loop in
        # the current thread*, but this method is called from the git worker
        # thread (threading.Thread) — there it raises RuntimeError and the
        # WebSocket message is lost. Likely needs
        # asyncio.run_coroutine_threadsafe(..., server_loop) — confirm.
        asyncio.create_task(self._send_websocket_message(request_id, {
            "type": "log",
            "level": level.lower(),
            "message": message,
            "timestamp": timestamp,
            "full_message": formatted_message
        }))
        print(file_message, end='')

    async def _send_websocket_message(self, request_id: str, message: dict):
        """Forward a message over WebSocket if the client is connected (best-effort)."""
        if websocket_manager.is_connected(request_id):
            try:
                await websocket_manager.send_message(request_id, message)
            except Exception as e:
                self.logger.error(f"WebSocket 发送失败 {request_id}: {e}")

    def _write_to_file(self, request_id: str, message: str):
        """Append *message* to the request's log file (best-effort)."""
        if request_id not in self.log_files:
            return
        log_filepath = self.log_files[request_id]
        try:
            with open(log_filepath, 'a', encoding='utf-8') as log_file:
                log_file.write(message)
                log_file.flush()
        except Exception as e:
            self.logger.error(f"写入日志文件失败: {e}")

    def mark_complete(self, request_id: str):
        """Mark a request finished: file footer, completed-set, WebSocket notification."""
        if request_id not in self.log_queues:
            self.logger.warning(f"无法标记完成,请求ID {request_id} 不存在")
            return
        self.logger.info(f"标记任务完成: {request_id}")
        completion_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        completion_msg = f"\n=== 操作完成时间: {completion_time} ===\n"
        self._write_to_file(request_id, completion_msg)
        self.completed_requests.add(request_id)
        # NOTE(review): same create_task-from-worker-thread issue as in
        # log_message above — confirm this runs on the server's event loop.
        asyncio.create_task(self._send_websocket_message(request_id, {
            "type": "completion",
            "status": "completed",
            "message": "操作已完成",
            "timestamp": completion_time
        }))

    async def stream_historical_logs(self, request_id: str):
        """Replay the persisted log file over an already-connected WebSocket."""
        if not websocket_manager.is_connected(request_id):
            return
        self.logger.info(f"开始流式传输历史日志: {request_id}")
        await websocket_manager.send_message(request_id, {
            "type": "history_start",
            "message": "开始传输历史日志"
        })
        log_content = self.get_log_content(request_id)
        if log_content != "日志文件不存在":
            lines = log_content.split('\n')
            for line in lines:
                # Stop sending if the client drops mid-replay.
                if line.strip() and websocket_manager.is_connected(request_id):
                    await websocket_manager.send_message(request_id, {
                        "type": "log",
                        "level": "info",
                        "message": line,
                        "timestamp": datetime.now().strftime('%H:%M:%S'),
                        "historical": True
                    })
                    await asyncio.sleep(0.01)  # throttle the replay
            await websocket_manager.send_message(request_id, {
                "type": "history_complete",
                "message": "历史日志传输完成"
            })
        else:
            await websocket_manager.send_message(request_id, {
                "type": "error",
                "message": "找不到该任务的日志文件"
            })

    def get_request_status(self, request_id: str) -> Dict:
        """Return a {status, message} dict: completed, historical (file on disk), processing, or unknown."""
        if request_id in self.completed_requests:
            return {"status": "completed", "message": "任务已完成"}
        if request_id not in self.log_queues:
            log_file = self._get_log_file_path(request_id)
            if log_file and os.path.exists(log_file):
                self.completed_requests.add(request_id)
                return {"status": "completed", "message": "历史任务"}
            return {"status": "not_found", "message": "请求ID不存在"}
        return {"status": "processing", "message": "任务处理中"}

    def _get_log_file_path(self, request_id: str) -> str:
        """Resolve the log file path, deriving it from the ID if not tracked."""
        if request_id in self.log_files:
            return self.log_files[request_id]
        else:
            return os.path.join(self.log_dir, f"git_operation_{request_id}.log")

    def _cleanup_request(self, request_id: str):
        """Drop all in-memory state for a request (file on disk is kept)."""
        if request_id in self.log_queues:
            try:
                while True:
                    self.log_queues[request_id].get_nowait()
            except Empty:
                pass
        if request_id in self.log_queues:
            del self.log_queues[request_id]
        if request_id in self.completed_requests:
            self.completed_requests.remove(request_id)

    def get_log_content(self, request_id: str) -> str:
        """Return the full text of the request's log file, or an error string."""
        log_file = self._get_log_file_path(request_id)
        if log_file and os.path.exists(log_file):
            try:
                with open(log_file, 'r', encoding='utf-8') as f:
                    return f.read()
            except Exception as e:
                return f"读取日志失败: {str(e)}"
        else:
            return "日志文件不存在"

    def get_all_requests(self) -> Dict[str, Dict]:
        """Return status info for every in-memory request plus historical log files on disk."""
        result = {}
        # In-memory (active) requests first.
        for request_id in list(self.log_queues.keys()):
            result[request_id] = self.get_request_status(request_id)
        # Then scan the log directory for historical jobs.
        try:
            if os.path.exists(self.log_dir):
                for filename in os.listdir(self.log_dir):
                    if filename.startswith("git_operation_") and filename.endswith(".log"):
                        # Strip the "git_operation_" prefix and ".log" suffix.
                        request_id = filename[14:-4]
                        if request_id not in result:
                            self.completed_requests.add(request_id)
                            result[request_id] = {"status": "completed", "message": "历史任务"}
        except Exception as e:
            self.logger.error(f"扫描日志目录失败: {e}")
        return result
# Module-level singleton shared by the FastAPI application.
log_manager = LogManager()
```
3. 更新主程序 (main.py)
```python
# main.py
import os
import asyncio
import subprocess
import threading
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from log_manager import log_manager
from websocket_manager import websocket_manager
# FastAPI application instance.
app = FastAPI(title="Git Repository Manager")
# CORS: allow any origin (development setting).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# 定义Git克隆请求的数据模型
class GitCloneRequest(BaseModel):
    """Request payload for cloning or updating a git repository."""
    repoUrl: str        # HTTPS URL of the remote (required)
    username: str       # basic-auth username (required)
    password: str       # password/token; masked in all logs (required)
    branch: str = "main"  # branch to check out
    localPath: str      # local target directory (required)
def run_git_command(request_id: str, repoUrl: str, username: str, password: str, branch: str, localPath: str):
    """Clone or update a Git repository, streaming all output to the request log.

    Args:
        request_id: Id of the per-request logger created by ``log_manager``.
        repoUrl: HTTPS URL of the remote repository.
        username: HTTP basic-auth username.
        password: Password or access token (must never reach the log file).
        branch: Branch to fetch/checkout/pull, or to clone.
        localPath: Local working directory for the repository.

    The request is always marked complete in ``log_manager``, even on error.
    """
    try:
        # Embed the credentials in the URL that git itself will use.
        if repoUrl.startswith("https://"):
            authenticated_url = repoUrl.replace(
                "https://",
                f"https://{username}:{password}@"
            )
        else:
            authenticated_url = f"https://{username}:{password}@{repoUrl}"
        log_manager.log_message(request_id, "开始拉取仓库...")
        log_manager.log_message(request_id, f"仓库URL: {repoUrl}")
        log_manager.log_message(request_id, f"分支: {branch}")
        log_manager.log_message(request_id, f"本地路径: {localPath}")
        # Make sure the target directory exists.
        os.makedirs(localPath, exist_ok=True)
        # If a .git directory already exists, update instead of cloning.
        git_dir = os.path.join(localPath, '.git')
        if os.path.exists(git_dir):
            log_manager.log_message(request_id, "检测到现有Git仓库,尝试拉取最新代码...")
            commands = [
                ['git', '-C', localPath, 'fetch', 'origin'],
                ['git', '-C', localPath, 'checkout', branch],
                ['git', '-C', localPath, 'pull', 'origin', branch]
            ]
        else:
            log_manager.log_message(request_id, "克隆新仓库...")
            commands = [
                ['git', 'clone', '-b', branch, authenticated_url, localPath]
            ]
        all_success = True
        for cmd in commands:
            # BUG FIX: the clone command contains the credential-bearing URL,
            # so joining cmd verbatim leaked the password into the log file.
            # Substitute the plain URL before logging.
            display_cmd = ' '.join(
                arg.replace(authenticated_url, repoUrl) for arg in cmd
            )
            log_manager.log_message(request_id, f"执行命令: {display_cmd}")
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding='utf-8',
                errors='replace',
                bufsize=1,
                universal_newlines=True
            )
            # Stream git's merged stdout/stderr into the log line by line.
            while True:
                output = process.stdout.readline()
                if output == '' and process.poll() is not None:
                    break
                if output:
                    # BUG FIX: ''.replace('', '***') inserts the mask between
                    # every character, so only mask a non-empty password.
                    clean_output = output.replace(password, '***') if password else output
                    log_manager.log_message(request_id, clean_output.strip())
            return_code = process.poll()
            if return_code != 0:
                log_manager.log_message(request_id, f"命令执行失败,返回码: {return_code}", "WARNING")
                all_success = False
                log_manager.log_message(request_id, "继续执行后续操作...", "WARNING")
        if all_success:
            log_manager.log_message(request_id, "✅ 操作完成!", "SUCCESS")
        else:
            log_manager.log_message(request_id, "⚠️ 操作完成,但有警告", "WARNING")
    except Exception as e:
        log_manager.log_message(request_id, f"❌ 错误: {str(e)}", "ERROR")
    finally:
        # Always signal completion so WebSocket clients are released.
        log_manager.mark_complete(request_id)
@app.post("/api/clone-repo")
async def clone_repository(request: GitCloneRequest):
    """Start a clone/update job in a background thread.

    Returns the request id plus the WebSocket URL on which the caller can
    follow the live log. Raises HTTP 400 on missing input and HTTP 500 on
    unexpected failure.
    """
    try:
        # Validate required fields up front.
        if not request.repoUrl:
            raise HTTPException(status_code=400, detail="仓库URL不能为空")
        if not request.localPath:
            raise HTTPException(status_code=400, detail="本地路径不能为空")
        # Dedicated logger (and log file) per request.
        request_id = log_manager.create_request_logger()
        # Run the blocking git work off the event loop.
        thread = threading.Thread(
            target=run_git_command,
            args=(request_id, request.repoUrl, request.username,
                  request.password, request.branch, request.localPath)
        )
        thread.daemon = True
        thread.start()
        return {
            "request_id": request_id,
            "status": "started",
            "message": "任务已开始执行",
            "websocket_url": f"ws://localhost:8000/ws/logs/{request_id}"
        }
    except HTTPException:
        # BUG FIX: the broad handler below used to swallow the 400 errors
        # raised above and re-raise them as 500; let them propagate as-is.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务器错误: {str(e)}")
@app.websocket("/ws/logs/{request_id}")
async def websocket_logs_endpoint(websocket: WebSocket, request_id: str):
    """WebSocket endpoint that streams log lines for a single request."""
    await websocket_manager.connect(websocket, request_id)
    try:
        # Finished tasks get their log replayed from the file on disk.
        is_historical = request_id in log_manager.completed_requests
        if is_historical:
            await log_manager.stream_historical_logs(request_id)
        # Then stay on the socket and service client control messages.
        await websocket_manager.handle_client_messages(websocket, request_id)
    except WebSocketDisconnect:
        websocket_manager.disconnect(request_id)
    except Exception as exc:
        print(f"WebSocket 错误: {exc}")
        websocket_manager.disconnect(request_id)
@app.get("/api/status/{request_id}")
async def get_status(request_id: str):
    """Report the current status of a clone/update task."""
    status = log_manager.get_request_status(request_id)
    return status
@app.get("/api/logs/{request_id}/content")
async def get_log_content(request_id: str):
    """Return the complete log file content for a request."""
    content = log_manager.get_log_content(request_id)
    # The manager signals a missing file with this exact sentinel string.
    missing = content == "日志文件不存在"
    return {
        "request_id": request_id,
        "log_content": content,
        "has_log": not missing
    }
@app.get("/api/requests")
async def get_all_requests():
    """List status information for every known (active or historical) request."""
    return log_manager.get_all_requests()
@app.get("/api/health")
async def health_check():
    """Liveness probe: always reports the service as healthy."""
    payload = {"status": "healthy", "message": "服务正常运行"}
    return payload
@app.get("/")
async def root():
    """API landing route: identifies the service."""
    banner = {"message": "Git Repository Manager API"}
    return banner
# Dev entry point: serve the app with uvicorn when the file is run directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )
```
4. 完整的前端代码 (index.html)
```html
Git仓库管理器 - WebSocket版本
Git仓库管理器 - WebSocket版本
使用 WebSocket 实现实时日志监控
实时日志 未连接
```
主要改进内容
1. 完整的 WebSocket 实现:替代了原来的 SSE 方案
2. 双向通信:支持客户端向服务器发送控制命令
3. 心跳机制:保持连接活跃,检测连接状态
4. 更好的错误处理:详细的连接状态监控
5. 历史任务支持:自动识别并流式传输历史日志
6. 控制功能:支持暂停、恢复等控制命令
7. 连接状态显示:实时显示 WebSocket 连接状态
现在系统使用 WebSocket 实现了真正的双向实时通信,提供了更好的用户体验和更强大的功能。