# cache-sync-spring-boot-starter **Repository Path**: alan_lw/cache-sync-spring-boot-starter ## Basic Information - **Project Name**: cache-sync-spring-boot-starter - **Description**: No description available - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-04-13 - **Last Updated**: 2026-04-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 基于 Redis Streams 的多实例本地缓存同步工具 ## 1. 项目简介 本项目是一个基于 Redis Streams 的轻量级 Spring Boot Starter,用于实现多实例部署场景下的本地缓存同步。当某个实例更新或删除数据时,会通知其他实例清理本地缓存,以保证缓存一致性。 ### 核心特性 - **可靠广播**:每个实例都能收到缓存清理通知,且消息不丢失(至少一次)。 - **自动容错**:支持实例宕机后,未处理消息可被其他实例认领。 - **易于集成**:提供注解和 API,业务代码只需简单配置即可接入。 - **低侵入**:与现有本地缓存(如 Caffeine)解耦,仅提供清理事件机制。 - **监控指标**:暴露 Micrometer 指标,便于监控系统运行状态。 - **类型匹配**:支持基于 type 和 subType 的缓存清理处理器匹配机制,实现精细化缓存管理。 - **事务支持**:支持在事务提交后发送缓存清理消息,确保数据一致性。 - **延时消息**:支持发送延时缓存清理消息,适用于需要延迟清理缓存的场景。 - **告警扩展**:提供可插拔的告警处理器接口,支持自定义告警渠道(钉钉、企业微信、邮件等)。 ## 2. 技术实现 - **消息存储**:使用 Redis 5.0+ 引入的 Streams 数据结构。 - **广播机制**:每个实例创建独立的消费者组,确保每个实例都能消费所有消息。 - **故障恢复**:定期扫描 Pending 消息,认领超时消息。 - **Stream 清理**:通过配置限制 Stream 长度,避免内存溢出。 - **延时消息**:使用 Redis ZSET 存储延时消息,通过独立线程定期扫描到期消息并发布到 Stream。 ## 3. 快速开始 ### 3.1 依赖引入 在 Maven 项目的 `pom.xml` 文件中添加以下依赖: ```xml org.cache.sync cache-sync-starter 1.0.0 ``` ### 3.2 配置项 在 `application.yml` 或 `application.properties` 文件中添加以下配置: ```yaml cache: sync: enabled: true # 是否启用缓存同步组件 stream-key: "cache:sync:stream" # Redis Stream 的 key consumer-group-prefix: "cache-sync-group" # 消费者组前缀,实际名称为 prefix-instanceId instance-id: ${spring.application.name}-${random.value} # 实例唯一标识,默认随机生成 message-timeout-ms: 300000 # 消息超时时间(毫秒),超过后可由其他实例认领,默认 5 分钟 max-retry: 3 # 最大重试次数(认领次数),超过则丢弃 batch-size: 10 # 每次 XREADGROUP 读取的消息数量 block-ms: 5000 # 阻塞读取时间(毫秒) max-len: 10000 # Stream 最大保留消息数量(通过 ~ 近似裁剪) thread-pool-size: 1 # 消费线程数(通常 1 即可) enable-metrics: true # 是否暴露 Micrometer 指标 ``` ### 3.3 使用方式 #### 3.3.1 注解方式(推荐) 使用 `@LocalCacheEvict` 注解标记需要自动发布缓存清理消息的方法: ```java import org.cache.sync.annotation.LocalCacheEvict; @Service public class MyService { // 基本用法,使用默认的type和subType @LocalCacheEvict(cacheKey = "#id") // 当方法执行完后自动发布清理消息 public void updateData(String id) { // 更新数据库逻辑 } // 高级用法,指定type和subType @LocalCacheEvict(type = "user", subType = "profile", cacheKey = "#id") public void updateUserProfile(String id) { // 更新用户资料 } // 非事务模式 @LocalCacheEvict(type = "product", subType = "info", cacheKey = "#productId", afterTransaction = false) public void updateProductInfo(String productId) { // 更新产品信息 } } ``` #### 3.3.2 编程方式 直接注入 `CacheSyncPublisher` Bean 并调用其方法: ```java import org.cache.sync.core.CacheSyncPublisher; import java.util.HashMap; import java.util.Map; @Service public class UserService { @Autowired private CacheSyncPublisher cacheSyncPublisher; public void deleteUser(String userId) { // 删除数据库 userDao.delete(userId); // 构建元数据 Map metadata = new HashMap<>(); metadata.put("operation", "delete"); // 广播清理本地缓存 cacheSyncPublisher.publishCacheClean("user", "profile", "user:" + userId, metadata, true); } // 发送延时缓存清理消息(2秒后执行) public void updateUserStatus(String userId) { // 更新用户状态 userDao.updateStatus(userId, "active"); // 构建元数据 Map metadata = new HashMap<>(); metadata.put("operation", "update"); // 广播延时清理本地缓存 cacheSyncPublisher.publishCacheClean("user", "status", "user:status:" + userId, 2000, metadata, true); } } ``` #### 3.3.3 自定义清理处理器 实现 `CacheCleanHandler` 接口来自定义缓存清理逻辑: ```java import org.cache.sync.core.CacheCleanHandler; import java.util.Map; @Component public class UserCacheCleanHandler implements CacheCleanHandler { static final Cache localCache = Caffeine.newBuilder() .initialCapacity(4) .maximumSize(512) .expireAfterAccess(10, TimeUnit.MINUTES) .build(); // 只处理用户相关的缓存清理 @Override public String supportType() { return "user"; } @Override public String supportSubType() { return "profile"; } @Override public void cacheSync(String type, String subType, String cacheKey, Map metadata) { // 清理本地缓存的实现 // localCache.invalidateAll(); localCache.invalidate(cacheKey); System.out.println("清理缓存: " + cacheKey + ", 操作: " + metadata.get("operation")); } } ``` ## 4. 监控指标 本组件暴露以下 Micrometer 指标: - `cache.sync.messages.published`:发布消息总数(Counter) - `cache.sync.messages.consumed`:消费成功总数(Counter) - `cache.sync.messages.failed`:消费失败总数(Counter) - `cache.sync.pending.size`:当前消费者组 PEL 中的消息数量(Gauge) - `cache.sync.lag`:最近一次消费的 ID 与 Stream 最新 ID 的差距(Gauge) 这些指标可以通过 Spring Boot Actuator 暴露,便于监控系统运行状态。 ## 5. 故障恢复 当实例宕机后,未处理的消息会保存在 Redis Streams 的 PEL(Pending Entries List)中。其他实例会定期扫描 PEL,认领超时的消息并处理,确保消息不会丢失。 ## 6. 性能与可靠性 - **性能**:单实例处理能力 ≥ 1000 条/秒(消息体小,仅做缓存 key 清理)。 - **延迟**:发布延迟 P99 ≤ 10ms(网络正常,Redis 单机);广播时,所有实例消费延迟 P99 ≤ 50ms。 - **可靠性**:依赖 Redis 高可用(Sentinel / Cluster),组件本身支持自动重连和故障切换。 ## 7. 环境要求 - **Redis**:版本 ≥ 5.0(支持 Streams) - **Spring Boot**:3.x - **Java**:25+ ## 8. 常见问题 ### 8.1 Redis 连接失败 - 检查 Redis 服务是否正常运行 - 检查网络连接是否畅通 - 检查配置的 Redis 地址和端口是否正确 ### 8.2 消息未被消费 - 检查实例是否正常运行 - 检查消费者组是否正确创建 - 检查 Stream 中是否有消息 ### 8.3 内存占用过高 - 调整 `maxlen` 参数,限制 Stream 长度 - 定期清理 Stream 中的历史消息 ## 9. 测试验证 项目包含完整的单元测试,验证以下功能: - Redis 连接正常 - 消息发布功能正常 - 带元数据的消息发布功能正常 - 监控指标功能正常 - 延时消息功能正常 ## 10. 告警功能 本组件提供可扩展的告警机制,当发生异常时可通知到指定的告警渠道。 ### 10.1 内置告警类型 组件内置了以下告警类型: - **PUBLISH_FAILED**:消息发布失败(ERROR 级别) - **CONSUME_FAILED**:消息消费失败(ERROR 级别) - **MESSAGE_DISCARDED**:消息被丢弃,重试次数超限(CRITICAL 级别) - **DELAYED_MESSAGE_RETRY_EXHAUSTED**:延时消息重试耗尽(CRITICAL 级别) - **CONSUMER_GROUP_CREATE_FAILED**:消费者组创建失败(WARN 级别) - **PENDING_SCAN_ERROR**:Pending 消息扫描异常(ERROR 级别) - **MESSAGE_RESEND_FAILED**:消息重新发送失败(ERROR 级别) - **OFFLINE_CONSUMER_CLEANUP_ERROR**:离线消费者清理异常(WARN 级别) - **LAG_METRICS_UPDATE_FAILED**:Lag 指标更新失败(INFO 级别) ### 10.2 自定义告警处理器 实现 `CacheSyncAlertHandler` 接口来自定义告警逻辑: ```java import org.cache.sync.alert.CacheSyncAlertHandler; import org.cache.sync.alert.CacheSyncAlertEvent; import org.cache.sync.alert.AlertLevel; import org.springframework.stereotype.Component; @Component public class DingTalkAlertHandler implements CacheSyncAlertHandler { @Override public void handle(CacheSyncAlertEvent event) { // 只处理 ERROR 和 CRITICAL 级别的告警 if (event.getLevel() == AlertLevel.ERROR || event.getLevel() == AlertLevel.CRITICAL) { // 构建告警消息 String message = String.format( "【缓存同步告警】\n类型: %s\n级别: %s\n消息: %s\n实例: %s\n时间: %s", event.getType(), event.getLevel(), event.getMessage(), event.getInstanceId(), event.getTimestamp() ); // 发送到钉钉 sendToDingTalk(message); } } private void sendToDingTalk(String message) { // 实现钉钉 webhook 调用逻辑 // ... } } ``` ### 10.3 告警事件信息 `CacheSyncAlertEvent` 包含以下信息: - **type**:告警类型(AlertType 枚举) - **level**:告警级别(INFO/WARN/ERROR/CRITICAL) - **message**:告警消息描述 - **exception**:异常信息(可选) - **metadata**:附加元数据(Map 类型) - **timestamp**:告警发生时间 - **instanceId**:实例 ID ## 11. 项目结构 ``` cache-sync-spring-boot-starter/ ├── src/main/java/org/cache/sync/ │ ├── alert/ # 告警模块 │ │ ├── AlertLevel.java # 告警级别枚举 │ │ ├── AlertType.java # 告警类型枚举 │ │ ├── CacheSyncAlertEvent.java # 告警事件 │ │ ├── CacheSyncAlertHandler.java # 告警处理器接口 │ │ └── DefaultCacheSyncAlertHandler.java # 默认告警处理器 │ ├── annotation/ # 注解模块 │ │ └── LocalCacheEvict.java # 缓存清理注解 │ ├── aspect/ # AOP 切面 │ │ └── LocalCacheEvictAspect.java # 注解切面实现 │ ├── config/ # 配置模块 │ │ ├── CacheSyncAutoConfiguration.java # 自动配置类 │ │ ├── CacheSyncProperties.java # 配置属性 │ │ └── RedisKey.java # Redis Key 常量 │ ├── core/ # 核心模块 │ │ ├── CacheCleanHandler.java # 缓存清理处理器接口 │ │ ├── CacheSyncConsumer.java # 消息消费者 │ │ ├── CacheSyncPublisher.java # 消息发布者接口 │ │ ├── DefaultCacheCleanHandler.java # 默认清理处理器 │ │ ├── InternalMessage.java # 内部消息结构 │ │ └── RedisStreamCacheSyncPublisher.java # Redis Stream 发布者实现 │ ├── metrics/ # 监控指标 │ │ └── CacheSyncMetrics.java # Micrometer 指标 │ └── utils/ # 工具类 │ └── IpUtils.java # IP 工具类 └── src/test/ # 测试代码 ``` ## 12. 工作流程 ```mermaid graph TB A[业务代码更新数据] --> B{发布方式} B -->|注解方式| C[@LocalCacheEvict] B -->|编程方式| D[CacheSyncPublisher] C --> E[发布消息到 Redis Stream] D --> E E --> F[Redis Stream 存储消息] F --> G{各实例独立消费} G --> H[实例1: 读取消息] G --> I[实例2: 读取消息] G --> J[实例N: 读取消息] H --> K[匹配 CacheCleanHandler] I --> K J --> K K --> L[清理本地缓存] L --> M[XACK 确认消息] M --> N[完成] O[定期扫描] --> P{发现超时 Pending 消息?} P -->|是| Q[XCLAIM 认领消息] Q --> R[处理并确认] P -->|否| O S{发生异常?} -->|是| T[触发告警] T --> U[CacheSyncAlertHandler] U --> V[记录日志/发送通知] ``` ## 13. 常见问题 ## 14. 版本历史 - **1.0.0**:初始版本,实现基于 Redis Streams 的多实例本地缓存同步功能。