Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved connections to the source reader from the split reader #29

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

tilakraj94
Copy link

Connections are currently established in the fetch method of the split reader. This method operates in a separate thread, and the connections are terminated when the split reader is closed.

However, acknowledgments (acks) are processed with a slight delay. By the time an acknowledgment is triggered, the split reader might already have been closed. Since message acknowledgments (Message.Ack) rely on the same connection that was used to receive the message, closing the connection prematurely causes the acknowledgment to fail.

To resolve this, connections need to remain active beyond the lifecycle of the split reader. They should only be closed when the source reader itself is closed to ensure all acknowledgments are successfully processed.

This behavior can be observed by running the testJsSourceBounded test case.

Test Case Observations:
The output log below demonstrates the issue:

> Task :compileJava
> Task :processResources UP-TO-DATE
> Task :classes
> Task :compileTestJava
> Task :processTestResources UP-TO-DATE
> Task :testClasses
> Task :test
13:42:07:243 [INFO] NatsSubjectSplitReader - Register split [Subject: sub-zPryy31KwQ] consumer for current reader.
13:42:12:273 [INFO] NatsSourceFetcherManager - Closing Fetcher
13:42:12:277 [INFO] NatsSubjectSplitReader -  Closing SplitReader
13:42:15:479 [ERROR] NatsJetStreamSourceReader - Failed to acknowledge cursors for checkpoint 1
java.lang.IllegalStateException: Connection is Closed
	at io.nats.client.impl.NatsConnection.publishInternal(NatsConnection.java:979)
	at io.nats.client.impl.NatsConnection.publish(NatsConnection.java:935)
	at io.nats.client.impl.NatsJetStreamMessage.ackReply(NatsJetStreamMessage.java:118)
	at io.nats.client.impl.NatsJetStreamMessage.ack(NatsJetStreamMessage.java:38)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at io.synadia.flink.v0.source.reader.NatsSubjectSplitReader.notifyCheckpointComplete(NatsSubjectSplitReader.java:142)
	at io.synadia.flink.v0.source.reader.NatsSourceFetcherManager.triggerAcknowledge(NatsSourceFetcherManager.java:115)
	at io.synadia.flink.v0.source.reader.NatsSourceFetcherManager.acknowledgeMessages(NatsSourceFetcherManager.java:104)
	at io.synadia.flink.v0.source.reader.NatsJetStreamSourceReader.notifyCheckpointComplete(NatsJetStreamSourceReader.java:96)
	at org.apache.flink.streaming.api.operators.SourceOperator.notifyCheckpointComplete(SourceOperator.java:551)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:104)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointComplete(RegularOperatorChain.java:145)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:467)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:400)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1430)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$16(StreamTask.java:1371)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$19(StreamTask.java:1410)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Thread.java:829)
13:42:15:527 [INFO] NatsJetStreamSourceReader - Closing Source Reader
13:42:15:527 [INFO] NatsJetStreamSourceReader - Closing Source Reader
13:42:15:527 [INFO] NatsJetStreamSourceReader - Closing Source Reader
13:42:15:520 [INFO] NatsJetStreamSourceReader - Closing Source Reader
13:42:15:523 [INFO] NatsJetStreamSourceReader - Closing Source Reader
13:42:15:523 [INFO] NatsJetStreamSourceReader - Closing Source Reader
13:42:15:547 [INFO] NatsJetStreamSourceReader - Closing Source Reader

Here, the test fails because the connection was closed as soon as the split reader shut down.

This PR moves the connections back to the source reader and get closed only when source reader closes.

Copy link
Collaborator

@scottf scottf left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not completed this review. I need code to reproduce the original problem. Please add a code snipped to our slack chat.

// close all connections
for (ConnectionContext ctx : connections.values()) {
ctx.connection.close();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I previously commented that I don't think it's appropriate for this to manage connections passed in to it. Is there another way to do this? Can the factory still be used here? I mean if this object is being closed before an ack, maybe there is a lifecycle problem.

Copy link
Author

@tilakraj94 tilakraj94 Jan 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we need to maintain a map since a split reader can handle multiple splits, with each split connection represented as a key-value pair. Additionally, there should be one map per source reader instance. These maps will remain active even if the fetcher thread pulling messages stops and new threads are created.

Perhaps we could maintain these maps within the connection factory itself? However, these maps are currently tied to the source reader’s context and do not exist outside it.

If moving them to the connection factory is something you’re considering, I can make the necessary changes to accommodate that. Let me know your thoughts!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you closing all the connections when one reader closes? Each reader should have it's own connection anyway.

@tilakraj94
Copy link
Author

I have not completed this review. I need code to reproduce the original problem. Please add a code snipped to our slack chat.

I will share a code snippet with you.

@tilakraj94
Copy link
Author

tilakraj94 commented Jan 12, 2025

You can reproduce the issue by re-running the SourceToSinkJsExample file in the examples section. To do so, add implementation 'org.slf4j:slf4j-simple:2.0.9' to your Gradle build to enable logging, run the example, and observe the exception being thrown.

Why does this happen?

Flink initializes multiple source reader instances, each of which is responsible for managing its own fetcher manager. The fetcher manager oversees fetcher threads, and each fetcher thread contains a split reader.
Each split reader handles reading one or more splits (in our current implementation, it processes one split per subject).

Here’s what happens:

  1. When the first fetcher thread starts, it fetches a bounded set of data and marks the splits as finished. In our case only one split is there.
  2. Flink’s internal mechanism monitors the fetcher threads to determine if they are idle. If a fetcher thread is found to be idle, it is shut down. In our case, this method returns true, causing the fetcher thread to shut down. During the shutdown process, the split reader associated with the fetcher thread is also closed.
  3. Now, when notifyCheckpointComplete(long checkpointId) is triggered by flink, it tries to acknowledge messages read in the first fetcher thread's split reader. However, since the fetcher thread and its associated resources (including the split reader) have already been closed, the acknowledgment fails.

Please let me know if you need any additional information or input from my side.

@tilakraj94 tilakraj94 force-pushed the connections-callback branch from c86ecea to 64172ab Compare January 13, 2025 21:37
Signed-off-by: Tilak Raj <tilak.raj94@gmail.com>
ConnectionFactory connectionFactory,
NatsJetStreamSourceConfiguration sourceConfiguration) {
private final Map<String, ConnectionContext> connections;
private JetStreamSubscription subscription;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this variable back to jetStreamSubscription

elementsQueue,
fetcherManager,
sourceConfiguration, connectionFactory, payloadDeserializer,
readerContext);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this whole section did not need to be refactored. It's clear what's going on and everything is inline and readable.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I do if I do make a change like this, is I do a review myself and comment so other reviewers can see why the change was made.

NatsSubjectSplitReader splitReader =
(NatsSubjectSplitReader) splitFetcher.getSplitReader();
splitReader.notifyCheckpointComplete(partition, messages);
private void triggerAcknowledge(SplitFetcher<Message, NatsSubjectSplit> splitFetcher, String splitId, List<Message> messages) throws Exception { //TODO Change to nats specific exception
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any idea about this todo?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants