# 数据流水线系统 **Repository Path**: mgrass/dataPipelineSystem ## Basic Information - **Project Name**: 数据流水线系统 - **Description**: 物联网平台 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-08-05 - **Last Updated**: 2025-08-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # DPS - 数据流水线系统 ## 项目简介 DPS(数据流水线系统)是一个基于Spring Boot的微服务,专门用于处理遥测话单数据。系统支持将遥测话单数据同时投放到四种不同的存储方式: 1. **Redis缓存** - 提供快速访问 2. **MQTT消息队列** - 实现异步消息传递 3. **MongoDB数据存储** - 存储非结构化数据 4. **PostgreSQL数据库** - 存储结构化数据(按月分表) ## 技术栈 - **Spring Boot 2.7.18** - **Spring Boot 2.7.18** - **Redis** - 缓存 - **MQTT** - 消息队列 - **MongoDB** - NoSQL数据库 - **PostgreSQL** - 关系型数据库 ## 项目结构 ``` dps/ ├── src/main/java/com/grass/ │ ├── DpsApplication.java # 主启动类 │ ├── config/ # 配置类 │ │ ├── RedisConfig.java │ │ ├── MongoConfig.java │ │ ├── PostgresConfig.java │ │ └── MqttConfig.java │ ├── controller/ # 控制器 │ │ └── TelemetryController.java │ ├── service/ # 服务层 │ │ ├── TelemetryService.java │ │ ├── RedisService.java │ │ ├── MqttService.java │ │ ├── MongoService.java │ │ └── PostgresService.java │ ├── model/ # 数据模型 │ │ └── TelemetryRecord.java │ └── util/ # 工具类 │ └── TableNameUtil.java └── src/main/resources/ ├── application.yml # 应用配置 └── bootstrap.yml # 启动配置 ``` ## 功能特性 ### 1. 遥测话单处理 - 接收遥测话单数据 - 并行投放到四种存储方式 - 支持查询和删除操作 ### 2. 存储方式 - **Redis缓存**: 提供快速访问,缓存时间24小时 - **MQTT消息队列**: 异步消息传递,支持发布/订阅模式 - **MongoDB**: 存储完整的遥测话单数据 - **PostgreSQL**: 按月分表存储,表名格式:telemetry_records_YYYYMM ### 3. 数据模型 遥测话单包含以下字段: - 基本信息:通话ID、主叫号码、被叫号码、通话类型 - 时间信息:开始时间、结束时间、通话时长 - 网络信息:网络类型、基站ID、位置信息 - 质量指标:信号强度、通话质量、是否掉线 - 业务信息:业务类型、费用、状态 - 系统信息:创建时间、设备ID、版本号 ## 快速开始 ### 1. 环境要求 - JDK 8+ - Maven 3.6+ - Redis 6.0+ - MongoDB 4.4+ - PostgreSQL 12+ - MQTT Broker (如Mosquitto) ### 2. 配置修改 修改 `application.yml` 中的数据库连接信息: ```yaml spring: redis: host: localhost port: 6379 data: mongodb: host: localhost port: 27017 database: dps_telemetry datasource: url: jdbc:postgresql://localhost:5432/dps_telemetry username: postgres password: password mqtt: broker: tcp://localhost:1883 username: admin password: admin # 配置示例 ``` ### 3. 启动服务 #### 方式一:使用独立配置(推荐,完全不依赖Nacos) ```bash # Windows standalone-start.bat # 或者直接运行 mvn spring-boot:run -Dspring-boot.run.profiles=standalone -Dspring-boot.run.main-class=com.grass.DpsSimpleApplication ``` #### 方式二:使用简单配置(不依赖Nacos) ```bash # Windows start.bat # 或者直接运行 mvn spring-boot:run -Dspring-boot.run.profiles=simple -Dspring-boot.run.main-class=com.grass.DpsSimpleApplication ``` #### 方式三:使用本地配置(不依赖Nacos) ```bash mvn spring-boot:run -Dspring-boot.run.profiles=local ``` #### 方式四:使用默认配置 ```bash # Windows simple-start.bat # 或者直接运行 mvn spring-boot:run ``` ### 4. API接口 #### 提交遥测话单 ```bash POST /api/telemetry/record Content-Type: application/json { "callId": "CALL_001", "sourceNumber": "13800138000", "targetNumber": "13800138001", "callType": "VOICE", "startTime": "2025-01-15T10:30:00", "endTime": "2025-01-15T10:32:30", "duration": 150, "networkType": "4G", "cellId": "CELL_001", "location": "北京市朝阳区", "signalStrength": -65.5, "callQuality": 95.0, "isDropped": false, "serviceType": "VOICE_CALL", "cost": 0.5, "status": "COMPLETED", "deviceId": "DEVICE_001", "version": "1.0.0" } ``` #### 查询遥测话单 ```bash GET /api/telemetry/record/{callId} ``` #### 删除遥测话单 ```bash DELETE /api/telemetry/record/{callId} ``` #### 健康检查 ```bash GET /api/telemetry/health ``` #### 获取统计信息 ```bash GET /api/telemetry/stats ``` ## 部署说明 ### Docker部署 ```bash # 构建镜像 docker build -t dps:1.0.0 . # 运行容器 docker run -d -p 8080:8080 --name dps dps:1.0.0 ``` ### 集群部署 1. 配置Nacos集群 2. 配置Redis集群 3. 配置MongoDB副本集 4. 配置PostgreSQL主从 5. 使用负载均衡器分发请求 ## 监控与日志 - 日志级别:DEBUG - 日志格式:时间戳 [线程] 级别 类名 - 消息 - 监控指标:可通过Spring Boot Actuator扩展 ## 注意事项 1. 确保所有依赖服务(Redis、MongoDB、PostgreSQL、MQTT)正常运行 2. PostgreSQL表按月自动创建,首次访问某月数据时会自动建表 3. Redis缓存过期时间为24小时 4. MQTT消息采用QoS 1级别,确保消息可靠传递 ## 许可证 MIT License