You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I use below sql create source table:
create source table topic1(
uuid int,
data var
) with (
type = 'kafka',
kafka_topic = 'flow_message_test',
"auto.offset.reset" = latest,
kafka_broker = '10.16.98.10:9092',
kafka_group_id = 'test_json_parser',
value_type = 'json'
);
Data are as follows:
{
"uuid": 10455,
"data": ["{"applyid":"1590224987","base_info":"{\"city_id\":201}}"]
}
That sql grammar check pass. running throw exceptions blow:
java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:33)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:96)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:47)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
The text was updated successfully, but these errors were encountered:
I use below sql create source table:
create source table topic1(
uuid int,
data var
) with (
type = 'kafka',
kafka_topic = 'flow_message_test',
"auto.offset.reset" = latest,
kafka_broker = '10.16.98.10:9092',
kafka_group_id = 'test_json_parser',
value_type = 'json'
);
Data are as follows:
{
"uuid": 10455,
"data": ["{"applyid":"1590224987","base_info":"{\"city_id\":201}}"]
}
That sql grammar check pass. running throw exceptions blow:
java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:33)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:96)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:47)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
The text was updated successfully, but these errors were encountered: