# snackmq **Repository Path**: dbin0123/snackmq ## Basic Information - **Project Name**: snackmq - **Description**: SnackMQ是一个基于Java开发的轻量级消息队列系统,旨在提供高性能、高可用的分布式消息传递服务。系统采用模块化设计,支持事务消息、延迟消息、消费者组管理等核心功能,并通过Raft协议实现分布式一致性。 - **Primary Language**: Java - **License**: AGPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 1 - **Created**: 2025-06-09 - **Last Updated**: 2026-04-22 ## Categories & Tags **Categories**: Uncategorized **Tags**: Netty, Java ## README # SnackMQ - 高性能分布式消息队列 ## 项目简介 SnackMQ是一个基于Java开发的轻量级消息队列系统,旨在提供高性能、高可用的分布式消息传递服务。系统采用模块化设计,支持事务消息、延迟消息、消费者组管理等核心功能,并通过Raft协议实现分布式一致性。 ## 核心特性 - **高性能**:基于Netty和Java NIO实现的网络通信和存储机制 - **分布式架构**:使用JRaft实现Leader选举和日志复制 - **丰富的消息类型**:支持普通消息、事务消息、延迟消息 - **多交换机类型**:实现Fanout、Direct、Topic等多种路由策略 - **完善的监控**:集成Micrometer实现系统指标收集和暴露 - **高可用设计**:支持集群部署和故障自动转移 ## 快速开始 ### 环境要求 - JDK 8+ - Maven 3.6+ - 至少2GB内存 - 10GB可用磁盘空间 ### 构建项目 ```bash # 克隆仓库 git clone https://github.com/dbin0123/snackmq.git cd snackmq # 构建项目 mvn clean package -DskipTests ``` ### 启动Broker ```bash # 启动单节点Broker java -jar broker/target/broker-1.0.0.jar --server.port=9876 ``` ### 使用示例 #### 1. 生产者示例 ```java // 创建生产者 ProducerConfig producerConfig = ProducerConfig.builder().username("username").password("password").build(); Producer producer = new DefaultProducer("localhost:9876", producerConfig); // 启动生产者 producer.start(); // 准备消息 Message message = Message.builder() .topic(TPS_TEST_TOPIC) .key("key-" + messageId) .body(("TPS Test Body " + messageId).getBytes(StandardCharsets.UTF_8)) .build(); // 发送消息 SendResult result = producer.send(message); System.out.println("消息发送结果: " + result); // 关闭生产者 producer.shutdown(); ``` #### 2. 消费者示例 ```java // 2. 创建并启动消费者 ConsumerConfig consumerConfig = ConsumerConfig.builder().consumerGroup(TEST_CONSUMER_GROUP).username("username").password("password").build(); Consumer consumer = new DefaultPushConsumer("localhost:9876", consumerConfig); consumer.start(); System.out.println("消费者已启动"); // 启动消费者 consumer.subscribe(TPS_TEST_TOPIC, (MessageListener) message -> { System.out.println("收到消息: " + new String(message.getBody())); return ConsumeResult.SUCCESS; }); // 保持程序运行 Thread.sleep(Long.MAX_VALUE); ``` ## 技术架构 ### 模块划分 - **common**: 公共工具类、协议定义和常量 - **network**: 基于Netty的网络通信层 - **storage**: 消息存储和索引管理 - **cluster**: 基于JRaft的集群管理 - **client**: 生产者和消费者API - **management**: 系统监控和管理功能 - **broker**: 整合各模块的主服务入口 ## 许可证 本项目采用AGPL-3.0许可证。详情请参见LICENSE文件。