# cacheSync
**Repository Path**: alan_lw/cache-sync
## Basic Information
- **Project Name**: cacheSync
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2026-04-09
- **Last Updated**: 2026-04-09
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 基于 Redis Streams 的多实例本地缓存同步工具
## 1. 项目简介
本项目是一个基于 Redis Streams 的轻量级 Spring Boot Starter,用于实现多实例部署场景下的本地缓存同步。当某个实例更新或删除数据时,会通知其他实例清理本地缓存,以保证缓存一致性。
### 核心特性
- **可靠广播**:每个实例都能收到缓存清理通知,且消息不丢失(至少一次)。
- **自动容错**:支持实例宕机后,未处理消息可被其他实例认领。
- **易于集成**:提供注解和 API,业务代码只需简单配置即可接入。
- **低侵入**:与现有本地缓存(如 Caffeine)解耦,仅提供清理事件机制。
- **监控指标**:暴露 Micrometer 指标,便于监控系统运行状态。
- **类型匹配**:支持基于 type 和 subType 的缓存清理处理器匹配机制,实现精细化缓存管理。
- **事务支持**:支持在事务提交后发送缓存清理消息,确保数据一致性。
- **延时消息**:支持发送延时缓存清理消息,适用于需要延迟清理缓存的场景。
## 2. 技术实现
- **消息存储**:使用 Redis 5.0+ 引入的 Streams 数据结构。
- **广播机制**:每个实例创建独立的消费者组,确保每个实例都能消费所有消息。
- **故障恢复**:定期扫描 Pending 消息,认领超时消息。
- **Stream 清理**:通过配置限制 Stream 长度,避免内存溢出。
- **延时消息**:使用 Redis ZSET 存储延时消息,通过独立线程定期扫描到期消息并发布到 Stream。
## 3. 快速开始
### 3.1 依赖引入
在 Maven 项目的 `pom.xml` 文件中添加以下依赖:
```xml
org.cache.sync
cache-sync-starter
1.0.0
```
### 3.2 配置项
在 `application.yml` 或 `application.properties` 文件中添加以下配置:
```yaml
cache:
sync:
enabled: true # 是否启用缓存同步组件
stream-key: "cache:sync:stream" # Redis Stream 的 key
consumer-group-prefix: "cache-sync-group" # 消费者组前缀,实际名称为 prefix-instanceId
instance-id: ${spring.application.name}-${random.value} # 实例唯一标识,默认随机生成
message-timeout-ms: 300000 # 消息超时时间(毫秒),超过后可由其他实例认领,默认 5 分钟
max-retry: 3 # 最大重试次数(认领次数),超过则丢弃
batch-size: 10 # 每次 XREADGROUP 读取的消息数量
block-ms: 5000 # 阻塞读取时间(毫秒)
max-len: 100 # Stream 最大保留消息数量(通过 ~ 近似裁剪)
thread-pool-size: 1 # 消费线程数(通常 1 即可)
enable-metrics: true # 是否暴露 Micrometer 指标
```
### 3.3 使用方式
#### 3.3.1 注解方式(推荐)
使用 `@LocalCacheEvict` 注解标记需要自动发布缓存清理消息的方法:
```java
import org.cache.sync.annotation.LocalCacheEvict;
@Service
public class MyService {
// 基本用法,使用默认的type和subType
@LocalCacheEvict(cacheKey = "#id") // 当方法执行完后自动发布清理消息
public void updateData(String id) {
// 更新数据库逻辑
}
// 高级用法,指定type和subType
@LocalCacheEvict(type = "user", subType = "profile", cacheKey = "#id")
public void updateUserProfile(String id) {
// 更新用户资料
}
// 非事务模式
@LocalCacheEvict(type = "product", subType = "info", cacheKey = "#productId", afterTransaction = false)
public void updateProductInfo(String productId) {
// 更新产品信息
}
}
```
#### 3.3.2 编程方式
直接注入 `CacheSyncPublisher` Bean 并调用其方法:
```java
import org.cache.sync.core.CacheSyncPublisher;
import java.util.HashMap;
import java.util.Map;
@Service
public class UserService {
@Autowired
private CacheSyncPublisher cacheSyncPublisher;
public void deleteUser(String userId) {
// 删除数据库
userDao.delete(userId);
// 构建元数据
Map metadata = new HashMap<>();
metadata.put("operation", "delete");
// 广播清理本地缓存
cacheSyncPublisher.publishCacheClean("user", "profile", "user:" + userId, metadata, true);
}
// 发送延时缓存清理消息(2秒后执行)
public void updateUserStatus(String userId) {
// 更新用户状态
userDao.updateStatus(userId, "active");
// 构建元数据
Map metadata = new HashMap<>();
metadata.put("operation", "update");
// 广播延时清理本地缓存
cacheSyncPublisher.publishCacheClean("user", "status", "user:status:" + userId, 2000, metadata, true);
}
}
```
#### 3.3.3 自定义清理处理器
实现 `CacheCleanHandler` 接口来自定义缓存清理逻辑:
```java
import org.cache.sync.core.CacheCleanHandler;
import java.util.Map;
@Component
public class UserCacheCleanHandler implements CacheCleanHandler {
static final Cache localCache = Caffeine.newBuilder()
.initialCapacity(4)
.maximumSize(512)
.expireAfterAccess(10, TimeUnit.MINUTES)
.build();
// 只处理用户相关的缓存清理
@Override
public String supportType() {
return "user";
}
@Override
public String supportSubType() {
return "profile";
}
@Override
public void cacheSync(String type, String subType, String cacheKey, Map metadata) {
// 清理本地缓存的实现
// localCache.invalidateAll();
localCache.invalidate(cacheKey);
System.out.println("清理缓存: " + cacheKey + ", 操作: " + metadata.get("operation"));
}
}
```
## 4. 监控指标
本组件暴露以下 Micrometer 指标:
- `cache.sync.messages.published`:发布消息总数(Counter)
- `cache.sync.messages.consumed`:消费成功总数(Counter)
- `cache.sync.messages.failed`:消费失败总数(Counter)
- `cache.sync.pending.size`:当前消费者组 PEL 中的消息数量(Gauge)
- `cache.sync.lag`:最近一次消费的 ID 与 Stream 最新 ID 的差距(Gauge)
这些指标可以通过 Spring Boot Actuator 暴露,便于监控系统运行状态。
## 5. 故障恢复
当实例宕机后,未处理的消息会保存在 Redis Streams 的 PEL(Pending Entries List)中。其他实例会定期扫描 PEL,认领超时的消息并处理,确保消息不会丢失。
## 6. 性能与可靠性
- **性能**:单实例处理能力 ≥ 1000 条/秒(消息体小,仅做缓存 key 清理)。
- **延迟**:发布延迟 P99 ≤ 10ms(网络正常,Redis 单机);广播时,所有实例消费延迟 P99 ≤ 50ms。
- **可靠性**:依赖 Redis 高可用(Sentinel / Cluster),组件本身支持自动重连和故障切换。
## 7. 环境要求
- **Redis**:版本 ≥ 5.0(支持 Streams)
- **Spring Boot**:3.x
- **Java**:17+
## 8. 常见问题
### 8.1 Redis 连接失败
- 检查 Redis 服务是否正常运行
- 检查网络连接是否畅通
- 检查配置的 Redis 地址和端口是否正确
### 8.2 消息未被消费
- 检查实例是否正常运行
- 检查消费者组是否正确创建
- 检查 Stream 中是否有消息
### 8.3 内存占用过高
- 调整 `maxlen` 参数,限制 Stream 长度
- 定期清理 Stream 中的历史消息
## 9. 测试验证
项目包含完整的单元测试,验证以下功能:
- Redis 连接正常
- 消息发布功能正常
- 带元数据的消息发布功能正常
- 监控指标功能正常
- 延时消息功能正常
## 10. 版本历史
- **1.0.0**:初始版本,实现基于 Redis Streams 的多实例本地缓存同步功能。