# 量化数据网关 **Repository Path**: teeoo/quant-data-gateway ## Basic Information - **Project Name**: 量化数据网关 - **Description**: 化数据网关 - **Primary Language**: Unknown - **License**: MIT - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-03-27 - **Last Updated**: 2026-03-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # DataServer - A股数据采集服务 [![Python](https://img.shields.io/badge/Python-3.8+-blue.svg)](https://www.python.org/) [![Flask](https://img.shields.io/badge/Flask-2.0+-green.svg)](https://flask.palletsprojects.com/) [![License](https://img.shields.io/badge/License-MIT-yellow.svg)](LICENSE) 一个高性能、可扩展的 A 股市场数据采集系统,通过 QMT 数据源采集行情数据,并提供 RESTful API 接口。 ## 核心特性 ### 🚀 高性能采集 - **多线程并行处理** - 支持配置并行线程数,大幅提升采集效率 - **连接池管理** - 数据库连接池复用,减少连接开销 - **批量写入优化** - 支持批量数据写入,提高吞吐量 - **实时采集优化** - 批量获取行情 + 并行保存,性能提升 50-100 倍 ### 🔄 灵活的数据路由 - **多数据库支持** - 支持 MySQL、ClickHouse、Doris - **可配置路由策略** - 元数据存 MySQL,行情数据存 OLAP 数据库 - **双模式写入** - 支持直接写入和 HTTP 代理写入 ### 🛡️ 可靠性保障 - **无限重试机制** - 采集失败自动重试,确保数据完整性 - **断点续传** - 支持从指定位置继续采集,避免重复工作 - **进度持久化** - 采集进度实时保存,支持中断恢复 - **数据质量监控** - 专用日志记录数据缺失问题 ### 📊 丰富的数据类型 - 日K线数据(开高低收量) - 分钟级K线数据(1/5/15/30/60分钟) - 财务数据(EPS、ROE、现金流等) - 实时行情数据(含买卖盘口) ### 🔌 完善的 API - RESTful API 接口 - 支持多种股票代码格式 - 自动从数据库获取全量股票列表 - 健康检查和状态监控 --- ## 系统架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ DataServer 服务 │ │ (Flask, Port 8766) │ ├─────────────────────────────────────────────────────────────┤ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ API Layer │ │ Scheduler │ │ CLI Tools │ │ │ │ (routes.py) │ │ (realtime) │ │ (init_*.py) │ │ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ │ ┌──────┴─────────────────┴─────────────────┴──────┐ │ │ │ Core Collection Layer │ │ │ │ ┌─────────────┐ ┌──────────────┐ │ │ │ │ │QMTCollector │ │ │ │ │ │ │ │(xtquant) │ │ │ │ │ │ │ └─────────────┘ └──────────────┘ │ │ │ └────────────────────────┬────────────────────────┘ │ │ │ │ │ ┌────────────────────────┴────────────────────────┐ │ │ │ Data Writer Layer │ │ │ │ ┌─────────────┐ ┌──────────────┐ │ │ │ │ │ DataWriter │ │ JavaClient │ │ │ │ │ │ (Direct) │ │ (HTTP Mode) │ │ │ │ │ └─────────────┘ └──────────────┘ │ │ │ └────────────────────────┬────────────────────────┘ │ └───────────────────────────┼─────────────────────────────────┘ │ ┌─────────────────┼─────────────────┐ ▼ ▼ ▼ ┌───────────┐ ┌─────────────┐ ┌───────────┐ │ MySQL │ │ ClickHouse │ │ Doris │ │ 元数据 │ │ 行情数据 │ │ (可选) │ └───────────┘ └─────────────┘ └───────────┘ ``` --- ## 目录结构 ``` dataServer/ ├── api/ # API 模块 │ ├── __init__.py │ └── routes.py # API 路由定义 ├── config/ # 配置模块 │ ├── __init__.py │ └── settings.py # 配置加载(支持环境变量覆盖) ├── core/ # 核心采集模块 │ ├── __init__.py │ ├── base_collector.py # 采集器基类 │ └── qmt_collector.py # QMT 采集器实现 ├── db/ # 数据库模块 │ ├── __init__.py │ ├── base_connector.py # 连接器基类 │ ├── connection_pool.py # 连接池管理器 │ ├── mysql_connector.py # MySQL 连接器 │ ├── clickhouse_connector.py # ClickHouse 连接器 │ └── doris_connector.py # Doris 连接器 ├── scheduler/ # 调度模块 │ ├── __init__.py │ └── realtime_worker.py # 实时采集 Worker(批量优化版) ├── services/ # 服务模块 │ ├── __init__.py │ ├── data_writer.py # 数据写入服务(单例,含数据质量监控) │ ├── java_client.py # Java 后端 HTTP 客户端 │ └── stock_list_service.py # 股票列表服务 ├── utils/ # 工具模块 │ ├── __init__.py │ ├── code_converter.py # 股票代码转换 │ └── data_helper.py # 数据格式化工具 ├── sql/ # SQL 脚本 │ ├── mysql.sql # MySQL 建表脚本 │ ├── clickhouse.sql # ClickHouse 建表脚本 │ ├── doris.sql # Doris 建表脚本 │ └── fix_tick_data_nullable.sql # 修复 tick_data 表允许 NULL ├── logs/ # 日志目录 │ ├── data-collector-*.log # 主日志 │ └── data_quality.log # 数据质量日志 ├── main.py # 服务入口 ├── init_history_data.py # 历史数据初始化脚本(CLI) ├── config.yml # 配置文件 ├── config.example.yml # 配置文件模板 ├── requirements.txt # Python 依赖 ├── start.sh # Linux/Mac 启动脚本 ├── start.ps1 # Windows 启动脚本 ├── API_DOC.md # API 详细文档 ├── CODE_REVIEW.md # 代码审查记录 └── README.md # 本文档 ``` --- ## 快速开始 ### 1. 环境要求 - Python 3.8+ - MySQL 5.7+ / 8.0+ - ClickHouse 20.0+ (或 Doris 1.0+) - miniQMT 客户端(使用 QMT 数据源时) ### 2. 安装 ```bash # 克隆项目 git clone cd dataServer # 创建虚拟环境(推荐) python -m venv venv source venv/bin/activate # Linux/Mac # venv\Scripts\activate # Windows # 安装依赖 pip install -r requirements.txt ``` ### 3. 配置 ```bash # 复制配置模板 cp config.example.yml config.yml # 编辑配置文件 vim config.yml # 或使用其他编辑器 ``` **核心配置项:** ```yaml # 存储模式配置 storage: mode: direct # direct=直接写入数据库, http=通过Java后端写入 routing: metadata: mysql # 元数据存储 olap: clickhouse # OLAP数据存储 (clickhouse/doris/none) # MySQL 配置 mysql: enabled: true host: 127.0.0.1 port: 3306 database: stock_data user: stocker password: "your_mysql_password" # ClickHouse 配置 clickhouse: enabled: true host: 127.0.0.1 port: 8123 native_port: 9000 database: stock_data user: admin password: "your_clickhouse_password" # 初始化配置(多线程并行) init: threads: 2 # 并行线程数 retry_interval: 5 # 重试间隔(秒) batch_save_size: 50 # 批量保存进度大小 # 实时采集配置(批量优化版) realtime: enabled: true collect_interval: 3 # 采集间隔(秒) save_threads: 4 # 并行保存线程数 batch_save_size: 100 # 批量保存大小 ``` ### 4. 初始化数据库 ```bash # 创建 MySQL 数据库和表 mysql -u root -p < sql/mysql.sql # 创建 ClickHouse 数据库和表 clickhouse-client --host < sql/clickhouse.sql # 如果 tick_data 表需要支持 NULL 值(买卖盘口数据),执行修复脚本 clickhouse-client --host < sql/fix_tick_data_nullable.sql ``` ### 5. 启动服务 ```bash # Linux/Mac ./start.sh # Windows .\start.ps1 # 或直接运行 python main.py ``` 服务默认监听 `http://0.0.0.0:8766` --- ## 使用指南 ### API 接口 服务启动后,可以通过 RESTful API 进行数据采集: #### 健康检查 ```bash GET /health ``` #### 采集日K线数据 ```bash POST /api/v1/collector/history/kline Content-Type: application/json { "start_date": "20240101", "end_date": "20241231", "symbols": ["600519.SH", "000001.SZ"], "auto_save": true } ``` #### 采集分钟数据 ```bash POST /api/v1/collector/history/minute Content-Type: application/json { "start_date": "20240101", "end_date": "20240131", "period": "1m", "codes": ["600519"], "auto_save": true } ``` #### 断点续传(从指定股票之后继续) ```bash POST /api/v1/collector/history/minute Content-Type: application/json { "start_date": "20240101", "end_date": "20260630", "codes": ["301312"], "auto_save": true, "continue_from_last": true } ``` #### 实时采集控制 ```bash # 启动实时采集 POST /api/v1/collector/realtime/start # 停止实时采集 POST /api/v1/collector/realtime/stop # 获取状态 GET /api/v1/collector/realtime/status ``` > 完整 API 文档请访问 `http://localhost:8766/doc` 或参考 [API_DOC.md](./API_DOC.md) ### 命令行工具 #### 历史数据初始化脚本 ```bash # 查看帮助 python init_history_data.py --help # 初始化所有数据类型 python init_history_data.py --all --start 20240101 --end 20260630 # 只初始化分钟数据 python init_history_data.py --minute --start 20240101 --end 20260630 # 从指定股票继续(断点续传) python init_history_data.py --minute --from 600519.SH --start 20240101 # 只重试失败的股票 python init_history_data.py --kline --retry-failed # 查看进度 python init_history_data.py --progress --show-failed # 清除进度 python init_history_data.py --clear ``` **命令行参数说明:** | 参数 | 说明 | |------|------| | `--all` | 初始化所有数据类型 | | `--kline` | 只初始化日K线 | | `--minute` | 只初始化分钟数据 | | `--financial` | 只初始化财务数据 | | `--start` | 起始日期 (YYYYMMDD) | | `--end` | 结束日期 (YYYYMMDD) | | `--from` | 从指定股票代码开始 | | `--retry-failed` | 只处理失败的股票 | | `--stocks` | 指定股票列表(逗号分隔) | | `--progress` | 显示当前进度 | | `--clear` | 清除进度记录 | --- ## 实时采集(批量优化版) ### 架构优化 实时采集模块采用批量优化架构: ``` ┌─────────────────────────────────────────────────────────────┐ │ RealtimeWorker │ ├─────────────────────────────────────────────────────────────┤ │ ┌───────────────────────────────────────────────────────┐ │ │ │ 批量获取行情(核心优化) │ │ │ │ xtdata.get_full_tick([所有股票]) → 一次调用 │ │ │ └───────────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ 多线程并行保存 │ │ │ │ ThreadPoolExecutor (4线程) → 并行写入数据库 │ │ │ └───────────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌───────────────────────────────────────────────────────┐ │ │ │ 直接写入数据库 │ │ │ │ DataWriter → ClickHouse/Doris(不经过Java后端) │ │ │ └───────────────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` ### 性能对比 | 指标 | 旧版(逐个获取) | 批量优化版 | 提升 | |------|------------------|------------|------| | 获取 5000 只股票行情 | 5000 次 API 调用 | **1 次** API 调用 | **5000x** | | 保存数据 | 串行逐个保存 | **4 线程**并行保存 | **4x** | | 数据写入路径 | Python → HTTP → Java → DB | **Python → DB(直连)** | 更低延迟 | | 整体性能 | 基准 | **50-100 倍提升** | **50-100x** | ### 配置说明 ```yaml realtime: enabled: true # 启用实时采集 collect_interval: 3 # 采集间隔(秒) save_threads: 4 # 并行保存线程数 batch_save_size: 100 # 批量保存大小 trade_time: morning_start: "09:30:00" morning_end: "11:30:00" afternoon_start: "13:00:00" afternoon_end: "15:00:00" ``` ### 运行日志示例 ``` [实时采集] 采集线程已启动(批量优化版) - 监控股票数: 5000 只 - 采集间隔: 3 秒 - 保存线程: 4 个 ============================================================ [实时采集] 本轮采集完成 - 采集股票: 5000 只 - 获取成功: 5000 条 - 保存成功: 5000 条 - 耗时: 1.25s (平均: 1.30s) ============================================================ ``` --- ## 数据质量监控 ### 质量日志 系统自动监控数据质量,缺失字段会记录到专用日志文件: **日志文件**: `logs/data_quality.log` **日志格式**: ```json { "symbol": "600519.SH", "data_type": "tick", "issues": { "bid_price1": 5, "ask_price1": 3, "total_volume": 2 }, "sample": { "tradeTime": "2024-03-23 10:30:00", "price": 1850.50, "bidPrice1": null, "askPrice1": 1851.00 }, "timestamp": "2024-03-23 10:30:05" } ``` ### ClickHouse 表结构优化 `tick_data` 表已支持 NULL 值,允许买卖盘口数据为空: ```sql -- 买卖盘口字段支持 NULL bid_price1 Nullable(Decimal(10, 3)), bid_volume1 Nullable(UInt64), ask_price1 Nullable(Decimal(10, 3)), ask_volume1 Nullable(UInt64), ``` --- ## 数据表结构 ### MySQL (t_stock_info) ```sql CREATE TABLE t_stock_info ( id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, code VARCHAR(10) NOT NULL COMMENT '股票代码', name VARCHAR(50) NOT NULL COMMENT '股票名称', market VARCHAR(10) NOT NULL COMMENT '市场: sh/sz/bj', industry VARCHAR(50) COMMENT '行业', list_date DATE COMMENT '上市日期', status TINYINT DEFAULT 1 COMMENT '状态: 1=正常, 0=退市', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, UNIQUE KEY uk_code (code), KEY idx_market (market), KEY idx_status (status) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='股票基本信息表'; ``` ### ClickHouse (kline_daily) ```sql CREATE TABLE kline_daily ( symbol String COMMENT '股票代码', trade_date Date COMMENT '交易日期', open Decimal(10, 3) COMMENT '开盘价', high Decimal(10, 3) COMMENT '最高价', low Decimal(10, 3) COMMENT '最低价', close Decimal(10, 3) COMMENT '收盘价', volume UInt64 COMMENT '成交量(手)', amount Decimal(18, 2) COMMENT '成交额(元)', turnover_rate Decimal(10, 4) COMMENT '换手率(%)', change_percent Decimal(10, 4) COMMENT '涨跌幅(%)', created_at DateTime DEFAULT now() COMMENT '创建时间' ) ENGINE = ReplacingMergeTree() ORDER BY (symbol, trade_date) COMMENT '日K线数据表'; ``` > 完整建表脚本见 `sql/` 目录 --- ## 股票代码格式 系统支持多种股票代码格式: | 格式 | 示例 | 说明 | |------|------|------| | 完整格式 | `600519.SH` | 推荐,明确指定市场 | | 简码 + market | `codes: ["600519"], market: "SH"` | 需配合 market 参数 | | 自动推断 | `codes: ["600519"]` | 根据代码规则自动判断 | **市场代码规则:** - `6xxxxx` → SH(上海主板) - `00xxxx` → SZ(深圳主板) - `30xxxx` → SZ(创业板) - `68xxxx` → SH(科创板) - `4xxxxx/8xxxxx` → BJ(北京证券交易所) --- ## 性能特点 ### 多线程并行采集 ```yaml init: threads: 2 # 并行线程数,建议根据 CPU 核心数调整 ``` - 每个线程独立处理分配到的股票 - 遇到异常时无限重试,不跳过任何股票 - 线程安全的进度保存机制 ### 批量写入优化 ```yaml clickhouse: batch: size: 10000 # 批量大小 flush_interval: 5 # 刷新间隔(秒) ``` ### 连接池管理 ```yaml mysql: pool: max_connections: 10 min_cached: 2 max_cached: 5 ``` --- ## PowerShell 使用示例 ```powershell # 采集单只股票分钟数据 $body = @{ start_date = "20240101" end_date = "20260630" codes = @("301312") period = "1m" auto_save = $true } | ConvertTo-Json Invoke-RestMethod -Method POST -Uri "http://localhost:8766/api/v1/collector/history/minute" -ContentType "application/json" -Body $body # 断点续传(从指定股票之后继续) $body = @{ start_date = "20240101" end_date = "20260630" codes = @("301312") auto_save = $true continue_from_last = $true } | ConvertTo-Json Invoke-RestMethod -Method POST -Uri "http://localhost:8766/api/v1/collector/history/minute" -ContentType "application/json" -Body $body ``` --- ## 注意事项 1. **QMT 数据源** - 使用前需启动 miniQMT 客户端并登录 2. **MySQL 股票列表** - 确保 `t_stock_info` 表有完整的股票列表数据 3. **实时采集时间** - 只在交易时间段执行(09:30-11:30, 13:00-15:00) 4. **内存控制** - 分钟数据量较大,建议分批次采集 5. **断点续传** - 进度保存在 `init_progress.json` 文件中 6. **数据质量** - 查看 `logs/data_quality.log` 了解数据缺失情况 --- ## ⚠️ 安全须知 1. **配置文件安全** - `config.yml` 包含数据库密码,**请勿提交到版本控制** - 已在 `.gitignore` 中排除此文件 2. **使用环境变量(推荐)** 可以通过环境变量覆盖敏感配置: ```bash export MYSQL_HOST="your_host" export MYSQL_PASSWORD="your_password" export CLICKHOUSE_PASSWORD="your_password" ``` 3. **首次使用** ```bash cp config.example.yml config.yml # 编辑 config.yml 填入真实配置 ``` --- ## 故障排除 ### QMT 连接失败 ```bash # 检查 miniQMT 客户端是否运行 # 确保已登录账户 # 查看日志 tail -f logs/data-collector-*.log ``` ### ClickHouse 连接失败 ```bash # 测试连接 clickhouse-client --host --query "SELECT 1" # 检查容器状态 docker ps | grep clickhouse docker logs clickhouse-container ``` ### MySQL 连接池耗尽 ```yaml # 调整连接池配置 mysql: pool: max_connections: 20 # 增加最大连接数 ``` ### 数据缺失问题 ```bash # 查看数据质量日志 tail -f logs/data_quality.log # 检查 ClickHouse 表结构是否支持 NULL clickhouse-client --query "DESCRIBE tick_data" ``` --- ## 相关文档 - [API 详细文档](./API_DOC.md) - [初始化指南](./INIT_GUIDE.md) - [代码审查记录](./CODE_REVIEW.md) - [Windows 部署指南](./README_WINDOWS.md) --- ## License MIT License