Comments (11)
Hi @LokeshAlamuri !
Is the problem still present after we made the fix in the PR #3347?
Thanks
Can we treat this a duplication and close respectively?
from spring-kafka.
Issue from the API side is not fixed. I shall make the initial changes and provide them for your review.
from spring-kafka.
Looking to your concern again, I would say that behavior is correct.
It was always like that it is like that everywhere.
The stopped state mean that this component does not accept new requests anymore.
I our case the MessageListenerContainer
does not poll Kafka broker anymore.
All those messages currently in handling by the MessageListener
is out of scope of this state change.
You might better to look to something what is called "graceful shutdown": https://docs.spring.io/spring-boot/reference/web/graceful-shutdown.html#page-title.
There is ContainerProperties.setListenerTaskExecutor()
for your consideration.
The default one (created on the fly) is SimpleAsyncTaskExecutor
, which does interrupt threads on its close, but does not wait for their completion by default:
/**
* Specify a timeout (in milliseconds) for task termination when closing
* this executor. The default is 0, not waiting for task termination at all.
* <p>Note that a concrete >0 timeout specified here will lead to the
* wrapping of every submitted task into a task-tracking runnable which
* involves considerable overhead in case of a high number of tasks.
* However, for a modest level of submissions with longer-running
* tasks, this is feasible in order to arrive at a graceful shutdown.
* <p>Note that {@code SimpleAsyncTaskExecutor} does not participate in
* a coordinated lifecycle stop but rather just awaits task termination
* on {@link #close()}.
* @param timeout the timeout in milliseconds
* @since 6.1
* @see #close()
* @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setAwaitTerminationMillis
*/
public void setTaskTerminationTimeout(long timeout) {
You may consider to inject any other AsyncTaskExecutor
impl which can do a graceful shutdown for you.
See more info in the ThreadPoolTaskExecutor
Javadocs.
The point is that stopping a component does not mean that application does not work any more.
Therefore I'm leaning to close this as Works as Designed
.
And as I said: it is like that everywhere in Spring where we implement org.springframework.context.Lifecycle
contract.
from spring-kafka.
I agree with you. There are multiple ways to shutdown the application in a graceful manner. Even, stop()
call on org.springframework.kafka.listener.ConcurrentMessageListenerContainer
does make a graceful shutdown of spring-kafka message listener component. But, the API definition of isChildRunning()
would give picture as if all the containers are stopped processing. These APIs could be made bit more perfect, by leveraging the childStopped()
call. Please let me know, if this issue could be closed. I shall close it.
from spring-kafka.
Let us keep aside the use case. One query.
If isChildRunning
does return 'true' immediately after stop process is initiated, how can the framework user knows when the actual processing completes through API call(As of now we can get to know from ConcurrentContainerStoppedEvent).
I think this has to be corrected to return false if actually any message processing is happening. This reports the actual state of the spring-kafka component.
I am having an idea on how to fix this issue. I will make the complete changes and provide them for your review. Could you please evaluate this issue one more time.
from spring-kafka.
OK.
I think you indeed are talking about a graceful shutdown.
First of all I was going to reject your request, but now I see that in Spring AMQP we do have such a logic:
/**
* The time to wait for workers in milliseconds after the container is stopped. If any
* workers are active when the shutdown signal comes they will be allowed to finish
* processing as long as they can finish within this timeout. Defaults
* to 5 seconds.
* @param shutdownTimeout the shutdown timeout to set
*/
public void setShutdownTimeout(long shutdownTimeout) {
Which is used like:
Runnable awaitShutdown = () -> {
logger.info("Waiting for workers to finish.");
try {
boolean finished = this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
if (finished) {
logger.info("Successfully waited for workers to finish.");
}
The special ActiveObjectCounter
abstraction is implemented there in Spring AMQP, but I believe it can be achieved some other way.
I guess a default shutdownTimeout
should be like 0
to avoid any breaking changes.
Since your request is a new behavior for the whole container abstraction.
Let me know if that makes sense to you and you are OK going forward for the fix!
from spring-kafka.
Issue is regarding isChildRunning
API returning true even though actual message processing is happening. I have requested to fix this issue, since framework user is unable to find the exact state of the spring-kafka component. Ideally, any one would expect the API to return true only after the processing is completed. This can be easily identified through childStopped
API.
Points what you are mentioning is bit different related to stop
API in org.springframework.context.Lifecycle
.
from spring-kafka.
Since your request is a new behavior for the whole container abstraction.
My request does not modify the existing behavior. But, provides the right state of the spring-kafka subsystem.
from spring-kafka.
I guess a default
shutdownTimeout
should be like0
to avoid any breaking changes.
Since your request is a new behavior for the whole container abstraction.
We are already doing this in AbstractMessageListenerContainer
. But the default is 10000 milliseconds. Let me know, if you want me to change this to 0.
public final void stop(boolean wait) { this.lifecycleLock.lock(); try { if (isRunning()) { if (wait) { final CountDownLatch latch = new CountDownLatch(1); doStop(latch::countDown); try { latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR publishContainerStoppedEvent(); } catch (@SuppressWarnings("unused") InterruptedException e) { Thread.currentThread().interrupt(); } } else { doStop(this::publishContainerStoppedEvent); } } } finally { this.lifecycleLock.unlock(); } }
from spring-kafka.
OK! Thank you for looking into that!
Apparently you are fully on board with the code.
So, we have everything what could give us a graceful shutdown.
Your only concern that isChildRunning()
give us a false
whenever it is not yet.
I will be glad to see the fix from you.
It won't make it into the release for today though.
Thank you!
from spring-kafka.
Related Issues (20)
- Add tracing headers to be mapped to string HOT 1
- On kotlin application, Spring kafka 3.2.0 doesn't take the message conversion logic, because of the wrong coroutine detection on MessagingMessageListenerAdapter HOT 1
- Possible inconsistency in DLT topic naming convention
- Prevent Embedded Kafka logs auto delete HOT 3
- read-committed breaks async-acks HOT 2
- Batch Listener when encountered a poison pill, records after that poison pill are moved to next poll. HOT 6
- getUnregisteredListenerContainer return null HOT 4
- getUnregisteredListenerContainer return null
- getUnregisteredListenerContainer return null HOT 1
- Mismatch Between Registered and Found Callbacks in Multi-group Listener Cases HOT 7
- Request to consider providing default ApplicationListener to stop the SpringBoot Kafka Consumer application on container failures HOT 9
- No replacement for KafkaSendCallback was found. HOT 1
- Cannot read field "internalTopologyBuilder" because "topology" is null HOT 4
- Application receiving ContainerStoppedEvent even though active MessageListenerContainer instances are processing messages. HOT 16
- Support customer-defined MessagConverter in @KafkaListener annotated method. HOT 1
- The `KafkaListenerEndpointRegistry` refactoring for `List.isEmpty()` instead of `List.size() > 0`
- Fence the MessageLisnterContainer restart once the ConcurrentMessageListenerContainer is stopped HOT 3
- Revise `AbstractConsumerSeekAware.onPartitionsAssigned()` for concurreny HOT 17
- ConcurrentKafkaListenerContainerFactory to handle multiple RecordInterceptors HOT 6
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from spring-kafka.