# kafka-use-master

**Repository Path**: dont-touch-my-code/kafka-use-master

## Basic Information

- **Project Name**: kafka-use-master
- **Description**: Kafka usage examples
- **Primary Language**: Java
- **License**: MulanPSL-2.0
- **Default Branch**: master

## Kafka usage examples

### spring-kafka

* In a Spring project, spring-kafka integrates Kafka quickly; it offers simple configuration and a reliable high-level API.

#### Kafka configuration

* With spring-kafka, the Kafka settings a project needs can be configured quickly and conveniently in yaml:

~~~yaml
# Only commonly used settings are listed here; pick what your project actually needs
spring:
  kafka:
    # Kafka connection info (separate cluster nodes with commas)
    bootstrap-servers: 101.35.99.117:9095,101.35.99.117:9096
    producer:
      # key serializer implementation
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value serializer implementation
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Offset reset policy, applied when the consumer has no committed offset or the offset is invalid
      # earliest: consume from the earliest message; latest: consume from the newest message
      auto-offset-reset: earliest
      # Consumer group (a group id specified in code takes precedence)
      group-id: string-consumer-group
      # key deserializer implementation
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value deserializer implementation
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Whether to commit offsets automatically
      enable-auto-commit: false
      # Auto-commit interval
      auto-commit-interval: 5000
    listener:
      # Number of concurrent listener threads
      concurrency: 3
~~~

#### Using spring-kafka

* Once configuration is complete, spring-kafka automatically registers the Kafka helper beans (unless custom ones are registered).

##### kafkaTemplate

###### Producing messages

~~~java
@Slf4j // Lombok supplies the log field
@Service
public class OfficialServiceImpl implements IOfficialService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic:default-kafka-topic}")
    private String topic;

    @Override
    public void publish(String message) {
        // No key: the partition is chosen by the partitioner
        kafkaTemplate.send(new ProducerRecord<>(topic, message))
                .completable()
                .whenCompleteAsync(this::publishComplete);
        // With multiple partitions, records sharing a key land in the same partition by default
        kafkaTemplate.send(new ProducerRecord<>(topic, "partition-key", message))
                .completable()
                .whenCompleteAsync(this::publishComplete);
    }

    @Override
    public void publish(String message, Integer partition) {
        String partitionKey = "partition-" + partition;
        // Target an explicit partition when producing
        kafkaTemplate.send(new ProducerRecord<>(topic, partition, partitionKey, message))
                .completable()
                .whenCompleteAsync(this::publishComplete);
    }

    // Handle the produce result asynchronously
    private void publishComplete(SendResult<String, String> result, Throwable error) {
        if (error != null) {
            log.error("publish message occurred exception: {}", error.getMessage());
            return;
        }
        String topic = result.getRecordMetadata().topic();
        Object message = result.getProducerRecord().value();
        log.info("publish message: {}, in topic: {}", message, topic);
    }
}
~~~

###### Consuming messages

* The auto-configured kafkaTemplate does not support consuming messages directly; normally the @KafkaListener annotation is used instead (a minimal example follows the excerpt below). The Spring Boot auto-configuration shows why, as only a producer factory is wired in:

~~~java
public class KafkaAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean({KafkaTemplate.class})
    public KafkaTemplate kafkaTemplate(ProducerFactory kafkaProducerFactory,
                                       ProducerListener kafkaProducerListener,
                                       ObjectProvider messageConverter) {
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        // Only the producer factory is supplied here (it builds KafkaProducer instances from the configuration)
        KafkaTemplate kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
        map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
        map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
        return kafkaTemplate;
    }
}
~~~
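For the usual @KafkaListener route just mentioned, a minimal consumer can look like the following sketch. This is not part of the repository's code: the class name is hypothetical, the topic falls back to the same `${kafka.topic:default-kafka-topic}` placeholder used above, and the group id comes from the yaml configuration. Partition-level options are covered in the KafkaListener section further down.

~~~java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleStringConsumer {

    // Minimal listener: subscribes to one topic; the group id falls back to
    // spring.kafka.consumer.group-id from the yaml configuration
    @KafkaListener(topics = "${kafka.topic:default-kafka-topic}")
    public void onMessage(String message) {
        System.out.println("listener message: " + message);
    }
}
~~~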
* If you do need to consume with kafkaTemplate, register the kafkaTemplate bean manually and set a ConsumerFactory on it:

~~~java
@Configuration
public class OfficialConfiguration {

    private KafkaProperties kafkaProperties;

    public OfficialConfiguration(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    /**
     * The parameters can be left to Spring injection if they need no special handling
     * (spring-kafka has already registered them automatically)
     * @param kafkaProducerFactory
     * @param consumerFactory
     * @param kafkaProducerListener
     * @param messageConverter
     * @return
     */
    @Bean
    public KafkaTemplate kafkaTemplate(ProducerFactory kafkaProducerFactory,
                                       ConsumerFactory consumerFactory,
                                       ProducerListener kafkaProducerListener,
                                       ObjectProvider messageConverter) {
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        KafkaTemplate kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
        map.from(this.kafkaProperties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
        map.from(this.kafkaProperties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);
        // Set the consumer factory manually so the template can also consume
        kafkaTemplate.setConsumerFactory(consumerFactory);
        return kafkaTemplate;
    }
}
~~~

1. Consuming with kafkaTemplate

~~~java
import com.jr.official.service.IOfficialService;

public class OfficialServiceImpl implements IOfficialService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void subscribe(String topic, Integer partition) {
        // Read one record from a given partition at a given offset
        ConsumerRecord<String, String> receive = kafkaTemplate.receive(topic, partition, 0, KafkaOperations.DEFAULT_POLL_TIMEOUT);
        if (receive != null) {
            System.out.println(receive.value());
        }
        // Read records from a given partition
        TopicPartitionOffset topicPartitionOffset = new TopicPartitionOffset(topic, partition);
        ConsumerRecords<String, String> consumerRecords = kafkaTemplate.receive(Collections.singletonList(topicPartitionOffset), KafkaOperations.DEFAULT_POLL_TIMEOUT);
        if (!consumerRecords.isEmpty()) {
            consumerRecords.forEach(record -> System.out.println(record.value()));
        }
    }
}
~~~

2. Using the ConsumerFactory directly

~~~java
public class OfficialServiceImpl implements IOfficialService {

    @Autowired
    private DefaultKafkaConsumerFactory<String, String> consumerFactory;

    public String subscribe(String topic) {
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            consumer.subscribe(Collections.singletonList(topic));
            ConsumerRecords<String, String> subMusics = consumer.poll(KafkaOperations.DEFAULT_POLL_TIMEOUT);
            StringBuilder recordBuilder = new StringBuilder();
            if (subMusics != null && !subMusics.isEmpty()) {
                subMusics.forEach(record -> {
                    System.out.println(record.value());
                    recordBuilder.append(record.value());
                });
            }
            return recordBuilder.toString();
        }
    }
}
~~~
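Note that subscribe() joins a consumer group, so a single poll may return nothing until the group has rebalanced. If this route should instead read from a fixed partition and offset, similar to the kafkaTemplate.receive call in item 1, the consumer can be assigned and positioned by hand. The following is a sketch, not repository code; it reuses the injected consumerFactory, and partition 0 / offset 100 are placeholder values:

~~~java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class AssignAndSeekExample {

    // Read from an explicit partition/offset without joining a consumer group
    public void readFrom(DefaultKafkaConsumerFactory<String, String> consumerFactory, String topic) {
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            TopicPartition tp = new TopicPartition(topic, 0); // placeholder partition
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 100L);                          // placeholder offset
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            records.forEach(record -> System.out.println(record.offset() + ": " + record.value()));
        }
    }
}
~~~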
3. Using KafkaConsumer directly

~~~java
public class OfficialServiceImpl implements IOfficialService {

    @Autowired
    private KafkaProperties kafkaProperties;

    public String subscribe(String topic) {
        // Build a plain KafkaConsumer from the yaml configuration
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProperties.buildConsumerProperties())) {
            kafkaConsumer.subscribe(Collections.singletonList(topic));
            ConsumerRecords<String, String> poll = kafkaConsumer.poll(Duration.ofMillis(100));
            StringBuilder recordBuilder = new StringBuilder();
            poll.forEach(record -> recordBuilder.append(record.value()));
            return recordBuilder.toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
~~~

* To keep listening, simply put the polling code inside a while loop (a sketch is at the end of this README).

##### KafkaListener

~~~java
public class OfficialServiceImpl implements IOfficialService {

    /**
     * groupId: the consumer group; if omitted, the listener belongs to the group configured in yaml
     * topics: the topic(s) to listen to (several can be given)
     * topicPartitions: the topics and partitions to listen to; several TopicPartition entries can be
     *                  given, each with its own topic and partitions
     * topicPattern: a topic pattern (listens to every topic that matches)
     * Only one of topics, topicPartitions and topicPattern should be specified
     */
    @KafkaListener(groupId = "${kafka.group:default-kafka-group}", topicPartitions = {
            @TopicPartition(topic = "${kafka.topic:default-kafka-topic}", partitions = {"1", "2"})
    })
    public void subscribeListener(String message) {
        System.out.println("listener message: " + message);
    }
}
~~~

### Kafka commands

#### topic

##### Create a topic

~~~bash
./kafka-topics.sh --bootstrap-server localhost:9095 --create --partitions 3 --replication-factor 3 --topic kafka-topic
# partitions: number of partitions
# replication-factor: number of replicas (use the broker count as a reference; do not exceed it)
~~~

##### Delete a topic

~~~bash
./kafka-topics.sh --bootstrap-server localhost:9095 --delete --topic kafka-topic
# A regular expression can be used when deleting topics
./kafka-topics.sh --bootstrap-server localhost:9095 --delete --topic "kafka-.*"
~~~

##### Expand a topic

~~~bash
# Normally partitions are only added; reducing them can cause problems
./kafka-topics.sh --bootstrap-server localhost:9095 --alter --topic kafka-topic --partitions 4
~~~

##### Query topics

~~~bash
# List all topics (exclude-internal: hide Kafka's internal topics)
./kafka-topics.sh --bootstrap-server localhost:9095 --list --exclude-internal
# List a subset of topics (filter with a regular expression)
./kafka-topics.sh --bootstrap-server localhost:9095 --list --exclude-internal --topic "kafka-.*"
# Show the details of a single topic
./kafka-topics.sh --bootstrap-server localhost:9095 --describe --topic kafka-topic
~~~

#### Configuration

##### Query topic configuration

~~~bash
# Show a topic's dynamic configuration
./kafka-configs.sh --bootstrap-server localhost:9095 --describe --topic kafka-topic
# Show all of a topic's configuration (dynamic + static)
./kafka-configs.sh --bootstrap-server localhost:9095 --describe --all --topic kafka-topic
~~~

##### Show the Kafka version

~~~bash
./kafka-configs.sh --bootstrap-server localhost:9095 --describe --version
~~~

##### Change dynamic configuration

~~~bash
# entity-type: the kind of entity to configure (topics/clients/users/brokers/broker-loggers); topics here
# entity-name: the name of the entity
# Change a topic's dynamic configuration
./kafka-configs.sh --bootstrap-server localhost:9095 --alter --entity-type topics --entity-name kafka-topic --add-config file.delete.delay.ms=20000,retention.ms=90000
~~~

##### Delete dynamic configuration

~~~bash
./kafka-configs.sh --bootstrap-server localhost:9095 --alter --entity-type topics --entity-name kafka-topic --delete-config file.delete.delay.ms
~~~

#### Produce messages

##### Produce messages without a key

~~~bash
./kafka-console-producer.sh --bootstrap-server localhost:9095 --topic kafka-topic --producer.config ../config/producer.properties
~~~

##### Produce messages with a key

~~~bash
# parse.key=true: input is read as "key<separator>value" (the default separator is a tab; set key.separator to change it)
./kafka-console-producer.sh --bootstrap-server localhost:9095 --topic kafka-topic --producer.config ../config/producer.properties --property parse.key=true
~~~

#### Consume messages

~~~bash
# print.key=true: also print each message's key
# from-beginning: consume from the beginning
# Consume a topic from the beginning
./kafka-console-consumer.sh --bootstrap-server localhost:9095 --property print.key=true --topic kafka-topic --from-beginning
# partition: the partition to consume from
# offset: the offset to start at
# Consume a specific partition
./kafka-console-consumer.sh --bootstrap-server localhost:9095 \
  --property print.key=true --topic kafka-topic --partition 0 --offset 100
# --group cannot be used together with --partition
# Consume with a named consumer group (a named group commits offsets, so if it has consumed before it resumes from where it left off)
./kafka-console-consumer.sh --bootstrap-server localhost:9095 --property print.key=true --topic kafka-topic --group kafka-group
~~~

[Reference](https://cloud.tencent.com/developer/article/1844234)
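As promised in the KafkaConsumer item above, here is a sketch of continuous consumption with a while loop. This is illustrative code, not part of the repository: the class name and stop flag are hypothetical, and consumerProperties is assumed to come from kafkaProperties.buildConsumerProperties() as in that example.

~~~java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LoopingConsumer {

    private final AtomicBoolean running = new AtomicBoolean(true);

    // Poll repeatedly until stop() is called
    public void listen(Map<String, Object> consumerProperties, String topic) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (running.get()) {
                consumer.poll(Duration.ofMillis(100))
                        .forEach(record -> System.out.println(record.value()));
            }
        }
    }

    public void stop() {
        running.set(false);
    }
}
~~~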