[Bug] inconsistency between paimon table buckets and kafka topic partitions causes data write errors #4763

Open
2 tasks done
qyfftf opened this issue Dec 24, 2024 · 0 comments
Labels
bug Something isn't working

qyfftf commented Dec 24, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

paimon-1.0-snapshot

Compute Engine

flink 1.18.0

Minimal reproduce step

1. Create a Paimon catalog and use Kafka as the log system.

CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'log.system.auto-register' = 'true',
    'warehouse' = '/Users/paimon/path'
);

2. Create a primary-key table and use Kafka as the log system.

use paimon_catalog;

CREATE TABLE IF NOT EXISTS test_tb (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING,
    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (
    'bucket' = '10',
    'kafka.topic' = 'test_topic',
    'log.consistency' = 'eventual',
    'log.system.partitions' = '1',
    'log.system.replication' = '1',
    'kafka.bootstrap.servers' = '127.0.0.1:9092',
    'log.system' = 'kafka'
);

3. Create a datagen table.

CREATE TEMPORARY TABLE datagen_table (
    user_id BIGINT,
    item_id BIGINT,
    behavior STRING,
    dt STRING,
    hh STRING
) WITH (
    'connector' = 'datagen',
    'fields.user_id.min'='1',
    'fields.user_id.max'='10000000',
    'rows-per-second'='10'
);

4. Insert the datagen table's data into test_tb.

INSERT INTO test_tb SELECT * FROM datagen_table;

What doesn't meet your expectations?

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Partition 5 of topic test_topic with partition count 1 is not present in metadata after 60000 ms.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1428)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:859)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:240)
	at org.apache.paimon.flink.sink.RowDataStoreWriteOperator.processElement(RowDataStoreWriteOperator.java:136)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Partition 5 of topic test_topic with partition count 1 is not present in metadata after 60000 ms.

The Paimon table has 10 buckets, but the Kafka topic has only 1 partition, so records are sent to Kafka partitions that do not exist.

Anything else?

[image]

In this code, the Kafka partition for each produced record is derived from the bucket. Does this mean that the number of Kafka partitions must match the number of buckets in the table? In our production scenario, the Kafka partition count is always left at the default.
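To make the question concrete, here is a minimal Java sketch of the arithmetic behind the failure. It is not Paimon's actual code: the class `BucketPartitionSketch` and its methods are hypothetical names, and the "direct" mapping (bucket id used as partition id) is only an assumption inferred from the error message above. The modulo variant is one possible way to keep writes inside the existing partition range, not a confirmed fix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names come from Paimon.
public class BucketPartitionSketch {

    // Assumed current behavior: the bucket id is used directly as the Kafka partition id.
    public static int directPartition(int bucket) {
        return bucket;
    }

    // Possible alternative: wrap the bucket id into the range of existing partitions.
    public static int moduloPartition(int bucket, int partitionCount) {
        return Math.floorMod(bucket, partitionCount);
    }

    // Buckets whose directly mapped partition id falls outside the topic's partitions.
    public static List<Integer> unreachableBuckets(int bucketCount, int partitionCount) {
        List<Integer> missing = new ArrayList<>();
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            if (directPartition(bucket) >= partitionCount) {
                missing.add(bucket);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        int bucketCount = 10;   // 'bucket' = '10' from the table definition
        int partitionCount = 1; // 'log.system.partitions' = '1' from the table definition

        System.out.println("Buckets with no matching partition: "
                + unreachableBuckets(bucketCount, partitionCount));
        System.out.println("Bucket 5 under modulo mapping goes to partition "
                + moduloPartition(5, partitionCount));
    }
}
```

Under these assumptions, every bucket except 0 targets a partition that the single-partition topic does not have, which matches the "Partition 5 ... with partition count 1" error. With a modulo mapping, all rows of a given bucket would still land in the same partition.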

Are you willing to submit a PR?

  • I'm willing to submit a PR!
qyfftf added the bug label Dec 24, 2024