Skip to content

Commit

Permalink
[SEDONA-559] Make the flink example project work
Browse files Browse the repository at this point in the history
  • Loading branch information
docete committed May 27, 2024
1 parent 3f0cb0a commit d5fb169
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 36 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/target/
target
/.idea
/*.iml
*.iml
/*.ipr
/*.iws
/.settings/
Expand Down
36 changes: 13 additions & 23 deletions examples/flink-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,29 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-examples</artifactId>
<version>1.6.0</version>
</parent>

<groupId>org.apache.sedona</groupId>
<artifactId>sedona-flink-example</artifactId>
<version>1.0.0</version>

<name>Sedona : Examples : Flink</name>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<geotools.version>1.5.1-28.2</geotools.version>
<geotools.scope>compile</geotools.scope>
<scala.compat.version>2.12</scala.compat.version>
<sedona.version>1.5.1</sedona.version>
<flink.version>1.14.3</flink.version>
<flink.scope>compile</flink.scope>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-flink_${scala.compat.version}</artifactId>
<version>${sedona.version}</version>
</dependency>
<dependency>
<groupId>org.datasyslab</groupId>
<artifactId>geotools-wrapper</artifactId>
<version>${geotools.version}</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
Expand All @@ -56,28 +53,21 @@
<!-- For Flink DataStream API-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
<!-- Flink Kafka connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.compat.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
<!-- For running Flink inside the IDE -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.compat.version}</artifactId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
<!-- For Flink Table API, planner, UDF/UDT, CSV -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.compat.version}</artifactId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
</dependency>
Expand All @@ -103,7 +93,7 @@
<!-- For the Flink web UI in tests -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.compat.version}</artifactId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
Expand Down
19 changes: 12 additions & 7 deletions examples/flink-sql/src/main/java/FlinkExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.sedona.flink.SedonaFlinkRegistrator;
import org.apache.sedona.flink.expressions.Constructors;

Expand All @@ -43,17 +44,18 @@ public static void main(String[] args) {

// Create a fake WKT string table source
Table pointWktTable = Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);

// Create a geometry column
Table pointTable = pointWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
$(pointColNames[0])).as(pointColNames[0]),
$(pointColNames[1]));
Table pointTable = pointWktTable.select(
call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]),
$(pointColNames[1]));

// Create S2CellID
pointTable = pointTable.select($(pointColNames[0]), $(pointColNames[1]),
call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
// Explode s2id array
tableEnv.createTemporaryView("pointTable", pointTable);
pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
pointTable.execute().print();


// Create a fake WKT string table source
Expand All @@ -68,13 +70,16 @@ public static void main(String[] args) {
// Explode s2id array
tableEnv.createTemporaryView("polygonTable", polygonTable);
polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
polygonTable.execute().print();

// TODO: TableImpl.print() fails with an EOF exception due to https://issues.apache.org/jira/browse/FLINK-35406
// Switch back to polygonTable.execute().print() once FLINK-35406 is fixed.
polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row));

// Join two tables by their S2 ids
Table joinResult = pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
// Optional: remove false positives
joinResult = joinResult.where("ST_Contains(geom_polygon, geom_point)");
joinResult.execute().print();
joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
joinResult.execute().collect().forEachRemaining(row -> System.out.println(row));
}

}
25 changes: 25 additions & 0 deletions examples/flink-sql/src/main/resources/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Root logger: emit INFO and above, routed to the appender named ConsoleAppender below.
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

# Console appender definition; its name must match the appenderRef above.
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
# Pattern: timestamp, level (left-aligned, min width 5), logger name (left-aligned,
# min width 60), thread context stack (%x), message, line separator.
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
83 changes: 83 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-parent</artifactId>
<version>1.6.0</version>
</parent>

<artifactId>sedona-examples</artifactId>
<name>Sedona : Examples : </name>
<packaging>pom</packaging>

<modules>
<module>flink-sql</module>
<module>spark-sql</module>
</modules>

<dependencies>
<!-- Add a logging framework so that the examples produce -->
<!-- log output when they are run from the IDE -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>compile</scope>
</dependency>

<!-- Bridge so that code written against the Log4j 1.x API logs through Log4j 2 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
<scope>compile</scope>
</dependency>

<!-- Add GeoTools dependencies so that the examples can use GeoTools when run from the IDE -->
<dependency>
<groupId>org.geotools</groupId>
<artifactId>gt-referencing</artifactId>
<version>${geotools.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
11 changes: 8 additions & 3 deletions examples/spark-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-examples</artifactId>
<version>1.6.0</version>
</parent>

<groupId>org.apache.sedona</groupId>
<artifactId>sedona-spark-example</artifactId>
<version>1.0.0</version>

<name>${project.groupId}:${project.artifactId}</name>
<name>Sedona : Examples : Spark</name>
<description>Maven Example for SedonaDB</description>
<packaging>jar</packaging>

Expand Down
1 change: 0 additions & 1 deletion flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

<properties>
<maven.deploy.skip>${skip.deploy.common.modules}</maven.deploy.skip>
<flink.version>1.19.0</flink.version>
<flink.scope>provided</flink.scope>
</properties>

Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<spark.compat.version>3.0</spark.compat.version>
<log4j.version>2.17.2</log4j.version>

<flink.version>1.19.0</flink.version>
<slf4j.version>1.7.36</slf4j.version>
<googles2.version>2.0.0</googles2.version>
<uberh3.version>4.1.1</uberh3.version>
Expand Down Expand Up @@ -731,6 +732,7 @@
<module>flink</module>
<module>flink-shaded</module>
<module>snowflake</module>
<module>examples</module>
</modules>
</profile>
<profile>
Expand Down

0 comments on commit d5fb169

Please sign in to comment.