From d5fb169d14f7e5438713280d4dc0042a7e76233a Mon Sep 17 00:00:00 2001 From: Zhenghua Gao Date: Tue, 21 May 2024 18:09:17 +0800 Subject: [PATCH] [SEDONA-559] Make the flink example project work --- .gitignore | 4 +- examples/flink-sql/pom.xml | 36 +++----- .../flink-sql/src/main/java/FlinkExample.java | 19 +++-- .../src/main/resources/log4j2.properties | 25 ++++++ examples/pom.xml | 83 +++++++++++++++++++ examples/spark-sql/pom.xml | 11 ++- flink/pom.xml | 1 - pom.xml | 2 + 8 files changed, 145 insertions(+), 36 deletions(-) create mode 100644 examples/flink-sql/src/main/resources/log4j2.properties create mode 100644 examples/pom.xml diff --git a/.gitignore b/.gitignore index 955529bd74..a98ae6991c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ -/target/ +target /.idea -/*.iml +*.iml /*.ipr /*.iws /.settings/ diff --git a/examples/flink-sql/pom.xml b/examples/flink-sql/pom.xml index 5ac89ed8be..1ed4b59c31 100644 --- a/examples/flink-sql/pom.xml +++ b/examples/flink-sql/pom.xml @@ -20,19 +20,21 @@ 4.0.0 + + + org.apache.sedona + sedona-examples + 1.6.0 + + org.apache.sedona sedona-flink-example - 1.0.0 - + Sedona : Examples : Flink jar UTF-8 - 1.5.1-28.2 compile - 2.12 - 1.5.1 - 1.14.3 compile @@ -40,12 +42,7 @@ org.apache.sedona sedona-flink_${scala.compat.version} - ${sedona.version} - - - org.datasyslab - geotools-wrapper - ${geotools.version} + ${project.version} org.apache.flink @@ -56,28 +53,21 @@ org.apache.flink - flink-streaming-java_${scala.compat.version} - ${flink.version} - ${flink.scope} - - - - org.apache.flink - flink-connector-kafka_${scala.compat.version} + flink-streaming-java ${flink.version} ${flink.scope} org.apache.flink - flink-clients_${scala.compat.version} + flink-clients ${flink.version} ${flink.scope} org.apache.flink - flink-table-api-java-bridge_${scala.compat.version} + flink-table-api-java-bridge ${flink.version} ${flink.scope} @@ -103,7 +93,7 @@ org.apache.flink - flink-runtime-web_${scala.compat.version} + flink-runtime-web ${flink.version} test diff --git a/examples/flink-sql/src/main/java/FlinkExample.java b/examples/flink-sql/src/main/java/FlinkExample.java index d39c34b782..c59eb8125e 100644 --- a/examples/flink-sql/src/main/java/FlinkExample.java +++ b/examples/flink-sql/src/main/java/FlinkExample.java @@ -21,6 +21,7 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + import org.apache.sedona.flink.SedonaFlinkRegistrator; import org.apache.sedona.flink.expressions.Constructors; @@ -43,17 +44,18 @@ public static void main(String[] args) { // Create a fake WKT string table source Table pointWktTable = Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames); + // Create a geometry column - Table pointTable = pointWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(), - $(pointColNames[0])).as(pointColNames[0]), - $(pointColNames[1])); + Table pointTable = pointWktTable.select( + call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]), + $(pointColNames[1])); + // Create S2CellID pointTable = pointTable.select($(pointColNames[0]), $(pointColNames[1]), call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array")); // Explode s2id array tableEnv.createTemporaryView("pointTable", pointTable); pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)"); - pointTable.execute().print(); // Create a fake WKT string table source @@ -68,13 +70,16 @@ public static void main(String[] args) { // Explode s2id array tableEnv.createTemporaryView("polygonTable", polygonTable); polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)"); - polygonTable.execute().print(); + + // TODO: TableImpl.print() occurs EOF Exception due to https://issues.apache.org/jira/browse/FLINK-35406 + // Use polygonTable.execute().print() when FLINK-35406 is fixed. + polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row)); // Join two tables by their S2 ids Table joinResult = pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon"))); // Optional: remove false positives - joinResult = joinResult.where("ST_Contains(geom_polygon, geom_point)"); - joinResult.execute().print(); + joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point"))); + joinResult.execute().collect().forEachRemaining(row -> System.out.println(row)); } } diff --git a/examples/flink-sql/src/main/resources/log4j2.properties b/examples/flink-sql/src/main/resources/log4j2.properties new file mode 100644 index 0000000000..9206863eda --- /dev/null +++ b/examples/flink-sql/src/main/resources/log4j2.properties @@ -0,0 +1,25 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/examples/pom.xml b/examples/pom.xml new file mode 100644 index 0000000000..16afc09aee --- /dev/null +++ b/examples/pom.xml @@ -0,0 +1,83 @@ + + + 4.0.0 + + org.apache.sedona + sedona-parent + 1.6.0 + + + sedona-examples + Sedona : Examples : + pom + + + flink-sql + spark-sql + + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + compile + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + compile + + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + compile + + + + org.slf4j + slf4j-api + ${slf4j.version} + compile + + + + + org.apache.logging.log4j + log4j-1.2-api + ${log4j.version} + compile + + + + + org.geotools + gt-referencing + ${geotools.version} + compile + + + diff --git a/examples/spark-sql/pom.xml b/examples/spark-sql/pom.xml index 7826d0c83a..de02c52a5e 100644 --- a/examples/spark-sql/pom.xml +++ b/examples/spark-sql/pom.xml @@ -20,11 +20,16 @@ 4.0.0 + + + org.apache.sedona + sedona-examples + 1.6.0 + + org.apache.sedona sedona-spark-example - 1.0.0 - - ${project.groupId}:${project.artifactId} + Sedona : Examples : Spark Maven Example for SedonaDB jar diff --git a/flink/pom.xml b/flink/pom.xml index 55b778be20..b823d5783c 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -34,7 +34,6 @@ ${skip.deploy.common.modules} - 1.19.0 provided diff --git a/pom.xml b/pom.xml index 5d62b55b46..24181d34ac 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ 3.0 2.17.2 + 1.19.0 1.7.36 2.0.0 4.1.1 @@ -731,6 +732,7 @@ flink flink-shaded snowflake + examples