# FlinkTransactionProcessor **Repository Path**: ywygoogle/flink-transaction-processor ## Basic Information - **Project Name**: FlinkTransactionProcessor - **Description**: 从 Kafka 实时消费交易流水数据,每条记录包含字段 source_id、user_id、bank_card、time、amount 和 ukey。 标签计算:需要实时计算用户标签值,例如转入转出比率(即用户的转入金额与转出金额的比值),并支持处理时间乱序的数据。 学习Demo - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-02-27 - **Last Updated**: 2025-03-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Flink交易处理器 基于Apache Flink的实时交易数据处理系统,用于计算金融交易标签。 ## 项目概述 本项目是一个基于Apache Flink的实时数据处理系统,主要用于处理金融交易数据,并根据特定规则计算交易标签。系统从Kafka读取交易数据,经过处理后将标签结果写入Kafka和Doris。 ### 主要功能 - 从Kafka读取交易数据和账户数据 - 基于clue_id和jykh的20分钟会话窗口,当窗口内没有新数据到达时触发计算 - 执行可扩展的标签计算逻辑 - 将标签结果写入Kafka - 更新已处理的记录到Doris,用于后续去重和历史数据查询 ### 已实现的标签 1. **夜间高频交易标签**:识别在夜间(00:00-07:00)进行的高频交易行为 2. **大额交易标签**:识别单笔金额大于等于5万的交易 ## 系统架构 系统采用模块化设计,主要包含以下组件: - **MainClass**:应用程序入口,负责初始化环境和启动处理流程 - **TagProcessorService**:标签处理服务,负责协调标签计算 - **TagProcessor接口**:标签处理器接口,所有标签计算逻辑都实现此接口 - **AsyncSqlUtil**:SQL执行工具类,提供异步SQL执行功能 - **TableDefinitions**:表定义类,包含所有表的创建SQL - **AppConfig**:应用配置类,提供所有配置参数 ## 标签处理框架 系统实现了一个可扩展的标签处理框架,可以方便地添加新的标签计算逻辑: 1. 实现TagProcessor接口 2. 在TagProcessorService中注册标签处理器 3. 系统会自动执行所有注册的标签处理器 ## 环境要求 - Java 8+ - Apache Flink 1.15+ - Apache Kafka 2.8+ - Apache Doris 1.2+ ## 配置说明 主要配置参数在AppConfig类中定义: - Kafka配置:服务器地址、主题名称等 - Doris配置:服务器地址、用户名、密码等 - 应用配置:检查点间隔、并行度等 ## 运行说明 1. 确保Kafka和Doris服务已启动 2. 编译项目:`mvn clean package` 3. 提交到Flink集群:`flink run -c com.meiya.flink.MainClass target/flink-transaction-processor-1.0.0.jar` ## 扩展标签处理器 要添加新的标签处理器,只需: 1. 创建一个实现TagProcessor接口的类 2. 实现getTagName()和generateTagSQL()方法 3. 在TagProcessorService.registerTagProcessors()方法中注册新的处理器 示例: ```java public class NewTagProcessor implements TagProcessor { @Override public String getTagName() { return "新标签名称"; } @Override public String generateTagSQL(boolean useProcessedUkeys) { // 构建标签计算SQL return "INSERT INTO tags_result ..."; } } ```