-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception #7857
base: dev
Are you sure you want to change the base?
Conversation
# | ||
env { | ||
execution.parallelism = 1 | ||
job.mode = "BATCH" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fcb-xiaobo hi, can we add an e2e for Streaming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok,i'll add it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am running the example module locally, SeaTunnelApiExample, using the following configuration
env {
execution.parallelism = 1
job.mode = "STREAMING"
}
source {
Kafka {
bootstrap.servers = "xxx:8092"
topic = "test_topic_source"
# The default format is json, which is optional
format = json
start_mode = earliest
}
}
transform {}
sink{
kafka {
format = JSON
topic = "test01"
bootstrap.servers = "xxx:8092"
semantics = EXACTLY_ONCE
}
}
I use the Kafka producer console to send messages, and then open the consumer console to consume messages
Problem 1: Messages are only consumed during startup each time
Problem 2: The seatunnel log reports an error, as shown in the figure below, but the Flink UI checkpoints are all successful
Problem 3: During the startup process, after sending a new message, consumers cannot consume it anymore
I have also debugged the source code locally and did not find any specific issues. I am not sure if this is a bug or if my usage is incorrect. I would like to ask for your advice. If it is convenient for you, you can communicate directly on WeChat, which will be more convenient
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
link #7871
@@ -107,6 +111,10 @@ public void abortTransaction(long checkpointId) { | |||
|
|||
@Override | |||
public List<KafkaSinkState> snapshotState(long checkpointId) { | |||
if (recordNumInTransaction == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like it's always equal to 0
kafkaProducer.close(); | ||
// kafkaProducer will abort the transaction if you call close() without a duration arg | ||
// which will cause an exception when Committer commit the transaction later. | ||
kafkaProducer.close(Duration.ZERO); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest we move him to KafkaInternalProducer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fcb-xiaobo Is it missing here
9582870
to
e63cee9
Compare
e63cee9
to
722ab57
Compare
hi @fcb-xiaobo Is there any progress on this task? |
The code has been corrected and is up to date. Isn't it waiting for review? Or is there anything else that needs to be done? |
722ab57
to
05ea9a5
Compare
container.executeJob("/kafka/fake_to_kafka_exactly_once.conf"); | ||
String topicName = "kafka_topic_exactly_once"; | ||
Map<String, String> data = getKafkaConsumerData(topicName); | ||
Assertions.assertEquals(10, data.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to compare the data content
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok,I will update the test cases later
531a166
to
75f5412
Compare
Purpose of this pull request
close #7755
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
release-note
.