2

I am new with Kafka and I am trying to read a text file and create a list of strings that I want to send for the consumers. I am using Java 21 and Spring Boot 3.2.0 (SNAPSHOT).

This is the Kafka producer configuration:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());

        return properties;
    }

    @Bean
    public ProducerFactory<String, List<String>> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
}

This is the Topic configuration:

@Configuration
public class KafkaTopicConfig {
    
    @Bean
    public NewTopic alexTopic(){
        return TopicBuilder.name("securityTopic")
                .build();
    }
}

This is the "application.properties":

spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092

This is how I am receiving the file:

public Optional uploadFile(MultipartFile file){
    if(file.isEmpty()){
        return Optional.of(new EmptyFileException("File is empty please double check"));
    }
    
    return Optional.of(this.upload.uploadFile(file));
}

This is how I am sending the "List<String>" to kafka:

public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

@Override
public String uploadFile(MultipartFile file) {
    try(BufferedReader reader = new BufferedReader(
                                    new InputStreamReader(file.getInputStream()))){

        String line;
        int i=0;
        List<String> list = new ArrayList<>();
        while ((line=reader.readLine())!=null) {
            if(i==10){
                this.kafkaTemplate.send("securityTopic",list);

If I am sending pure Strings and I am using:

StringSerializer.class

instead of:

ListSerializer.class.getName()

in the Kafka producer configuration, and also change the the other generics to String instead of List<String> is working. But how can I send List<String> then? Again I am using Kafka for 2 or 3 days and I don't know if I have to create my own serializer or use something that is built in?

The error that gives me is as in the title: "Failed to construct kafka producer".

I have tried also to use "ListSerializer.class" also but is not working.

I also tried to use "JsonSerialize.class" and is still the same error.

Thank you for your time to read this!

1 Answer 1

0

It was the wrong import. the correct import is:

"import org.springframework.kafka.support.serializer.JsonSerializer;"

Sign up to request clarification or add additional context in comments.

2 Comments

You also need to look closer to the ListSerializer. It requires a CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS property. In your case that one is just plain StringSerializer. Same for ListDeserializer on the consumer side.
@ArtemBilan thank, I already did after I figure it out the what was the issue.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.