CopyPastor

Detecting plagiarism made easy.

Score: 1.8066081404685974; Reported for: String similarity, Exact paragraph match Open both answers

Possible Plagiarism

Reposted on 2023-08-31
by Ashish Lahoti

Original Post

Original - Posted on 2023-08-31
by Ashish Lahoti



            
Present in both answers; Present only in the new answer; Present only in the old answer;

Just extending the answer from @i.bondarenko If you want to configure all the properties from property file then you can write your own custom Kafka configuration to support multiple producer config something like this:-
```yaml kafka: producer: bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 string-producer: retries: 0 acks: all key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer byte-producer: retries: 0 acks: all key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.BytesSerializer ```
Read the configuration from class file:-
```java @Configuration @ConfigurationProperties(prefix = "kafka") @Getter @Setter public class KafkaCustomProperties { private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092")); private String clientId; private Map<String, String> properties = new HashMap<>(); private Map<String, KafkaProperties.Producer> producer; private Map<String, KafkaProperties.Consumer> consumer; private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl(); private KafkaProperties.Security security = new KafkaProperties.Security();
public Map<String, Object> buildCommonProperties() { Map<String, Object> properties = new HashMap<>(); if (this.bootstrapServers != null) { properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); } if (this.clientId != null) { properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); } properties.putAll(this.ssl.buildProperties()); properties.putAll(this.security.buildProperties()); if (!CollectionUtils.isEmpty(this.properties)) { properties.putAll(this.properties); } return properties; } } ```
use this configuration to generate `KafkaTemplate` beans for each producer using `@Qualifier` annotation
```java @Configuration @RequiredArgsConstructor @Slf4j public class KafkaMultipleProducerConfig {
private final KafkaCustomProperties kafkaCustomProperties;
@Bean @Qualifier("producer-string") public KafkaTemplate<String, Object> producerStringKafkaTemplate() { return new KafkaTemplate<>(producerFactory("producer1")); }
@Bean @Qualifier("producer-byte") public KafkaTemplate<String, Object> producerByteKafkaTemplate() { return new KafkaTemplate<>(producerFactory("producer2")); }
private ProducerFactory<String, Object> producerFactory(String producerName) { Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties()); if (nonNull(kafkaCustomProperties.getProducer())) { KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName); if (nonNull(producerProperties)) { properties.putAll(producerProperties.buildProperties()); } } log.info("Kafka Producer '{}' properties: {}", producerName, properties); return new DefaultKafkaProducerFactory<>(properties); } } ```
and use these `KafkaTemplate` beans to publish message to different producer config.
Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for detailed explanation.
Spring boot doesn't provide out of the box support for multiple producer configuration. You can write your own custom kafka configuration to support multiple producer config something like this:-
```yaml kafka: producer: producer1: topic: topic1 bootstrap-servers: server1:9092,server1:9093,server1:9094 retries: 0 acks: all producer2: topic: topic2 bootstrap-servers: server2:9092,server2:9093,server2:9094 retries: 2 acks: 1 producer3: ... producer4: ... ```
Read the configuration from class file:-
```java @Configuration @ConfigurationProperties(prefix = "kafka") @Getter @Setter public class KafkaCustomProperties { private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092")); private String clientId; private Map<String, String> properties = new HashMap<>(); private Map<String, KafkaProperties.Producer> producer; private Map<String, KafkaProperties.Consumer> consumer; private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl(); private KafkaProperties.Security security = new KafkaProperties.Security();
public Map<String, Object> buildCommonProperties() { Map<String, Object> properties = new HashMap<>(); if (this.bootstrapServers != null) { properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); } if (this.clientId != null) { properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); } properties.putAll(this.ssl.buildProperties()); properties.putAll(this.security.buildProperties()); if (!CollectionUtils.isEmpty(this.properties)) { properties.putAll(this.properties); } return properties; } } ```
use this configuration to generate `KafkaTemplate` beans for each producer using `@Qualifier` annotation
```java @Configuration @RequiredArgsConstructor @Slf4j public class KafkaMultipleProducerConfig {
private final KafkaCustomProperties kafkaCustomProperties;
@Bean @Qualifier("producer1") public KafkaTemplate<String, Object> producer1KafkaTemplate() { return new KafkaTemplate<>(producerFactory("producer1")); }
@Bean @Qualifier("producer2") public KafkaTemplate<String, Object> producer2KafkaTemplate() { return new KafkaTemplate<>(producerFactory("producer2")); }
private ProducerFactory<String, Object> producerFactory(String producerName) { Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties()); if (nonNull(kafkaCustomProperties.getProducer())) { KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName); if (nonNull(producerProperties)) { properties.putAll(producerProperties.buildProperties()); } } log.info("Kafka Producer '{}' properties: {}", producerName, properties); return new DefaultKafkaProducerFactory<>(properties); } } ```
and use these `KafkaTemplate` beans to publish message to different producer config.
Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for detailed explanation.

        
Present in both answers; Present only in the new answer; Present only in the old answer;