1,928 questions
0
votes
1
answer
76
views
Right config to produce big batches into a kafka broker
I am developing a Quarkus app that produce and consume Kafka messages from a topic. I am trying to configure it to create big batches of messages but I am having trouble with it. This is my code:
...
0
votes
0
answers
49
views
How to update Kafka Admin Client SSL properties using SslBundle
I am trying to publish messages to a Kafka cluster that is SSL protected.
I have correct certificates on the application side. I checked I can consume from this kafka cluster using those. Also, the ...
1
vote
0
answers
85
views
Circuit breaker pattern for Kafka producer
I would like to implement a circuit breaker pattern for the Kafka producer, not the consumer.
There is documentation that can be found online regarding implementing the circuit breaker pattern for ...
0
votes
0
answers
45
views
Does a transactional Kafka producer provide exactly-once semantics upon restarts?
A Kafka broker compares the PID (producer ID) and a message sequence number (generated by the producer) to deduplicate messages for idempotent producers. Once a transactional producer retains the PID ...
1
vote
0
answers
181
views
UNKNOWN_TOPIC_OR_PARTITION handling in Spring Kafka Producer
I have Kafka Producer and Consumer configured in Spring Boot application (Spring Boot version 3.4.2).
application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-...
0
votes
0
answers
72
views
Kafka: Failed to allocate memory within the configured max blocking time 60000 ms
KAFKA: Error while sending message for exception: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms
Configuration I am using:
...
0
votes
0
answers
53
views
How to publish streaming websocket data to Kafka?
I am receiving tick data from a websocket connection. As soon as I receive the data, I need to publish it to Kafka. What I have tried so far is, as soon as I receive the data from the websocket, I ...
0
votes
1
answer
33
views
In Helidon SE 4.1.6 , how to send data to a specific partition using kafka producer
I want to use Helidon SE 4.1.6 and producer the data to a specific partition of Apache Kafka using producer.
Detail :
I have gone through the https://helidon.io/docs/latest/se/reactive-messaging#...
0
votes
0
answers
112
views
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Receuvung the errir when producing a message to a kafka topic from airflow task (python operator).
When producing the ...
1
vote
0
answers
59
views
Kafka schema registry go Protobuf - Broker: Broker failed to validate record
I'm verifying the Protobuf schema using the Kafka schema registry. The problem is that even though I put in the correct schema, I still get the error Broker: Broker failed to verify record.
The schema ...
0
votes
2
answers
1k
views
Invalid token error while setting up OAuth2 SASL authentication on Kafka
I have the following setup:
Kafka broker (3.9.0)
Kafka producer (for now, using the producer-console in kafka itself)
This setup works fine for basic TCP, TLS and even tried SASL authentication using ...
0
votes
0
answers
75
views
Spring Boot can`t connect to Kafka in Kubernates
Im new in kafka and kubernates. Im using bitnami chart to deploy kafka in my kind cluster kubernetes (on my own server). Im trying to connect to kafka from my spring boot application, that started on ...
0
votes
0
answers
37
views
Add unique value to a key in Kafka message using Kouncil
I have a json payload that I am using to produce a kafka message via Kouncil, json is as below:
{
"version": "1.0",
"id": "product-1730976584",
&...
0
votes
0
answers
44
views
Kafka Producer Key Shows Incorrect Value After Sending Message
I am working on a Spring Boot application where I am using Kafka to send messages to a topic. I am setting a specific key for each message, but after successfully sending the message, when I check the ...
-1
votes
1
answer
101
views
PySpark Serialization Issue with Kafka Producer in foreachPartition
i'm working with PySpark and trying to send a partitioned DataFrame to Kafka using the confluent-kafka library's Producer. However, I'm encountering a serialization problem with the Kafka producer on ...
0
votes
1
answer
48
views
How Can I Reduce Delay When Sending File Contents to Kafka Producer Using Script?
I want to write a bash script to constantly add new files to a folder and use cat to send the file contents to a Kafka producer with:
cat $FILEPATH |$KAFKA_HOME/bin/kafka-console-producer.sh --...
0
votes
0
answers
129
views
Kafka producer configured with SSL in Java "throws java.lang.OutOfMemoryError: Java heap space"
A Java application (Spring Boot based) has a Kafka producer configured for secured remote connection, see the defined properties below:
2024-07-01 15:04:24,238 [main] INFO c.i.t.a....
0
votes
1
answer
79
views
Kafka Streams - a deleted target topic
For testing purposes I deleted the target topic and was expecting the application to timeout after some time. However after some research I understood that Kafka Streams retries a message by default ...
1
vote
1
answer
90
views
Does the "acks" configuration apply to the 'sender' thread of the KafkaProducer client?
I have a couple of questions regarding the async nature of the KafkaProducer client which were unanswered by the official documentation.
Background:
I am working on a project where I need to log some ...
0
votes
1
answer
42
views
Azure Function Multioutput define parameter
I am using Azure Function v4 with .NET 8.0 as Isolated Process and want to use multiple output binding as described here.
It looks like this:
public class MultiOutputType
{
[KafkaOutput("...
-1
votes
1
answer
55
views
service nodejs connect to container kafka laradock result timeout
with this code on node js, I create simple for test data to produce message:
const kafka = require('kafka-node');
// Create a Kafka client with the Kafka broker's hostname (kafka) and port (9092)
...
2
votes
1
answer
192
views
Kafka idempotent producers
According to the documentation, an idempotent kafka producer maintains a sequence number, ensuring deduplication of events and also, ordering.
Does this mean that within a single producer session, an ...
0
votes
1
answer
1k
views
Produce json message with headers via kafka CLI
Try to produce simple message via kafka kafka-console-producer.sh
Message:
{"status": "success", "properties": []}
Headers:
HeaderKey1:HeaderValue1
HeaderKey2:...
0
votes
1
answer
733
views
AWS MSK Transactions Support
I'm trying to create a Kafka Producer inside a Lambda Function with Exactly-Once Delivery support enabled to push messages to MSK.
Edit: MSK IAM Auth is used for security protocol between Kafka and ...
0
votes
1
answer
4k
views
Topic not present in metadata after 60000ms
Use case scenario :
I am trying to make sure that in case the broker (one of bootstrap ones) which producer connects to in order to fetch the meta data is not available, then after blocking it for max....
0
votes
0
answers
47
views
Producer Issues -- Apache Kafka, Docker, Python
I've set up an instance of Apache Kafka via the Bitnami Kafka Image, localhost:9092, and created a topic ('testtopic'). I'm trying to write events to the topic from Python by creating a producer. ...
0
votes
1
answer
39
views
kafka - how to check if cluster is not able to receive all the data being sent by producer
we have a kafka cluster with 3 brokers on GKE(m/c type - e2-standard-8 i.e. 8 vCPU, 32GB memory).
Each Broker has space(SSD) of 500GB, and it is current 17% full i.e. space is not an issue.
We are ...
0
votes
0
answers
252
views
Kafka producer client is not able to connect to schema registry
I am trying to run a job on google Dataproc which pushes a record into a Kafka topic. It uses a schema registry for schema validation and serialization. Representative code below:
case class ...
1
vote
1
answer
307
views
Giving completion instructions ahead of calling send with MockProducer
I have the following code (Obviously made as an example):
class RandomClass(
val producer: Producer<String, String>
) {
fun randomFunction(): Boolean {
// Using .get() because I want to ...
1
vote
1
answer
868
views
How work maxAge in Spring Kafka Factory Producer?
We are using transactional producers and sometimes we find ourselves in the situation where there is no traffic for more than 7 days, which leads to the loss of transactional id metadata. The next ...
-1
votes
1
answer
137
views
where should I get missing properties for creating the Kafka producer?
Could you explain where should I get missing properties in the following code?
(smth should be in the lsat line)
KafkaProperties prop = new KafkaProperties();
KafkaProperties.Producer producer = prop....
1
vote
1
answer
181
views
How to get access to the confluent registry schema id used by kafka producer?
I have a kafka producer that uses confluent schema registry. I know that there is an algorithm based on which the KafkaAvroSerializer finds out the matching schemaId from the confluent schema registry ...
1
vote
0
answers
76
views
Does kafka producer reconnect fs2.Kafka
the microservice does not always switch to another kafka node, in case of loss of connection with the node to which it was connected and simply crashes.
If we create a producer and transfer several ...
1
vote
0
answers
193
views
Application running out of memory due to apache kafka metric objects while using kafkaProducer
We try to send 1000s of messages to a kafka-topic using kafkaProducer(org.apache.kafka.clients.producer.KafkaProducer) objects on our tomcat application. Lately we have been observing an issue with ...
0
votes
0
answers
602
views
Intermittently getting TimeoutException while pushing to kafka topic
Error:
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic topic_name not present in metadata after 500 ms
We are getting ...
2
votes
2
answers
601
views
Send Map from KafkaProducer to KafkaConsumer
I am using java 21 and Spring Boot 3.2.0 (SNAPSHOT). I want to use send a Map<Short,List<Customer>> from KafkaProducer to KafkaConsumer.
The KafkaConsumerConfig class:
@Configuration
...
0
votes
1
answer
428
views
How to gracefully stop Kafka consumer after processing a specific number of messages in Python?
I have an Airflow DAG with a BashOperator that runs a Kafka producer, generating a random count of messages. These messages are consumed by a Kafka consumer, which writes them to a JSON file. However, ...
2
votes
1
answer
358
views
Failed to construct kafka producer when sending list of strings
I am new with Kafka and I am trying to read a text file and create a list of strings that I want to send for the consumers. I am using Java 21 and Spring Boot 3.2.0 (SNAPSHOT).
This is the Kafka ...
1
vote
0
answers
161
views
reporoducing org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) error
We had an incident in prod where some messages were lost. On post incident investigation they wanted to see the error occur, and see if our fix would work.
Mission: get org.apache.kafka.common.errors....
0
votes
1
answer
753
views
MSK - kafka-console-producer error with Kafka client version 3.4.0
I have been working with MSK Serverless/Provisioned and kafka-clients 3.4.0.
Everything worked as it should when I use * for a topic name.
arn:aws:kafka:region:account-id:topic/cluster-name/cluster-...
0
votes
1
answer
298
views
Transactional KafkaProducer recover from timeout
I am testing a Kotlin KafkaProducer application using Transactions and the possible scenarios:
Happy path: (working fine)
begin transaction
producer.send
commit transaction
Assert consume.poll has a ...
1
vote
1
answer
682
views
Zombie fencing of a single producer with Spring Kafka
I have a Spring Kafka application, which has a single producer, which is NOT part of a pure kafka consume->process->produce chain, i.e. the producer is not triggered by a kafka consumer.
...
0
votes
1
answer
1k
views
Spring Cloud Stream 4.0.4 | Kafka Producer | Functional Style
I have come across below example of writing Kafka Producer with Spring Cloud Stream in functional style.
Dependency
<dependency>
<groupId>org.springframework.cloud</groupId>
&...
0
votes
1
answer
2k
views
Kafka Producer to distribute messages to all partitions equally
I have a producer that send messages to kafka topic. My topic has 10 partitions in same consumer group and I am looking for round robin distribution of messages. I have specified RoundRobinPartitioner ...
0
votes
0
answers
123
views
Kafka producer & consumer sync process
I want delete 10 items so I send 10 items to topic of kafka, my idea is that when producer send 10 items to kafka and it reponse ack, I return status 200 to client. But it has a problem if consumer ...
0
votes
1
answer
814
views
How to fix KafkaTimeoutError when connecting to Red Panda using kafka-python
I get the following error when trying to connect a kafka-python producer to my Red Panda database.
KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
I have a ...
0
votes
1
answer
744
views
kafka topic with 1 partition . 1 producer is already producing to that partition. What issues can I face if i add another producer
I have a topic with 1 partition.
One of the systems is acting as a producer and sending data to that partition.
I want to add another different system as producer and send data to that partition only.
...
0
votes
1
answer
369
views
Kafka producer - Cannot append records to read-only mirror topic
I running a producer and raising an exception with the following code.
@GetMapping("/send")
public ResultVo send(@RequestParam(value = "content") String content) {
...
1
vote
2
answers
707
views
All produced kafka messages are sending always to partition 0 even the key hash value is 1
I have a Kafka topic with 2 partitions. and I have created a message handler with 2 different keys.
@Bean
@ServiceActivator(inputChannel = "pushDataRequestChannel")
public MessageHandler ...
1
vote
1
answer
567
views
Does kafka include any schema registry version as part of avro messages?
I am working on a kafka project, implementing a producer, and one of the requirements is: "Each message generated shall include the respective schema id and version of the schema registry"
...