[Bug] [datastudio] "add customjar" does not work #3240

Closed
jiangwwwei opened this issue Mar 5, 2024 · 2 comments

Labels: Bug (Something isn't working)


jiangwwwei commented Mar 5, 2024

Search before asking

  • I have searched the issues and found no similar issues.

What happened

My resource configuration is correct (OSS -> S3 -> MinIO), and I can submit JAR tasks to the Kubernetes cluster via Dinky's rs protocol without any issues.

However, when I use "add customjar", a ClassNotFoundException is thrown while executing the check, whether it is an SQL task or a JAR task.

The JAR task cannot be submitted for execution because it fails during the check phase.
The SQL task also fails the SQL check, but it can still be submitted to the K8s cluster; the container logs then immediately report a ClassNotFoundException and the container restarts in a loop.

However, if I first start a Flink session on K8s with Dinky and then submit these tasks to that session through Dinky, the tasks run normally (although Dinky's local check still fails).

Exception in SQL task check:

java.lang.IllegalArgumentException: get com.commontest.TestProto$TestPb descriptors error!
	at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:94)
	at org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema.<init>(PbRowDataSerializationSchema.java:49)
	at org.apache.flink.formats.protobuf.PbEncodingFormat.createRuntimeEncoder(PbEncodingFormat.java:47)
	at org.apache.flink.formats.protobuf.PbEncodingFormat.createRuntimeEncoder(PbEncodingFormat.java:31)
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(KafkaDynamicSink.java:388)
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getSinkRuntimeProvider(KafkaDynamicSink.java:194)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:149)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:176)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
...
Caused by: java.lang.ClassNotFoundException: com.commontest.TestProto$TestPb
	at org.springframework.boot.web.embedded.tomcat.TomcatEmbeddedWebappClassLoader.loadClass(TomcatEmbeddedWebappClassLoader.java:72)
	at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1220)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:89)
	... 129 more

Exception in JAR task check:

Caused by: org.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'dw.realtime.ads.app.TestAddCustomJar' could not be loaded due to a linkage failure.
	at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:493)
	at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:153)
	at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
	at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
	at org.dinky.trans.dml.ExecuteJarOperation.getStreamGraph(ExecuteJarOperation.java:83)
	... 143 more
Caused by: java.lang.NoClassDefFoundError: com/commontest/app/BaseApp
	at java.base/java.lang.ClassLoader.defineClass1(Native Method)
	at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
	at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
	at java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:555)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:458)
	at java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:452)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:451)
	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:479)
	... 147 more
Caused by: java.lang.ClassNotFoundException: com.commontest.app.BaseApp
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	... 162 more

Exception in SQL task on K8s cluster:

Caused by: java.lang.IllegalArgumentException: get com.commontest.TestProto$TestPb descriptors error!
	at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:94) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
	at org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema.<init>(PbRowDataSerializationSchema.java:49) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
	at org.apache.flink.formats.protobuf.PbEncodingFormat.createRuntimeEncoder(PbEncodingFormat.java:47) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
	at org.apache.flink.formats.protobuf.PbEncodingFormat.createRuntimeEncoder(PbEncodingFormat.java:31) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(KafkaDynamicSink.java:388) ~[flink-sql-connector-kafka-1.16.0.jar:1.16.0]
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getSinkRuntimeProvider(KafkaDynamicSink.java:194) ~[flink-sql-connector-kafka-1.16.0.jar:1.16.0]
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:149) ~[flink-table-planner_2.12-1.16.0.jar:1.16.0]
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:176) ~[flink-table-planner_2.12-1.16.0.jar:1.16.0]
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158) ~[flink-table-planner_2.12-1.16.0.jar:1.16.0]
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) ~[flink-table-planner_2.12-1.16.0.jar:1.16.0]
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-scala_2.12-1.16.0.jar:1.16.0]

...
Caused by: java.lang.ClassNotFoundException: com.commontest.TestProto$TestPb
	at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
	at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51) ~[flink-dist-1.16.0.jar:1.16.0]
	at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
	at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192) ~[flink-dist-1.16.0.jar:1.16.0]
	at java.lang.Class.forName0(Native Method) ~[?:?]
	at java.lang.Class.forName(Unknown Source) ~[?:?]
	at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:89) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
	at org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema.<init>(PbRowDataSerializationSchema.java:49) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
	at org.apache.flink.formats.protobuf.PbEncodingFormat.createRuntimeEncoder(PbEncodingFormat.java:47) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
	at org.apache.flink.formats.protobuf.PbEncodingFormat.createRuntimeEncoder(PbEncodingFormat.java:31) ~[flink-sql-protobuf-1.16.0.jar:1.16.0]
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createSerialization(KafkaDynamicSink.java:388) ~[flink-sql-connector-kafka-1.16.0.jar:1.16.0]
	at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getSinkRuntimeProvider(KafkaDynamicSink.java:194) ~[flink-sql-connector-kafka-1.16.0.jar:1.16.0]
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:149) ~[flink-table-planner_2.12-1.16.0.jar:1.16.0]
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:176) ~[flink-table-planner_2.12-1.16.0.jar:1.16.0]
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158) ~[flink-table-planner_2.12-1.16.0.jar:1.16.0]
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) ~[flink-table-planner_2.12-1.16.0.jar:1.16.0]
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-scala_2.12-1.16.0.jar:1.16.0]
	at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-scala_2.12-1.16.0.jar:1.16.0]

What you expected to happen

"add customjar" can successfully add JAR files, allowing the check to pass and the tasks to be submitted and run successfully.

How to reproduce

Dinky version: 1.0 rc4
realtime-common-1.0-SNAPSHOT.jar contains some common utility classes and proto classes.
Storage system: MinIO (S3 protocol)
Resource configuration: OSS (Dinky can upload/download files normally)
Cluster: Kubernetes (K8s)

JAR task:

ADD CUSTOMJAR 'rs:/common/realtime-common-1.0-SNAPSHOT.jar';

EXECUTE JAR WITH (
  'uri' = 'rs:/ads/original-testAddCustomJar-SNAPSHOT.jar',
  'main-class' = 'dw.realtime.ads.app.TestAddCustomJar',
  'args' = '--env prod --offset.init earliest --parallelism 1'
);

SQL task:

ADD CUSTOMJAR 'rs:/common/realtime-common-1.0-SNAPSHOT.jar';

CREATE TABLE table_a (
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE table_b (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  'value.format' = 'protobuf',
  'value.protobuf.message-class-name' = 'com.commontest.TestProto$TestPb',
  'value.protobuf.ignore-parse-errors' = 'true'
);

INSERT INTO table_b
SELECT ...
FROM table_a;

Anything else

No response

Version

1.0.0

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

jiangwwwei added the Bug and Waiting for reply labels on Mar 5, 2024

github-actions bot commented Mar 5, 2024

Hello @jiangwwwei, this issue is about K8s, so I have assigned it to @gaoyan1998 and @zackyoungh. If you have any questions, you can comment and reply.


Zzm0809 removed their assignment on Mar 5, 2024
DataLinkDC deleted a comment from github-actions bot on Mar 5, 2024
aiwenmo removed the Waiting for reply label on Mar 5, 2024
gaoyan1998 (Contributor) commented Mar 13, 2024

ADD CUSTOMJAR does not support K8s yet; version 1.0.1 will support ADD JAR 'rs:/xxx'.
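
For illustration, a minimal sketch of that upcoming form, reusing the JAR paths from this report; the ADD JAR syntax shown here is assumed from the comment above and may differ from the released 1.0.1:

-- Assumed 1.0.1 form (ADD JAR instead of ADD CUSTOMJAR), per the maintainer's note; not verified against a release.
ADD JAR 'rs:/common/realtime-common-1.0-SNAPSHOT.jar';

EXECUTE JAR WITH (
  'uri' = 'rs:/ads/original-testAddCustomJar-SNAPSHOT.jar',
  'main-class' = 'dw.realtime.ads.app.TestAddCustomJar',
  'args' = '--env prod --offset.init earliest --parallelism 1'
);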

Zzm0809 closed this as completed on Mar 13, 2024