Quarkus reactive messaging with Kafka

I have two microservices, a producer and a consumer. The producer writes an incremented number to a Kafka topic every two seconds. Two instances of the consumer are running and consuming these increments. I have noticed a few odd things that I would like to solve:

  • When there are no consumers yet and the producer starts producing, the messages are stored in Kafka. When a consumer comes online, it does not handle the messages the producer has already written; it only consumes the messages that arrive from then on. How can a consumer also process all the messages that have not been consumed yet?
  • When there are two consumers, I expect both to consume equally. Currently one consumer gets all the load and the other sits idle. How can I spread the load across the consumers?
  • It looks like Kafka keeps every record that is produced, even after a consumer has already consumed it. Is there any way to prevent this? I can't find good information about, for example, acknowledgements.

Does anyone know the answer to any of these three questions?

Consumer config:

mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks
mp.messaging.incoming.stocks.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.stocks.group.id=test1
mp.messaging.incoming.stocks.auto.offset.reset=earliest

Producer config:

mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks
mp.messaging.outgoing.stock-quote.value.serializer=org.apache.kafka.common.serialization.StringSerializer
1 Answer(s)

Kafka keeps every record by default, whether or not it has been consumed, because it is an event-streaming platform that combines real-time messaging with storage. Consumer acknowledgements only commit offsets; they do not delete records. The default retention for a topic is 7 days (168 hours), and you can change it with the topic-level setting retention.ms, for example retention.ms=10000 for 10 seconds (or whatever value you want).
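To make the retention behaviour concrete, here is a minimal Java sketch of a time-based log. It is only a toy model and not Kafka code: the class RetentionSketch and its methods are hypothetical names, and a real broker removes whole log segments rather than single records, so actual deletion timing is coarser. The point it illustrates is that committing an offset leaves the stored records untouched, and only age removes them.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of time-based retention (hypothetical, not Kafka's implementation).
class RetentionSketch {
    private final long retentionMs;                       // plays the role of retention.ms
    private final List<Long> appendTimes = new ArrayList<>();
    private int committedOffset = 0;                      // consumer group position

    RetentionSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Producer side: store a record together with its append time.
    void append(long nowMs) {
        appendTimes.add(nowMs);
    }

    // Consumer side: acknowledging only moves the group's position.
    void commit(int offset) {
        committedOffset = offset;
    }

    int committed() {
        return committedOffset;
    }

    // Broker side: drop records older than the retention period.
    void expire(long nowMs) {
        appendTimes.removeIf(t -> nowMs - t > retentionMs);
    }

    int retained() {
        return appendTimes.size();
    }

    public static void main(String[] args) {
        RetentionSketch log = new RetentionSketch(10_000);
        log.append(0);
        log.append(2_000);
        log.append(4_000);
        log.commit(3);                        // everything has been consumed
        System.out.println(log.retained());   // prints 3: nothing was deleted
        log.expire(13_000);                   // records from t=0 and t=2000 are too old
        System.out.println(log.retained());   // prints 1
    }
}
```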

For concurrent consumption, make sure your topic has more than one partition, because the partition is the unit of parallelism in Kafka. As far as I remember, the Quarkus/SmallRye dev team was also in the process of implementing a Kafka rebalance listener. If that is in place in the latest Quarkus, the only remaining step is to partition your topic.
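As a rough illustration of why the partition count matters, the following Java sketch spreads the partitions of one topic over the consumers of one group in round-robin order. It is a simplified model under my own assumptions, not Kafka's actual assignor: PartitionAssignmentSketch, assign, and the consumer names are hypothetical. What it does reflect is that, within a group, a partition has a single owner at a time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of partition assignment inside one consumer group
// (hypothetical, not Kafka's real assignor code).
class PartitionAssignmentSketch {

    // Round-robin: partition p is owned by consumer number (p % consumers.size()).
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = List.of("consumer-a", "consumer-b");
        // One partition: the second consumer has nothing to read.
        System.out.println(assign(1, group)); // {consumer-a=[0], consumer-b=[]}
        // Two partitions: each consumer owns one.
        System.out.println(assign(2, group)); // {consumer-a=[0], consumer-b=[1]}
    }
}
```

With a single partition the second instance stays idle, which matches the behaviour described in the question; with at least as many partitions as consumers, every instance gets work.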

As for consuming messages that were produced before partitions were assigned to the consumer, we need input from the dev team here; I can't advise on this one.
