# bulk_data_processor **Repository Path**: taikun928/bulk_data_processor ## Basic Information - **Project Name**: bulk_data_processor - **Description**: 大批量数据处理 - **Primary Language**: Python - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-01-08 - **Last Updated**: 2026-01-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # bulk_data_processor ## 项目简介(草稿项目,暂时废弃) `bulk_data_processor` 使用 Celery + Redis 异步处理超大规模的 `ct_export.csv`,按券号与交易日期拆分并输出标准化的报价 CSV。核心思路是流式读取(避免一次性载入数百 GB 的文件),仅保留指定券(默认 `09250412IB` 与 `160408IB`),再按 `securityID_yyyymmdd.csv` 写入 `data/output`。 ## 目录结构 ``` data/ input/ct_export.csv # 原始行情导出(需自行放置) output/ # Celery 任务写入的结果 src/bulk_data_processor/ config.py # 配置(支持环境变量) processing.py # 流式处理与落盘逻辑 tasks.py # Celery 任务定义 celery_app.py # Celery 应用初始化 redis_client.py # Redis 连接封装(注入账号/密码) cli.py # Typer CLI,提交 Celery 任务 ``` ## 环境准备 1. **Python**:建议 3.11+。 2. **Redis**:Celery 使用 Redis 作为 broker & result backend,可使用本地安装或 Docker。 3. **安装依赖** ```bash python -m venv .venv .\.venv\Scripts\activate # Windows pip install -r requirements.txt ``` ## 配置说明 所有配置都可通过环境变量覆盖,前缀为 `BULK_`(与 `config.py` 定义一致): | 环境变量 | 说明 | 默认值 | | --- | --- | --- | | `BULK_BROKER_URL` | Celery broker(Redis) | `redis://localhost:6379/0` | | `BULK_RESULT_BACKEND` | 结果存储 | `redis://localhost:6379/1` | | `BULK_REDIS_USERNAME` | Redis 用户(可选,用于 ACL) | 空 | | `BULK_REDIS_PASSWORD` | Redis 密码(可选,建议通过 `.env` 或环境变量注入) | 空 | | `BULK_TASK_DEFAULT_QUEUE` | 任务队列 | `bulk.default` | | `BULK_DATA_INPUT_DIR` | 输入目录 | `data/input` | | `BULK_DATA_OUTPUT_DIR` | 输出目录 | `data/output` | | `BULK_CT_EXPORT_FILENAME` | 输入文件名 | `ct_export.csv` | | `BULK_DEFAULT_SECURITY_IDS` | 默认券号(逗号分隔) | `09250412IB,160408IB` | | `BULK_WRITE_THRESHOLD` | 缓冲写入阈值 | `2000` | | `BULK_MAX_ROWS` | 输入行数上限(调试用,空值读取全量) | `50000` | > Windows CMD/PowerShell 示例 > `set BULK_MAX_ROWS=` 读取全部数据;`set BULK_DEFAULT_SECURITY_IDS=09250412IB,160408IB,2500006IB` 扩展券列表。 ## 运行流程 ### 1. 启动 Redis ```bash docker run --rm -p 6379:6379 redis:7 ``` ### 2. 启动 Celery Worker ```bash celery -A bulk_data_processor.celery_app worker --loglevel=info ``` ### 3. 提交任务(Typer CLI) ```bash python -m bulk_data_processor.cli enqueue --wait # 或自定义券与读取行数 python -m bulk_data_processor.cli enqueue -s 09250412IB -s 160408IB --max-rows 100000 --wait ``` - 默认处理 `config.py` 中设置的券。 - 可多次添加 `--security/-s` 参数来覆盖券列表。 - `--wait` 会阻塞并输出 JSON 结果(读取行数、匹配记录、写入文件列表)。 ### 4. 查看输出 任务完成后,`data/output` 下会产生若干 `securityID_yyyymmdd.csv` 文件。再次运行同一任务会以追加方式写入,便于逐日积累。 此外,`tasks.py` 会在结束时把 JSON 结果同步写入 Redis,键名格式为 `bulk:tasks:`,便于外部系统再读取: ```python from redis import Redis r = Redis(host="47.115.77.75", port=6379, password="123456Wdj", decode_responses=True) result = r.get("bulk:tasks:") ``` Redis 连接参数与 Celery 配置保持一致(可以通过 `BULK_REDIS_USERNAME/BULK_REDIS_PASSWORD` 重写),无需额外维护两套配置。 ## 常见问题 1. **提示未找到输入文件**:确认 `data/input/ct_export.csv` 存在,或者设置 `BULK_DATA_INPUT_DIR` 与 `BULK_CT_EXPORT_FILENAME`。 2. **只想调试少量数据**:保留默认 `BULK_MAX_ROWS=50000`,或在 CLI 中直接传 `--max-rows 100000`。 3. **需要读全量**:清空或删除 `BULK_MAX_ROWS`,注意处理时长和磁盘空间。 4. **字符显示异常**:输出文件统一为 UTF-8,可在 Excel 中通过“数据-自文本”并指定 UTF-8 导入。 根据需要可以新增券号、调整并发 worker,或在 `tasks.py` 中拓展更多 Celery 任务以复用同一套流式处理框架。