# springKafka-yml **Repository Path**: bruce6213/spring-kafka-yml ## Basic Information - **Project Name**: springKafka-yml - **Description**: springBoot集成kafka,使用yml配置文件 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2024-01-11 - **Last Updated**: 2025-06-06 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 简易Demo 注:springBoot支持自动装配Apach Kafka ​ 配置类中的配置会覆盖yml文件中的配置 ## 引入依赖 ``` org.springframework.kafka spring-kafka ``` ## yml文件配置 ``` # 应用服务 WEB 访问端口 server: port: 8080 spring: kafka: bootstrap-servers: localhost:9092 # 服务器地址 listener: ack-mode: manual # 手动响应,必须配置否则仍会自动提交 type: batch # 批量消费 producer: batch-size: 16384 # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算 retries: 10 # 消息重发次数,; 只有当所有重试都失败时,Spring Kafka才会调用onFailure回调 buffer-memory: 33554432 # 生产者内存缓冲区的大小 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: 1 # 用于确认消息是否发送成功;默认为1, 表示leader已经收到消息;0表示不管消息有没有被正常消费都直接返回给生产者;-1/all表示需要等一个leader和follower同步完成后再返回生产者 consumer: group-id: "jxbdy-test" # 消费者组ID, 程序中具体有多少个消费者,还得看实际启动了多少个消费者实例(本程序中该消费组中只有一个消费者) enable-auto-commit: false # 手动提交;默认是消费者自动提交的,但为了防止在消费过程中发生异常, 导致消息丢失,所以需要手动提交 auto-commit-interval: 2000 # 自动提交间隔(单位为毫秒) auto-offset-reset: latest # 当消费者找不到自己的 offset(位移)时,应该从哪里开始消费(latest为新消息产生时、earliest为最开始,none表示抛出异常) max-poll-records: 50 # 单次拉取消息的最大条数 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # fetch-max-wait: 5000 # 拉取数据最大等待时间(单位为毫秒) # fetch-min-size: 1024 # 拉取数据最小字节数(单位为字节) 1024B=1KB ``` ## 生产者 ``` package com.jxsr.kafka.demos.web; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description : kafka生产者 * @Author : Bruce Lee * @CreateTime : 2024/1/11 */ @RestController @Slf4j public class Producter { @Autowired private KafkaTemplate kafkaTemplate; @GetMapping("/send") public void test(){ String data = "Apach Kafka"; kafkaTemplate.send("test", data).addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable throwable) { log.error("发送消息失败:{}", throwable.getMessage()); } @Override public void onSuccess(SendResult result) { log.info("发送消息成功:{}-{}-{}", result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } }); log.info("生产了一条新数据:{}", data); } } ``` 注:生产者支持回调机制,用于监控消息是否发送成功 ## 消费者 ``` package com.jxsr.kafka.demos.web; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import java.util.List; /** * @Description : kafka消费者 * @Author : Bruce Lee * @CreateTime : 2024/1/11 */ @Slf4j @Component public class Consumer { /** * 开启批量消费 * @param records * @param ack */ @KafkaListener(topics = {"test"}) public void func(List> records, Acknowledgment ack){ for (ConsumerRecord record : records) { String topic = record.topic(); log.info("订阅topic:{}, 数据内容:{}", topic, record.value()); // 确认消息已消费 ack.acknowledge(); } } /** * * 未开启批量消费 * @param record * @param ack */ // @KafkaListener(topics = {"test"}) // public void func(ConsumerRecord record, Acknowledgment ack){ // String topic = record.topic(); // log.info("订阅topic:{}, 数据内容:{}", topic, record.value()); // // 确认消息已消费 // ack.acknowledge(); // } } ``` # 参考文章 [CSDN](https://blog.csdn.net/qq_20865839/article/details/133948989) [配置类版本](https://gitee.com/bruce6213/spring-boot-kafka)