By JohnD

2019-05-15 16:11:42 8 Comments

My application is listening to several topics. Some of them are compacted topics used to load in memory some data.

I wanted to load first thoses data, so I used a SmartLifecycle to manually start those container before the other containers.

It's working great, but for simplicity, I tried to use a containerGroup

@KafkaListener(id = "myId", containerGroup = "compacted", ...)

Then in the SmartLifecycle bean I used :

        Collection<MessageListenerContainer> compactedListenerContainers = applicationContext.getBean("compacted", Collection.class);

But once I do that, after the "start" method is finished, the other containers are never started.

If I replace this line by :

Collection<MessageListenerContainer> compactedListenerContainers = Arrays.asList(registry.getListenerContainer("myId"));

Its working.

Any idea why getting the bean for a containerGroup prevent all other listener to work ? Knowing that all other @KafkaListeners are just defined by :

@KafkaListener(topics = "myTopic")


After further investigations, the problem is related to the KafkaListenerEndpointRegistry.

If the SmartLifeCycle bean is created with "KafkaListenerEndpointRegistry" as a dependency the application is working. Even if I'm not using the registry at all.

But if the SmartLifeCycle bean is created without this registry, the application fail.


@Gary Russell 2019-05-15 16:32:14

You need to show your container factory.

I presume you have autoStartup set to false since you are manually starting them.

So the others won't start either; since you want to start them after your compacted topics are loaded, simply call start() on the endpoint registry and it will start the others.

Or you can put the others in another containerGroup.

@JohnD 2019-05-15 17:48:22

I'll update tomorrow with the factories.But there are 2 different factories, one autoStartup and the other not. But for me the key here is that by swapping those 2 lines the behavior changes. Why ? I'm just getting the container differently, not interracting with them nor especially interracting with the blocked ones.

@Gary Russell 2019-05-15 17:51:36

I can't think of any reason why there would be a difference; but I am confused; I thought you didn't want the other containers to start until your compacted logs had been loaded. Perhaps a small sample app would help me see what you are doing.

@JohnD 2019-05-20 07:24:49

did you have time to found out something ?

@Gary Russell 2019-05-20 13:53:40

Sorry, I didn't have a chance to look at this last week. It's because the registry returns true from isRunning() if any of its registered containers is running. I didn't write that code, so I am not sure what the intended logic was there. In the next release, I'll change the registry to have its own lifecycle status; in the meantime, you can call registry.getListenerContainers().foreach(c -> c.start()).

@JohnD 2019-05-20 15:08:36

Oh ok, but why having the registry as parameter in my bean creation matters ? Since in my example adding/removing it changes the behavior. Your advice is to add registry.getListenerContainers().foreach(c -> c.start()) at the end of my start() method in my SmartLifeCycle bean ?

@Gary Russell 2019-05-20 15:14:37

Sorry, I don't know what you mean by but why having the registry as parameter in my bean creation matters ? . Yes, you can do it there.

@JohnD 2019-05-20 15:18:00

In my github repo the difference between the line 59 and 64 of KafkaConfiguration is the KafkaListenerEndpointRegistry presence in the parameters. Knowing that the line 59 version is working

@Gary Russell 2019-05-20 15:28:19

By adding that parameter, you have effectively made the registry a dependency of the CompactedTopicsInitializer. See the javadocs for DefaultLifecycleProcessor.doStart() - Start the specified bean as part of the given set of Lifecycle beans, making sure that any beans that it depends on are started first. So, even though the registry is in a later phase, it is actually started before your initializer (it will only start autoStartup containers though). Since it's called first, isRunning() returns false.

@JohnD 2019-05-20 15:37:36

Ok perfect, thanks for the explication. Making it depends on the registry was in fact breaking what I intented to do. The registry was started before my bean. So I have to inject the registry in my bean without having it depends on registry ? xD

Related Questions

Sponsored Content

17 Answered Questions

[SOLVED] Why is char[] preferred over String for passwords?

11 Answered Questions

21 Answered Questions

[SOLVED] What is reflection and why is it useful?

26 Answered Questions

3 Answered Questions

[SOLVED] Why is printing "B" dramatically slower than printing "#"?

9 Answered Questions

[SOLVED] Why is subtracting these two times (in 1927) giving a strange result?

  • 2011-07-27 08:15:58
  • Freewind
  • 653745 View
  • 6819 Score
  • 9 Answer
  • Tags:   java date timezone

14 Answered Questions

[SOLVED] Why does Java have transient fields?

  • 2009-05-26 12:11:36
  • Animesh
  • 689871 View
  • 1467 Score
  • 14 Answer
  • Tags:   java field transient

26 Answered Questions

[SOLVED] What is a serialVersionUID and why should I use it?

38 Answered Questions

[SOLVED] Why use getters and setters/accessors?

15 Answered Questions

[SOLVED] Why does this code using random strings print "hello world"?

  • 2013-03-03 04:38:06
  • 0x56794E
  • 197920 View
  • 1768 Score
  • 15 Answer
  • Tags:   java string random

Sponsored Content