# MessageSampler **Repository Path**: eatfashdaddy/message-sampler ## Basic Information - **Project Name**: MessageSampler - **Description**: MessageSampler是一个用于连接和测试多种消息队列(MQ)的Java工具,支持发送和接收消息的基本操作。同时也是一个JMeter插件,可以在JMeter测试计划中使用。 - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 2 - **Forks**: 1 - **Created**: 2025-06-12 - **Last Updated**: 2026-03-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # MessageSampler - MQ消息采样工具 (v1.0.0) ## 项目概述 MessageSampler是一个用于连接和测试多种消息队列(MQ)的Java工具,支持发送和接收消息的基本操作。同时也是一个JMeter插件,可以在JMeter测试计划中使用。 **兼容性**:JMeter 5.6.2及以上版本 **支持的MQ版本**: - Apache Kafka 3.0.0 - IBM MQ 9.2.4.0 - ActiveMQ 5.16.5 - RabbitMQ 5.14.2 - RocketMQ 4.9.4 ## 支持的MQ类型 - ActiveMQ - RabbitMQ - RocketMQ - IBM MQ - Apache Kafka ## 项目架构 ### 核心类图 ```mermaid classDiagram class MQSampler { +testStarted() +sample() +testEnded() } class MQClientFactory { +createClient(type: String): MQClientInterface } class MQClientInterface { <> +connect(config: String, username: String, password: String, useSSL: boolean) +disconnect() +sendMessage(destination: String, message: String) +receiveMessage(destination: String, groupId: String): String } class MQClient { <> #config: String #username: String #password: String #useSSL: boolean +connect() +disconnect() +sendMessage() +receiveMessage() } class KafkaClient { +connect() +disconnect() +sendMessage() +receiveMessage() } class IBMMQClient { +connect() +disconnect() +sendMessage() +receiveMessage() } class RabbitMQClient { +connect() +disconnect() +sendMessage() +receiveMessage() } class RocketMQClient { +connect() +disconnect() +sendMessage() +receiveMessage() } class ActiveMQClient { +connect() +disconnect() +sendMessage() +receiveMessage() } MQSampler --> MQClientFactory MQClientFactory ..> MQClientInterface MQClientInterface <|-- MQClient MQClient <|-- KafkaClient MQClient <|-- IBMMQClient MQClient <|-- RabbitMQClient MQClient <|-- RocketMQClient MQClient <|-- ActiveMQClient ``` ### 设计模式 - **工厂模式**:通过MQClientFactory创建不同类型的MQ客户端 - **策略模式**:每种MQ客户端实现相同的接口,可以互相替换 - **模板方法模式**:MQClient基类提供通用实现,子类实现特定逻辑 ### 核心类说明 - **MQSampler**: JMeter采样器入口,处理测试逻辑 - **MQClientFactory**: 工厂类,根据配置创建对应的MQ客户端 - **MQClientInterface**: 定义MQ客户端通用接口 - **MQClient**: 抽象基类,提供公共实现 - **KafkaClient**: Kafka协议实现 - **IBMMQClient**: IBM MQ协议实现 - **RabbitMQClient**: RabbitMQ协议实现 - **RocketMQClient**: RocketMQ协议实现 - **ActiveMQClient**: ActiveMQ协议实现 ## 配置参数说明 ### 通用参数 - `server`: 服务器地址(格式因MQ类型而异) - `username`: 用户名(可选) - `password`: 密码(可选) - `useSSL`: 是否启用SSL(默认false) ### IBM MQ专用参数 服务器地址格式: `host:port:channel:queueManager` - `ccsid`: 字符编码标识符(可选),默认0表示使用MQ默认编码 - 常见值: 1208(UTF-8), 1381(GBK), 37(EBCDIC US-Canada) - 完整列表参考IBM MQ文档 ### Kafka专用参数 服务器地址格式: `host1:port1,host2:port2,...` - `bootstrap.servers`: 必填,Kafka集群地址(逗号分隔) - `topic`: 必填,生产/消费的主题名称 - `group.id`: 消费者组ID(消费者必填) - `acks`: 生产者确认机制(默认all) - `auto.offset.reset`: 消费者偏移量重置策略(默认latest) - `host`: 必填,MQ服务器主机名/IP - `port`: 可选,默认1414 - `channel`: 可选,默认"SYSTEM.DEF.SVRCONN" - `queueManager`: 可选,默认"QM1" ## SSL配置指南 ### Kafka SSL配置 ```java Properties props = new Properties(); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "/path/to/truststore.jks"); props.put("ssl.truststore.password", "truststore-password"); props.put("ssl.keystore.location", "/path/to/keystore.jks"); props.put("ssl.keystore.password", "keystore-password"); props.put("ssl.key.password", "key-password"); ``` ### 通用SSL配置 1. 设置`useSSL=true` 2. 配置以下系统属性: ```java System.setProperty("javax.net.ssl.trustStore", "path/to/truststore"); System.setProperty("javax.net.ssl.trustStorePassword", "password"); System.setProperty("javax.net.ssl.keyStore", "path/to/keystore"); System.setProperty("javax.net.ssl.keyStorePassword", "password"); ``` ### IBM MQ SSL配置 除通用配置外,还需设置: ```java MQEnvironment.sslCipherSuite = "TLS_RSA_WITH_AES_256_CBC_SHA256"; ``` ## 使用示例 ### Kafka使用示例 #### 生产者示例 ```java MQClient client = MQClientFactory.createClient("kafka"); client.connect("kafka1:9092,kafka2:9092", null, null, false); client.sendMessage("test-topic", "Hello Kafka"); ``` #### 消费者示例 ```java MQClient client = MQClientFactory.createClient("kafka"); client.connect("kafka1:9092,kafka2:9092", null, null, false); String msg = client.receiveMessage("test-topic", "consumer-group-1"); // 高级消费者配置示例 Properties props = new Properties(); props.put("auto.offset.reset", "earliest"); props.put("enable.auto.commit", "false"); String msg = client.receiveMessage("test-topic", "consumer-group-1", props); ``` #### 配置参数说明 - `bootstrap.servers`: 必填,Kafka集群地址(逗号分隔) - `topic`: 必填,生产/消费的主题名称 - `group.id`: 消费者组ID(消费者必填) - `acks`: 生产者确认机制(默认all) - `auto.offset.reset`: 消费者偏移量重置策略(默认latest) ### IBM MQ使用示例 #### 连接配置 ```java MQClient client = MQClientFactory.createClient("ibmmq"); // 基本连接 client.connect("QMGR_NAME/CHANNEL_NAME@HOSTNAME(PORT)", "username", "password", false); // SSL连接示例 Properties sslProps = new Properties(); sslProps.put("com.ibm.mq.cfg.useIBMCipherMappings", "false"); sslProps.put("javax.net.ssl.trustStore", "/path/to/truststore.jks"); sslProps.put("javax.net.ssl.trustStorePassword", "password"); client.connect("QMGR_NAME/CHANNEL_NAME@HOSTNAME(PORT)", "username", "password", true, sslProps); ``` #### 发送消息 ```java client.sendMessage("QUEUE_NAME", "Hello IBM MQ"); // 发送带有消息属性的消息 Properties msgProps = new Properties(); msgProps.put("JMS_IBM_Character_Set", "1208"); msgProps.put("JMS_IBM_Encoding", "546"); client.sendMessage("QUEUE_NAME", "Hello IBM MQ", msgProps); ``` #### 接收消息 ```java String msg = client.receiveMessage("QUEUE_NAME", null); // 接收带有选择器的消息 Properties selectorProps = new Properties(); selectorProps.put("JMSCorrelationID", "CORRELATION_ID"); String msg = client.receiveMessage("QUEUE_NAME", null, selectorProps); ``` #### 配置参数说明 - `QMGR_NAME`: 队列管理器名称 - `CHANNEL_NAME`: 服务器连接通道名称 - `HOSTNAME`: MQ服务器主机名 - `PORT`: 连接端口(默认1414) - `username`: 连接用户名 - `password`: 连接密码 - `useSSL`: 是否使用SSL连接 ### ActiveMQ使用示例 #### 连接配置 ```java MQClient client = MQClientFactory.createClient("activemq"); // 基本连接 client.connect("tcp://localhost:61616", "admin", "admin", false); // SSL连接示例 Properties sslProps = new Properties(); sslProps.put("transport.trustStore", "/path/to/truststore.jks"); sslProps.put("transport.trustStorePassword", "password"); sslProps.put("transport.keyStore", "/path/to/keystore.jks"); sslProps.put("transport.keyStorePassword", "password"); client.connect("ssl://localhost:61617", "admin", "admin", true, sslProps); ``` #### 发送消息 ```java // 发送到队列 client.sendMessage("TEST.QUEUE", "Hello ActiveMQ"); // 发送到主题 client.sendMessage("topic://TEST.TOPIC", "Hello Topic"); // 发送带有消息属性的消息 Properties msgProps = new Properties(); msgProps.put("JMSXGroupID", "GROUP1"); msgProps.put("JMSPriority", "9"); client.sendMessage("TEST.QUEUE", "Priority Message", msgProps); ``` #### 接收消息 ```java // 从队列接收 String msg = client.receiveMessage("TEST.QUEUE", null); // 从主题接收(需要持久订阅) String msg = client.receiveMessage("topic://TEST.TOPIC", "DurableSubscriber1"); // 接收带有选择器的消息 Properties selectorProps = new Properties(); selectorProps.put("JMSPriority", ">5"); String msg = client.receiveMessage("TEST.QUEUE", null, selectorProps); ``` #### 配置参数说明 - `brokerURL`: 必填,ActiveMQ broker地址(格式: tcp://host:port) - `username`: 连接用户名(可选) - `password`: 连接密码(可选) - `useSSL`: 是否使用SSL连接 - `clientID`: 持久订阅的客户端ID(可选) ## JMeter插件使用指南 ### 安装步骤 1. 构建项目: ```bash mvn clean package ``` 2. 将生成的jar文件复制到JMeter的`lib/ext`目录 3. 重启JMeter ### 在JMeter中使用 1. 添加线程组 2. 在线程组中添加"MQ Sampler" 3. 配置MQ类型和连接参数 4. 运行测试计划 ![JMeter插件截图](images/sp250614_114738.png) ## 开发者指南 ### 添加新的MQ客户端 1. 创建新类实现`MQClientInterface`接口 2. 在`MQClientFactory`中添加对新类型的支持 3. 添加必要的依赖到pom.xml 4. 编写单元测试 ### 代码贡献 1. Fork项目 2. 创建特性分支 3. 提交Pull Request 4. 确保所有测试通过 5. 更新文档 ## 构建和运行 ### 构建项目 ```bash mvn clean package ``` 构建完成后,会在`target`目录下生成以下文件: - `jmeter-mq-sampler-1.0.0-jar-with-dependencies.jar` - 包含所有依赖的可执行jar文件 - `jmeter-mq-sampler-1.0.0.jar` - 不包含依赖的jar文件 ### 验证构建 ```bash # 检查jar文件是否包含所有必要依赖 unzip -l target/jmeter-mq-sampler-1.0.0-jar-with-dependencies.jar | grep META-INF/MANIFEST.MF ``` ### 运行测试 ```bash # 运行单元测试 mvn test # 运行集成测试(需要配置测试环境) mvn verify ``` ### 安装到JMeter 将`jmeter-mq-sampler-1.0.0-jar-with-dependencies.jar`复制到JMeter的`lib/ext`目录并重启JMeter。 ## 注意事项 1. SSL证书路径需要根据实际环境配置 2. 生产环境建议使用更强的密码套件 3. 连接参数需根据MQ服务器配置调整 4. CCSID参数需与MQ服务器端配置匹配,否则可能出现乱码问题