# dataflow-notifier **Repository Path**: ju-zhuohan/dataflow-notifier ## Basic Information - **Project Name**: dataflow-notifier - **Description**: 一个演示如何运用设计模式构建可扩展业务任务的示例项目。通过数据查询、Excel 导出与多渠道通知,展示模板方法、策略模式、责任链等设计思想,并支持补偿机制。 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-09-16 - **Last Updated**: 2025-10-05 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # DataPulse-Demo - 优雅的数据流处理框架 ## 项目简介 **业务场景:** 定时生成报表并发送邮件(关键业务流程) **数据流:** ``` 输入参数 → 查询List → 转换Excel → 获取收件人List → 发送邮件 ``` **设计目标:** 步骤解耦、可替换、易维护、流程可控 --- ## 版本演进 ### V1:直接实现 ✅(当前) **特点:** 功能完整,快速验证 ```java public class SimpleReportTask { public TaskResult execute(String reportType) { List data = queryData(reportType); // 步骤1 String filePath = generateExcel(data, reportType); // 步骤2 sendNotification(filePath, data, reportType); // 步骤3 return TaskResult.success(); } } ``` **优势:** - ✅ 简单直接,易理解 - ✅ 快速开发,验证业务 **局限:** - ❌ 步骤耦合,难以单独调整 - ❌ 流程固定,无法灵活扩展 - ❌ 不支持断点续跑 - ❌ 步骤无法复用替换 --- ### V2:Pipeline模式 ✅(已实现) **核心思想:** 数据在管道中流动,每个步骤独立处理 ``` PipelineContext(数据容器) ↓ ┌──────────────┐ │ Step 1: 查询 │ → context.orderData └──────────────┘ ↓ ┌──────────────┐ │ Step 2: Excel│ → context.filePath └──────────────┘ ↓ ┌──────────────┐ │ Step 3: 收件人│ → context.recipients └──────────────┘ ↓ ┌──────────────┐ │ Step 4: 发送 │ → context.result └──────────────┘ ``` **核心接口:** ```java // 步骤处理器 public interface StepHandler { String getName(); void handle(PipelineContext context); boolean shouldExecute(PipelineContext context); // 幂等性 } // 流程管道 public class ReportTaskPipeline { private List steps; public TaskResult execute(String reportType) { PipelineContext context = new PipelineContext(reportType); for (StepHandler step : steps) { if (step.shouldExecute(context)) { step.handle(context); } } return TaskResult.success(context); } } ``` **优势:** - ✅ **解耦** - 步骤独立,单一职责 - ✅ **可替换** - 实现接口即可替换步骤 - ✅ **灵活** - 可配置步骤顺序 - ✅ **可控** - 统一流程管理 - ✅ **幂等** - 支持断点续跑 - ✅ **可测试** - 每个步骤可单独测试 --- ### V3:轻量级可靠增强 🚀(已实现) **核心设计理念:** 专注于任务级别的可靠性,邮件发送通过外部服务(模拟Feign调用) **增强特性:** 1. **断点续跑** - 任务级状态持久化 ```java // 执行任务 TaskResult result = pipeline.execute("日报"); // 任务失败后,从断点恢复 TaskResult result = pipeline.recover(taskId); // 已完成的步骤会被跳过,从失败点继续 ``` 2. **外部服务调用** - 邮件服务解耦 ```java @Component public class EmailServiceClient { // 模拟Feign调用外部邮件服务 public EmailResult sendEmail(String email, String name, String subject, String content) { // 实际项目中:调用外部邮件服务API // 返回同步结果(成功/失败) } } ``` 3. **幂等性保证** - 避免重复处理 ```java // 步骤级幂等:已完成的步骤跳过 if (context.getOrderData() != null) { log.info("数据已查询,跳过"); return false; } // 邮件级幂等:已发送的不重复发送 if (context.getSuccessEmails().contains(email)) { log.info("邮件已发送给: {}, 跳过", email); continue; } ``` --- ## 设计对比 ### 传统实现(V1) ```java // 步骤耦合在一起 public void execute() { List data = queryData(); String file = generateExcel(data); // 失败 sendEmail(file); // 未执行,前面白做 } ``` **问题:** - 步骤2失败,步骤1白做(不可恢复) - 无法单独重新执行步骤3 - 扩展新步骤需要修改主流程 --- ### Pipeline实现(V2) ```java // 步骤独立,数据在Context中流动 public void execute() { PipelineContext ctx = new PipelineContext(); dataQueryStep.handle(ctx); // → ctx.data excelGeneratorStep.handle(ctx); // → ctx.filePath (失败,状态已保存) emailSenderStep.handle(ctx); // 可单独重新执行 } ``` **优势:** - ✅ 每步保存状态,可恢复 - ✅ 可单独执行任意步骤 - ✅ 新增步骤无需修改主流程 --- ## 核心设计模式 | 模式 | 应用 | 解决问题 | |------|------|---------| | **Pipeline** | 流程编排 | 统一管理,数据流转 | | **Strategy** | StepHandler接口 | 步骤可替换 | | **Chain of Responsibility** | 步骤链式执行 | 解耦步骤 | | **Decorator** | 步骤增强 | 动态添加重试/监控 | --- ## 项目结构 ``` src/main/java/com/demo/arch/datapulsedemo/ common/ # 通用组件(业务无关) ├── OrderData.java # 数据实体 ├── TaskResult.java # 结果封装 ├── MockOrderMapper.java # 数据访问 └── NotificationRecipientMapper.java v1_simple_direct/ # V1:直接实现 ├── task/ │ └── SimpleReportTask.java # 业务逻辑(步骤耦合) └── controller/ └── ReportController.java v2_pipeline/ # V2:Pipeline实现 ├── core/ │ ├── PipelineContext.java # 数据容器 │ ├── StepHandler.java # 步骤接口 │ └── ReportTaskPipeline.java # 管道 │ ├── steps/ # 步骤实现(可替换) │ ├── DataQueryStepHandler.java │ ├── ExcelGeneratorStepHandler.java │ ├── RecipientQueryStepHandler.java │ └── EmailSendStepHandler.java │ └── controller/ └── ReportControllerV2.java v3_reliable/ # V3:可靠性增强 ├── core/ │ ├── ContextStorage.java # 上下文存储 │ └── ReliablePipeline.java # 可靠Pipeline │ ├── outbox/ # Outbox模式 │ ├── EmailOutboxEntry.java # 邮件条目 │ ├── EmailOutboxRepository.java # 邮件仓储 │ ├── EmailSendService.java # 邮件服务 │ └── EmailOutboxProcessor.java # 后台处理器 │ ├── steps/ # 可靠步骤 │ └── ReliableEmailSendStepHandler.java │ ├── example/ # 示例 │ ├── EmailOutboxExample.java │ ├── RecoveryExample.java │ └── ExampleController.java │ └── controller/ └── ReliableTaskController.java ``` --- ## 快速开始 ### 1. 启动项目 ```bash # 编译 mvn clean compile # 启动 mvn spring-boot:run ``` ### 2. V1版本测试 **方式1:使用测试脚本** ```bash # Linux/Mac chmod +x test-v1.sh ./test-v1.sh # Windows test-v1.bat ``` **方式2:手动测试** ```bash # 健康检查 curl "http://localhost:8080/v1/health" # 生成日报 curl "http://localhost:8080/v1/report?type=日报" # 生成周报 curl "http://localhost:8080/v1/report?type=周报" # 生成月报 curl "http://localhost:8080/v1/report?type=月报" ``` ### 3. 查看日志 控制台会详细输出执行过程,包括: - 查询数据的样例 - Excel生成的步骤 - 通知接收人的匹配逻辑 - 邮件发送的结果 ### 4. V1详细说明 查看 `src/main/java/com/demo/arch/datapulsedemo/v1_simple_direct/README.md` ### 5. V2版本测试(Pipeline模式)✅ **方式1:使用测试脚本** ```bash # Linux/Mac chmod +x test-v2.sh ./test-v2.sh # Windows test-v2.bat ``` **方式2:手动测试** ```bash # 健康检查 curl "http://localhost:8080/v2/health" # 查询Pipeline步骤信息 curl "http://localhost:8080/v2/steps" # 生成日报 curl "http://localhost:8080/v2/report?type=日报" # 生成周报 curl "http://localhost:8080/v2/report?type=周报" # 生成月报 curl "http://localhost:8080/v2/report?type=月报" ``` ### 6. V3版本测试(可靠Pipeline)🚀 **方式1:使用测试脚本** ```bash # Linux/Mac chmod +x test-v3.sh ./test-v3.sh # Windows test-v3.bat ``` **方式2:手动测试** ```bash # 健康检查 curl "http://localhost:8080/v3/health" # 生成日报 curl "http://localhost:8080/v3/report?type=日报" # 查询任务状态(替换为实际任务ID) curl "http://localhost:8080/v3/task/status/TASK_V3_12345678" # 查询失败任务 curl "http://localhost:8080/v3/task/failed" # 恢复失败任务(替换为实际任务ID) curl -X POST "http://localhost:8080/v3/task/recover/TASK_V3_12345678" # 示例接口 curl "http://localhost:8080/v3/example" curl "http://localhost:8080/v3/example/create-failed-task" curl "http://localhost:8080/v3/example/list-failed-tasks" curl "http://localhost:8080/v3/example/test-pipeline" ``` ### 7. V1 vs V2 vs V3 对比 在控制台日志中,你可以清楚地看到三个版本的区别: **V1日志:** 步骤耦合在一起 ``` ========== V1报表任务开始 ========== --- 步骤1: 查询日报数据 --- --- 步骤2: 生成Excel文件 --- --- 步骤3: 发送邮件通知 --- ========== V1报表任务完成 ========== ``` **V2日志:** Pipeline清晰展示每个步骤 ``` ========== V2 Pipeline 任务开始 ========== 执行步骤: 数据查询 → Excel生成 → 收件人查询 → 邮件发送 --- 步骤1/4: 数据查询 --- >>> [数据查询] 开始执行 >>> [数据查询] 完成,查询到 20 条数据 --- 步骤1: 数据查询 执行成功,耗时: 105ms --- ... ========== V2 Pipeline 任务完成,耗时: 245ms ========== ``` **V3日志:** 可靠Pipeline支持断点续跑和幂等性 ``` ========== V3 ReliablePipeline 任务开始 ========== 任务ID: TASK_V3_A1B2C3D4 报表类型: 日报 当前步骤: 0 执行步骤: 数据查询 → Excel生成 → 收件人查询 → 可靠邮件发送 --- 步骤1/4: 数据查询 --- >>> [数据查询] 开始执行 >>> [数据查询] 完成,查询到 20 条数据 上下文已保存: /tmp/datapulse/contexts/TASK_V3_A1B2C3D4.json ... 邮件已发送给: user1@example.com, 跳过 邮件发送成功: user2@example.com 邮件发送部分失败(已加入重试队列):成功2个,失败1个 ... ========== V3 ReliablePipeline 任务完成,耗时: 356ms ========== ``` ### 8. V2详细说明 查看 `src/main/java/com/demo/arch/datapulsedemo/v2_pipeline/README.md` ### 9. V3详细说明 查看 `src/main/java/com/demo/arch/datapulsedemo/v3_reliable/README.md` --- ## 扩展示例 ### 新增步骤(数据聚合) ```java @Component public class DataAggregationStep implements StepHandler { public void handle(PipelineContext context) { List data = context.getOrderData(); Map aggregated = aggregate(data); context.put("aggregated", aggregated); } } ``` **配置启用:** ```yaml pipeline: 周报: - 查询 - 聚合 # 新增步骤 - Excel - 收件人 - 发送 ``` ### 替换步骤(使用EasyExcel) ```java @Component @Primary // 优先使用 public class EasyExcelGeneratorStep implements StepHandler { public String getName() { return "Excel生成"; } public void handle(PipelineContext context) { // 使用EasyExcel实现 } } ``` --- ## 技术栈 - **Java 17** - 现代Java特性 - **Spring Boot 3.0.2** - 轻量级框架 - **Lombok** - 简化代码 - **SLF4J** - 日志框架 **可选依赖:** - **Spring Data Redis** - 状态持久化(V3) - **Apache POI / EasyExcel** - Excel生成(真实实现) - **Spring Mail** - 邮件发送(真实实现) --- ## 设计价值 ### 1. 简洁优雅 - 核心代码 < 500行 - 概念清晰,易理解 - 符合设计模式 ### 2. 步骤解耦 - 单一职责 - 通过Context传递数据 - 易于单元测试 ### 3. 灵活可控 - 配置调整步骤 - 动态插入新步骤 - 统一流程管理 ### 4. 易于维护 - 代码结构清晰 - 职责明确 - 扩展不影响现有代码 --- ## 学习价值 本项目适合学习: - ✅ Pipeline模式实际应用 - ✅ 如何设计可扩展的数据流处理 - ✅ 步骤解耦和组合的最佳实践 - ✅ 从简单到复杂的渐进式架构演进 --- ## 后续计划 ### Phase 1: 实现V2核心 ✅ - [x] 实现PipelineContext - [x] 实现StepHandler接口 - [x] 实现4个步骤Handler - [x] 实现ReportTaskPipeline ### Phase 2: 实现V3可靠增强 ✅ - [x] 状态持久化与断点续跑 - [x] Outbox模式实现邮件可靠发送 - [x] 幂等性保证与自动重试 ### Phase 3: 真实实现(未来计划) - [ ] Apache POI生成真实Excel - [ ] Spring Mail发送真实邮件 - [ ] 集成测试 --- ## 设计原则 **Simple > Complex > Complicated** 先用最简单的方式解决问题(V1),再用优雅的设计模式重构(V2),最后按需添加企业级特性(V3)。 --- **核心理念:小而美的架构,解决真实问题**