springboot整合kafka的基本步骤如何通过自定义KafkaTemplate在推送消息的同时在控制台打印实际访问kafka的地址ip和端口等信息?

Spring Boot整合Kafka的基本步骤如下:

  1. 在pom.xml文件中添加Kafka和Spring Kafka的依赖:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.5.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
  1. 在application.yml文件中添加Kafka相关的配置:
spring:
  kafka:
    bootstrap-servers: localhost:9092
  1. 创建一个KafkaProducerConfig类,用于配置KafkaTemplate:
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  1. 在需要发送消息的地方注入KafkaTemplate,并使用send()方法发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String topic, String message) {
    kafkaTemplate.send(topic, message);
}
  1. 如果想在发送消息的同时在控制台打印实际访问Kafka的地址ip和端口等信息,可以自定义KafkaTemplate。创建一个MyKafkaTemplate类,继承KafkaTemplate,并重写doSend()方法:
public class MyKafkaTemplate<K, V> extends KafkaTemplate<K, V> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyKafkaTemplate.class);

    public MyKafkaTemplate(ProducerFactory<K, V> producerFactory) {
        super(producerFactory);
    }

    @Override
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        LOGGER.info("Sending message to topic '{}' at {}", producerRecord.topic(), ((DefaultKafkaProducerFactory<?, ?>) getProducerFactory()).getConfigurationProperties().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        return super.doSend(producerRecord);
    }
}
  1. 修改KafkaProducerConfig类中的kafkaTemplate()方法,返回MyKafkaTemplate类的实例:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new MyKafkaTemplate<>(producerFactory());
}

标签: 科技


原文地址: https://cveoy.top/t/topic/boiF 著作权归作者所有。请勿转载和采集!