Just extending the answer from @i.bondarenko.
If you want to configure all the properties from a properties file, you can write your own custom Kafka configuration to support multiple producer configs, something like this:
```yaml
kafka:
  # Common to every producer; binds to KafkaCustomProperties.bootstrapServers.
  bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
  producer:
    # Each key under `producer` is a producer name; it is the map key used
    # to look up that producer's settings.
    string-producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    byte-producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.BytesSerializer
```
Read this configuration into a properties class:
```java
/**
 * Holder for the settings bound from the {@code kafka} configuration prefix.
 *
 * <p>Top-level fields carry values common to all clients; {@code producer} and
 * {@code consumer} are keyed by a client name and carry per-client settings.
 */
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {

    /** Broker addresses; defaults to a single local broker. */
    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

    /** Optional client id; omitted from the built properties when {@code null}. */
    private String clientId;

    /** Free-form extra client properties, applied last by {@link #buildCommonProperties()}. */
    private Map<String, String> properties = new HashMap<>();

    /** Per-producer settings, keyed by producer name. */
    private Map<String, KafkaProperties.Producer> producer;

    /** Per-consumer settings, keyed by consumer name. */
    private Map<String, KafkaProperties.Consumer> consumer;

    private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();

    private KafkaProperties.Security security = new KafkaProperties.Security();

    /**
     * Assembles the client properties that do not depend on a particular producer or consumer.
     *
     * @return a new mutable map containing, in insertion order of precedence, the bootstrap
     *         servers and client id (when set), the SSL and security properties, and finally
     *         the free-form {@code properties} entries
     */
    public Map<String, Object> buildCommonProperties() {
        final Map<String, Object> common = new HashMap<>();

        if (bootstrapServers != null) {
            common.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        }
        if (clientId != null) {
            common.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
        }

        common.putAll(ssl.buildProperties());
        common.putAll(security.buildProperties());

        // Nothing free-form to merge: the map built so far is the result.
        if (CollectionUtils.isEmpty(this.properties)) {
            return common;
        }

        // Added last, so these entries overwrite any key already present.
        common.putAll(this.properties);
        return common;
    }
}
```
Use this configuration to create a `KafkaTemplate` bean for each producer, distinguished by the `@Qualifier` annotation:
```java
/**
 * Declares one {@link KafkaTemplate} bean per named producer found in
 * {@link KafkaCustomProperties#getProducer()}.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

    /** Map key of the producer configured with a String value serializer. */
    private static final String STRING_PRODUCER = "string-producer";

    /** Map key of the producer configured with a Bytes value serializer. */
    private static final String BYTE_PRODUCER = "byte-producer";

    private final KafkaCustomProperties kafkaCustomProperties;

    /**
     * Template backed by the {@code string-producer} settings.
     *
     * @return a new template built from the common properties plus that producer's overrides
     */
    @Bean
    @Qualifier("producer-string")
    public KafkaTemplate<String, Object> producerStringKafkaTemplate() {
        // The key must match the producer name used in the configuration file;
        // a non-matching key silently yields a factory with common properties only.
        return new KafkaTemplate<>(producerFactory(STRING_PRODUCER));
    }

    /**
     * Template backed by the {@code byte-producer} settings.
     *
     * @return a new template built from the common properties plus that producer's overrides
     */
    @Bean
    @Qualifier("producer-byte")
    public KafkaTemplate<String, Object> producerByteKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory(BYTE_PRODUCER));
    }

    /**
     * Builds a producer factory for the given producer name.
     *
     * @param producerName key into the configured producer map
     * @return a factory using the common properties, overlaid with the named producer's
     *         properties when such an entry exists
     */
    private ProducerFactory<String, Object> producerFactory(String producerName) {
        Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
        if (nonNull(kafkaCustomProperties.getProducer())) {
            KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
            if (nonNull(producerProperties)) {
                // Producer-specific values are applied after the common ones and therefore win.
                properties.putAll(producerProperties.buildProperties());
            }
        }
        // NOTE(review): the merged map includes the SSL/security properties; confirm it is
        // acceptable to write them to the log at INFO level.
        log.info("Kafka Producer '{}' properties: {}", producerName, properties);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}
```
Then inject these `KafkaTemplate` beans by qualifier to publish messages with the different producer configurations.
Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for detailed explanation.
Spring Boot doesn't provide out-of-the-box support for multiple producer configurations. You can write your own custom Kafka configuration to support multiple producer configs, something like this:
```yaml
kafka:
  producer:
    # Each key under `producer` is a producer name; it is the map key used
    # to look up that producer's settings.
    producer1:
      # NOTE(review): no `topic` field is visible in the classes shown in this
      # answer; confirm how this value is consumed.
      topic: topic1
      bootstrap-servers: server1:9092,server1:9093,server1:9094
      retries: 0
      acks: all
    producer2:
      topic: topic2
      bootstrap-servers: server2:9092,server2:9093,server2:9094
      retries: 2
      acks: 1
    producer3:
      # ...
    producer4:
      # ...
```
Read this configuration into a properties class:
```java
/**
 * Holder for the settings bound from the {@code kafka} configuration prefix.
 *
 * <p>Top-level fields carry values common to all clients; {@code producer} and
 * {@code consumer} are keyed by a client name and carry per-client settings.
 */
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {

    /** Broker addresses; defaults to a single local broker. */
    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

    /** Optional client id; omitted from the built properties when {@code null}. */
    private String clientId;

    /** Free-form extra client properties, applied last by {@link #buildCommonProperties()}. */
    private Map<String, String> properties = new HashMap<>();

    /** Per-producer settings, keyed by producer name. */
    private Map<String, KafkaProperties.Producer> producer;

    /** Per-consumer settings, keyed by consumer name. */
    private Map<String, KafkaProperties.Consumer> consumer;

    private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();

    private KafkaProperties.Security security = new KafkaProperties.Security();

    /**
     * Assembles the client properties that do not depend on a particular producer or consumer.
     *
     * @return a new mutable map containing, in insertion order of precedence, the bootstrap
     *         servers and client id (when set), the SSL and security properties, and finally
     *         the free-form {@code properties} entries
     */
    public Map<String, Object> buildCommonProperties() {
        final Map<String, Object> common = new HashMap<>();

        if (bootstrapServers != null) {
            common.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        }
        if (clientId != null) {
            common.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
        }

        common.putAll(ssl.buildProperties());
        common.putAll(security.buildProperties());

        // Nothing free-form to merge: the map built so far is the result.
        if (CollectionUtils.isEmpty(this.properties)) {
            return common;
        }

        // Added last, so these entries overwrite any key already present.
        common.putAll(this.properties);
        return common;
    }
}
```
Use this configuration to create a `KafkaTemplate` bean for each producer, distinguished by the `@Qualifier` annotation:
```java
/**
 * Declares one {@link KafkaTemplate} bean per named producer found in
 * {@link KafkaCustomProperties#getProducer()}.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

    private final KafkaCustomProperties kafkaCustomProperties;

    /**
     * Template backed by the {@code producer1} settings.
     *
     * @return a new template built from the common properties plus that producer's overrides
     */
    @Bean
    @Qualifier("producer1")
    public KafkaTemplate<String, Object> producer1KafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory("producer1");
        return new KafkaTemplate<>(factory);
    }

    /**
     * Template backed by the {@code producer2} settings.
     *
     * @return a new template built from the common properties plus that producer's overrides
     */
    @Bean
    @Qualifier("producer2")
    public KafkaTemplate<String, Object> producer2KafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory("producer2");
        return new KafkaTemplate<>(factory);
    }

    /**
     * Builds a producer factory for the given producer name.
     *
     * @param producerName key into the configured producer map
     * @return a factory using the common properties, overlaid with the named producer's
     *         properties when such an entry exists
     */
    private ProducerFactory<String, Object> producerFactory(String producerName) {
        Map<String, Object> config = new HashMap<>(kafkaCustomProperties.buildCommonProperties());

        // Resolve the named entry, tolerating both a missing map and a missing key.
        Map<String, KafkaProperties.Producer> producers = kafkaCustomProperties.getProducer();
        KafkaProperties.Producer overrides = nonNull(producers) ? producers.get(producerName) : null;

        if (nonNull(overrides)) {
            // Producer-specific values are applied after the common ones and therefore win.
            config.putAll(overrides.buildProperties());
        }

        log.info("Kafka Producer '{}' properties: {}", producerName, config);
        return new DefaultKafkaProducerFactory<>(config);
    }
}
```
Then inject these `KafkaTemplate` beans by qualifier to publish messages with the different producer configurations.
Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for detailed explanation.