# springKafka-yml
**Repository Path**: bruce6213/spring-kafka-yml
## Basic Information
- **Project Name**: springKafka-yml
- **Description**: springBoot集成kafka,使用yml配置文件
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2024-01-11
- **Last Updated**: 2025-06-06
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 简易Demo
注:springBoot支持自动装配Apach Kafka
配置类中的配置会覆盖yml文件中的配置
## 引入依赖
```
org.springframework.kafka
spring-kafka
```
## yml文件配置
```
# 应用服务 WEB 访问端口
server:
port: 8080
spring:
kafka:
bootstrap-servers: localhost:9092 # 服务器地址
listener:
ack-mode: manual # 手动响应,必须配置否则仍会自动提交
type: batch # 批量消费
producer:
batch-size: 16384 # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算
retries: 10 # 消息重发次数,; 只有当所有重试都失败时,Spring Kafka才会调用onFailure回调
buffer-memory: 33554432 # 生产者内存缓冲区的大小
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: 1 # 用于确认消息是否发送成功;默认为1, 表示leader已经收到消息;0表示不管消息有没有被正常消费都直接返回给生产者;-1/all表示需要等一个leader和follower同步完成后再返回生产者
consumer:
group-id: "jxbdy-test" # 消费者组ID, 程序中具体有多少个消费者,还得看实际启动了多少个消费者实例(本程序中该消费组中只有一个消费者)
enable-auto-commit: false # 手动提交;默认是消费者自动提交的,但为了防止在消费过程中发生异常, 导致消息丢失,所以需要手动提交
auto-commit-interval: 2000 # 自动提交间隔(单位为毫秒)
auto-offset-reset: latest # 当消费者找不到自己的 offset(位移)时,应该从哪里开始消费(latest为新消息产生时、earliest为最开始,none表示抛出异常)
max-poll-records: 50 # 单次拉取消息的最大条数
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# fetch-max-wait: 5000 # 拉取数据最大等待时间(单位为毫秒)
# fetch-min-size: 1024 # 拉取数据最小字节数(单位为字节) 1024B=1KB
```
## 生产者
```
package com.jxsr.kafka.demos.web;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Description : kafka生产者
* @Author : Bruce Lee
* @CreateTime : 2024/1/11
*/
@RestController
@Slf4j
public class Producter {
@Autowired
private KafkaTemplate kafkaTemplate;
@GetMapping("/send")
public void test(){
String data = "Apach Kafka";
kafkaTemplate.send("test", data).addCallback(new ListenableFutureCallback>() {
@Override
public void onFailure(Throwable throwable) {
log.error("发送消息失败:{}", throwable.getMessage());
}
@Override
public void onSuccess(SendResult result) {
log.info("发送消息成功:{}-{}-{}", result.getRecordMetadata().topic(), result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
}
});
log.info("生产了一条新数据:{}", data);
}
}
```
注:生产者支持回调机制,用于监控消息是否发送成功
## 消费者
```
package com.jxsr.kafka.demos.web;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Description : kafka消费者
* @Author : Bruce Lee
* @CreateTime : 2024/1/11
*/
@Slf4j
@Component
public class Consumer {
/**
* 开启批量消费
* @param records
* @param ack
*/
@KafkaListener(topics = {"test"})
public void func(List> records, Acknowledgment ack){
for (ConsumerRecord record : records) {
String topic = record.topic();
log.info("订阅topic:{}, 数据内容:{}", topic, record.value());
// 确认消息已消费
ack.acknowledge();
}
}
/**
*
* 未开启批量消费
* @param record
* @param ack
*/
// @KafkaListener(topics = {"test"})
// public void func(ConsumerRecord record, Acknowledgment ack){
// String topic = record.topic();
// log.info("订阅topic:{}, 数据内容:{}", topic, record.value());
// // 确认消息已消费
// ack.acknowledge();
// }
}
```
# 参考文章
[CSDN](https://blog.csdn.net/qq_20865839/article/details/133948989)
[配置类版本](https://gitee.com/bruce6213/spring-boot-kafka)