springboot整合kafka的基本步骤如何通过自定义KafkaTemplate在推送消息的同时在控制台打印实际访问kafka的地址ip和端口等信息?
Spring Boot整合Kafka的基本步骤如下:
- 在pom.xml文件中添加Kafka和Spring Kafka的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
- 在application.yml文件中添加Kafka相关的配置:
spring:
kafka:
bootstrap-servers: localhost:9092
- 创建一个KafkaProducerConfig类,用于配置KafkaTemplate:
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- 在需要发送消息的地方注入KafkaTemplate,并使用send()方法发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
- 如果想在发送消息的同时在控制台打印实际访问Kafka的地址ip和端口等信息,可以自定义KafkaTemplate。创建一个MyKafkaTemplate类,继承KafkaTemplate,并重写doSend()方法:
public class MyKafkaTemplate<K, V> extends KafkaTemplate<K, V> {
private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaTemplate.class);
public MyKafkaTemplate(ProducerFactory<K, V> producerFactory) {
super(producerFactory);
}
@Override
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
LOGGER.info("Sending message to topic '{}' at {}", producerRecord.topic(), ((DefaultKafkaProducerFactory<?, ?>) getProducerFactory()).getConfigurationProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
return super.doSend(producerRecord);
}
}
- 修改KafkaProducerConfig类中的kafkaTemplate()方法,返回MyKafkaTemplate类的实例:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new MyKafkaTemplate<>(producerFactory());
}
原文地址: https://cveoy.top/t/topic/boiF 著作权归作者所有。请勿转载和采集!