Welcome to Vigges Developer Community - Open, Learning, Share
Welcome To Ask or Share your Answers For Others


apache kafka - KafkaMessageListenerContainer.stop() is not stopping consumption of messages in message listener

Use case: given a Kafka topic with 100 messages, I want to read only the messages from offset 10 to offset 20. I am able to start fetching from the beginning offset. When I reach the end offset, my code stops the container. Even after that code runs, the consumer keeps consuming further messages (from offset 21 onward). It only stops after reading all the messages in the topic.

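@Service
public class Consumer1 implements MessageListener<String, GenericRecord> {

  // These declarations are not shown in the original post; they are assumed here.
  private static final Logger log = LoggerFactory.getLogger(Consumer1.class);

  @Autowired
  private FeedService feedService;

  @Override
  public void onMessage(ConsumerRecord<String, GenericRecord> data) {
    log.info("feed record {}", data);
    // Ask the container to stop once the end offset is reached.
    if (data.offset() == 20) {
      feedService.stopConsumer();
    }
  }
}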

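@Service
public class FeedService {

  // Created and started elsewhere; the declaration is not shown in the original post.
  private KafkaMessageListenerContainer<String, GenericRecord> kafkaMessageListenerContainer;

  // start logic here

  public void stopConsumer() {
    kafkaMessageListenerContainer.stop();
  }
}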

Note: I am using the latest version of spring-kafka (2.6.4). One observation: the container's stop() method is executed, but the consumer is not closed, and there are no errors in the output.



1 Answer


Calling stop() doesn't interrupt the batch of records that is currently being processed:

while (isRunning()) {
    try {
        pollAndInvoke();
    }
    catch (@SuppressWarnings(UNUSED) WakeupException e) {
        // Ignore, we're stopping or applying immediate foreign acks
    }
    // ...
}

pollAndInvoke() calls KafkaConsumer.poll(), gets a collection of records, and invokes your onMessage() on each record. At some point you call stop(), but that doesn't mean you have reached the end of that record list, so the container doesn't exit immediately.
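To see why the listener keeps receiving records, here is a minimal, self-contained sketch of that loop shape. It is a simplified model, not the spring-kafka implementation: the class PollLoopModel, its fields, and the fake topic made of plain offsets are all hypothetical. The only thing it takes from the description above is that the running flag is checked between batches, never inside one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of a poll loop; NOT the spring-kafka code.
class PollLoopModel {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final long lastOffset;     // highest offset available in the fake topic
    private final int maxPollRecords;  // how many offsets one fake poll returns
    private long position;             // next offset to hand out

    PollLoopModel(long startOffset, long lastOffset, int maxPollRecords) {
        this.position = startOffset;
        this.lastOffset = lastOffset;
        this.maxPollRecords = maxPollRecords;
    }

    // Only flips the flag; it does not interrupt a batch in progress.
    void stop() {
        running.set(false);
    }

    // Runs until stopped or the fake topic is exhausted; returns the delivered offsets.
    List<Long> run(LongConsumer listener) {
        List<Long> delivered = new ArrayList<>();
        while (running.get() && position <= lastOffset) {
            // "poll": take up to maxPollRecords offsets in one batch
            long batchEnd = Math.min(position + maxPollRecords - 1, lastOffset);
            for (long offset = position; offset <= batchEnd; offset++) {
                listener.accept(offset); // the flag is not consulted inside the batch
                delivered.add(offset);
            }
            position = batchEnd + 1;
        }
        return delivered;
    }

    public static void main(String[] args) {
        PollLoopModel loop = new PollLoopModel(10, 99, 500);
        List<Long> seen = loop.run(offset -> {
            if (offset == 20) {
                loop.stop();
            }
        });
        System.out.println("last offset delivered: " + seen.get(seen.size() - 1));
    }
}
```

With a batch size of 500 (Kafka's default for max.poll.records) and offsets 10 to 99 available, the rest of the topic arrives in a single batch, so this model delivers offset 99 even though the listener asked to stop at 20. With a batch size of 1 in the same model, the last delivered offset is 20.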

The container really stops on the next cycle, when isRunning() returns false.
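Since the stop only takes effect between batches, one possible workaround is for the listener itself to ignore anything past the end offset. The sketch below is one way to do that in plain Java, under stated assumptions: BoundedOffsetGuard is a hypothetical helper and not part of spring-kafka, the stop callback stands in for whatever actually stops the container, and the guard is called from a single consumer thread. Skipped records may still count as consumed, depending on how offsets are committed, so treat this as an illustration rather than a complete solution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical helper, not part of spring-kafka: processes offsets up to
// endOffset, requests a stop once, and ignores the rest of the batch.
class BoundedOffsetGuard {
    private final long endOffset;
    private final LongConsumer delegate;  // the real per-record work
    private final Runnable stopCallback;  // stands in for the call that stops the container
    private boolean stopRequested;

    BoundedOffsetGuard(long endOffset, LongConsumer delegate, Runnable stopCallback) {
        this.endOffset = endOffset;
        this.delegate = delegate;
        this.stopCallback = stopCallback;
    }

    // Intended to be called from onMessage(...) with data.offset().
    void onOffset(long offset) {
        if (offset > endOffset) {
            return; // already polled, but outside the wanted range: skip it
        }
        delegate.accept(offset);
        if (offset == endOffset && !stopRequested) {
            stopRequested = true;
            stopCallback.run();
        }
    }

    public static void main(String[] args) {
        List<Long> processed = new ArrayList<>();
        BoundedOffsetGuard guard = new BoundedOffsetGuard(
                20,
                offset -> processed.add(offset),
                () -> System.out.println("stop requested"));
        // Simulate one large batch that runs well past the end offset.
        for (long offset = 10; offset <= 99; offset++) {
            guard.onOffset(offset);
        }
        System.out.println(processed.size() + " records processed");
    }
}
```

In this simulation only offsets 10 to 20 (11 records) reach the delegate, and the stop callback runs once. Another option is to set the consumer property max.poll.records to 1, so each batch holds a single record and the stop takes effect before the next record is fetched, at the cost of more frequent polls.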

