Merge pull request #297 from j3-signalroom/github_issue-7
GitHub issue 7
j3-signalroom authored Oct 8, 2024
2 parents 84d69a2 + 7675be4 commit b25d228
Showing 14 changed files with 208 additions and 54 deletions.
9 changes: 8 additions & 1 deletion .blog/run-terraform-locally-script-explanation.md
@@ -43,7 +43,14 @@ The script helps you manage the lifecycle of Terraform-managed infrastructure re
The script should be run with the following syntax:

```bash
scripts/run-terraform-locally.sh <create | delete> --profile=<SSO_PROFILE_NAME> --confluent_api_key=<CONFLUENT_API_KEY> --confluent_api_secret=<CONFLUENT_API_SECRET> --snowflake_warehouse=<SNOWFLAKE_WAREHOUSE> --service_account_user=<SERVICE_ACCOUNT_USER> --day_count=<DAY_COUNT> --auto_offset_reset=<earliest | latest> --number_of_api_keys_to_retain=<NUMBER_OF_API_KEYS_TO_RETAIN>
scripts/run-terraform-locally.sh <create | delete> --profile=<SSO_PROFILE_NAME> \
--confluent_api_key=<CONFLUENT_API_KEY> \
--confluent_api_secret=<CONFLUENT_API_SECRET> \
--snowflake_warehouse=<SNOWFLAKE_WAREHOUSE> \
--service_account_user=<SERVICE_ACCOUNT_USER> \
--day_count=<DAY_COUNT> \
--auto_offset_reset=<earliest | latest> \
--number_of_api_keys_to_retain=<NUMBER_OF_API_KEYS_TO_RETAIN>
```

- **create**: Deploy infrastructure using Terraform.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,13 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.30.10.000] - 2024-10-08
### Added
- Sink the data from the `DataGeneratorApp` data streams to Apache Iceberg tables.

### Changed
- No longer makes a distinction between languages when creating the Docker containers.

## [0.30.02.000] - 2024-10-08
### Added
- Markdown documents that explain the `run-terraform-locally.sh` and `run-flink-locally.sh` scripts in detail.
14 changes: 7 additions & 7 deletions README.md
@@ -92,13 +92,13 @@ Install the [Terraform CLI](https://developer.hashicorp.com/terraform/tutorials

#### 2.1.1 Run locally
```bash
scripts/run-terraform-locally.sh <create | delete> --profile=<SSO_PROFILE_NAME> \
--confluent_api_key=<CONFLUENT_API_KEY> \
--confluent_api_secret=<CONFLUENT_API_SECRET> \
--snowflake_warehouse=<SNOWFLAKE_WAREHOUSE> \
--service_account_user=<SERVICE_ACCOUNT_USER> \
--day_count=<DAY_COUNT> \
--auto_offset_reset=<earliest | latest> \
scripts/run-terraform-locally.sh <create | delete> --profile=<SSO_PROFILE_NAME> \
--confluent_api_key=<CONFLUENT_API_KEY> \
--confluent_api_secret=<CONFLUENT_API_SECRET> \
--snowflake_warehouse=<SNOWFLAKE_WAREHOUSE> \
--service_account_user=<SERVICE_ACCOUNT_USER> \
--day_count=<DAY_COUNT> \
--auto_offset_reset=<earliest | latest> \
--number_of_api_keys_to_retain=<NUMBER_OF_API_KEYS_TO_RETAIN>
```
> Argument placeholder|Replace with
2 changes: 1 addition & 1 deletion java/README.md
@@ -1,5 +1,5 @@
# Java-based Flink Apps
Discover how Apache Flink® can transform your data pipelines! Explore hands-on examples of Flink applications using the [DataStream API](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/datastream_api/) and [Table API](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/overview/) in Java. You'll see how these technologies integrate seamlessly with AWS, GitHub, and Terraform.
Discover how Apache Flink® can transform your data pipelines! Explore hands-on examples of Flink applications using the [DataStream API](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/datastream_api/) and [Table API](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/overview/) in Java. You'll see how these technologies integrate seamlessly with AWS, GitHub, Terraform, and Apache Iceberg.

Curious about the differences between the DataStream API and Table API? Click [here](../.blog/datastream-vs-table-api.md) to learn more and find the best fit for your next project.
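
To make the hand-off between the two APIs concrete, here is a minimal sketch (illustrative only, not taken from this repository; the class name, element values, and view name are made up) of bridging a Java `DataStream` into the Table API and querying it with Flink SQL:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataStreamToTableSketch {
    public static void main(String[] args) throws Exception {
        // Set up the streaming environment and its Table API bridge.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        // A toy bounded stream standing in for the flight data produced by the apps.
        DataStream<String> airportCodes = env.fromElements("ATL", "JFK", "LAX");

        // Bridge the DataStream into the Table API and query it with Flink SQL.
        Table airports = tblEnv.fromDataStream(airportCodes);
        tblEnv.createTemporaryView("airport_codes", airports);
        tblEnv.executeSql("SELECT * FROM airport_codes").print();
    }
}
```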

21 changes: 21 additions & 0 deletions java/app/src/main/java/kickstarter/Common.java
@@ -11,6 +11,8 @@
import java.util.*;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;


public class Common {
@@ -47,4 +49,23 @@ public static String getAppOptions(final String[] args) {
public static ObjectMapper getMapper() {
return new ObjectMapper().registerModule(new JavaTimeModule());
}

/**
* This method checks if a catalog exists in the TableEnvironment.
*
* @param tblEnv the TableEnvironment instance.
* @param catalogName the name of the catalog to check.
* @return true if the catalog exists, false otherwise.
*/
public static boolean isCatalogExist(final TableEnvironment tblEnv, final String catalogName) {
// Check if the catalog exists
Catalog catalog = null;
try {
catalog = tblEnv.getCatalog(catalogName).orElse(null);
} catch (Exception e) {
System.err.println("Error while checking catalog existence: " + e.getMessage());
}

return catalog != null;
}
}
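
As a usage sketch (not part of this commit): the helper is meant to guard catalog registration, mirroring how `DataGeneratorApp` below uses it. The class name is hypothetical and `<YOUR_BUCKET>` is a placeholder; the real app derives the bucket name from its options.

```java
package kickstarter;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CatalogGuardSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        String catalogName = "apache_kickstarter";
        if (!Common.isCatalogExist(tblEnv, catalogName)) {
            // Register the Hadoop-backed Iceberg catalog only when it is missing.
            tblEnv.executeSql(
                "CREATE CATALOG " + catalogName + " WITH ("
                + "'type' = 'iceberg',"
                + "'catalog-type' = 'hadoop',"
                + "'warehouse' = 's3a://<YOUR_BUCKET>/warehouse',"
                + "'property-version' = '1',"
                + "'io-impl' = 'org.apache.iceberg.hadoop.HadoopFileIO')");
        } else {
            System.out.println("The " + catalogName + " catalog already exists.");
        }
    }
}
```
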
132 changes: 130 additions & 2 deletions java/app/src/main/java/kickstarter/DataGeneratorApp.java
@@ -16,11 +16,15 @@
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.*;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.iceberg.flink.FlinkCatalog;
import java.util.*;
import java.util.stream.StreamSupport;

import kickstarter.model.*;

import java.util.*;


/**
* This class creates fake flight data for fictional airlines <b>Sunset Air</b> and
@@ -157,11 +161,135 @@ public static void main(String[] args) throws Exception {
*/
sunsetStream.sinkTo(sunsetSink).name("sunset_sink");

// --- Create a TableEnvironment
EnvironmentSettings settings =
EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env, settings);

/*
* Define the CREATE CATALOG Flink SQL statement to register the Iceberg catalog
* using the HadoopCatalog to store metadata in AWS S3 (i.e., s3a://), a Hadoop-
* compatible filesystem. Then execute the Flink SQL statement to register the
* Iceberg catalog
*/
String catalogName = "apache_kickstarter";
String bucketName = Common.getAppOptions(args).replace("_", "-"); // --- Replace any underscores with hyphens to follow the S3 bucket naming convention
try {
if(!Common.isCatalogExist(tblEnv, catalogName)) {
tblEnv.executeSql(
"CREATE CATALOG " + catalogName + " WITH ("
+ "'type' = 'iceberg',"
+ "'catalog-type' = 'hadoop',"
+ "'warehouse' = 's3a://" + bucketName + "/warehouse',"
+ "'property-version' = '1',"
+ "'io-impl' = 'org.apache.iceberg.hadoop.HadoopFileIO'"
+ ");"
);
} else {
System.out.println("The " + catalogName + " catalog already exists.");
}
} catch(final Exception e) {
System.out.println("A critical error occurred to during the processing of the catalog because " + e.getMessage());
System.exit(1);
}

// --- Use the Iceberg catalog
tblEnv.useCatalog(catalogName);

// --- Print the current catalog name
System.out.println("Current catalog: " + tblEnv.getCurrentCatalog());

// --- Check if the database exists. If not, create it
String databaseName = "airlines";

// Check if the namespace exists, if not, create it
try {
org.apache.flink.table.catalog.Catalog catalog = tblEnv.getCatalog("apache_kickstarter").orElseThrow(() -> new RuntimeException("Catalog not found"));
if (catalog instanceof FlinkCatalog) {
FlinkCatalog flinkCatalog = (FlinkCatalog) catalog;
if (!flinkCatalog.databaseExists(databaseName)) {
flinkCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap<>(), ""), false);
}
}
tblEnv.executeSql("USE " + databaseName);
} catch (Exception e) {
System.out.println("A critical error occurred during the processing of the database because " + e.getMessage());
System.exit(1);
}

// --- Print the current database name
System.out.println("Current database: " + tblEnv.getCurrentDatabase());

// --- Check if the table(s) exists. If not, create them
String[] tableNames = {"skyone_airline", "sunset_airline"};
Schema schema = Schema.newBuilder()
.column("email_address", DataTypes.STRING())
.column("departure_time", DataTypes.STRING())
.column("departure_airport_code", DataTypes.STRING())
.column("arrival_time", DataTypes.STRING())
.column("arrival_airport_code", DataTypes.STRING())
.column("flight_number", DataTypes.STRING())
.column("confirmation_code", DataTypes.STRING())
.column("ticket_price", DataTypes.DECIMAL(10, 2))
.column("aircraft", DataTypes.STRING())
.column("booking_agency_email", DataTypes.STRING())
.build();

// --- Convert DataStream to Table
Table[] tables = {tblEnv.fromDataStream(skyOneStream, schema), tblEnv.fromDataStream(sunsetStream, schema)};

// ---
int index = -1;
for (String tableName : tableNames) {
index += 1;
try {
TableResult result = tblEnv.executeSql("SHOW TABLES IN " + databaseName);
@SuppressWarnings("null")
boolean tableExists = StreamSupport.stream(
Spliterators.spliteratorUnknownSize(result.collect(), Spliterator.ORDERED), false)
.anyMatch(row -> row.getField(0).equals(tableName));
if(!tableExists) {
// --- Define the table using Flink SQL
tblEnv.executeSql(
"CREATE TABLE " + databaseName + "." + tableName + " ("
+ "email_address STRING, "
+ "departure_time STRING, "
+ "departure_airport_code STRING, "
+ "arrival_time STRING, "
+ "arrival_airport_code STRING, "
+ "flight_number STRING, "
+ "confirmation_code STRING, "
+ "ticket_price DECIMAL, "
+ "aircraft STRING, "
+ "booking_agency_email STRING) "
+ "WITH ("
+ "'write.format.default' = 'parquet',"
+ "'write.target-file-size-bytes' = '134217728',"
+ "'partitioning' = 'arrival_airport_code',"
+ "'format-version' = '2');"
);
} else {
System.out.println("The " + tableName + " table already exists.");
}
} catch(final Exception e) {
System.out.println("A critical error occurred to during the processing of the table because " + e.getMessage());
System.exit(1);
}

tblEnv.createTemporaryView(tableName + "_view", tables[index]);

// --- Insert DataStream into the table
tblEnv.executeSql("INSERT INTO " + tableName + " SELECT * FROM " + tables[index]);
}

try {
// --- Execute the Flink job graph (DAG)
env.execute("DataGeneratorApp");
} catch (Exception e) {
System.out.println("The Flink App stopped early due to the following: " + e.getMessage());
e.printStackTrace();
}
}
}
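
A quick way to confirm that rows actually landed in the Iceberg tables is to read them back with a short batch job. The sketch below is not part of this commit; it assumes the catalog, database, and table names created above, that the Iceberg/Hadoop/S3 jars installed by `linux-Dockerfile` are on the classpath, and uses `<YOUR_BUCKET>` as a placeholder for the S3 bucket:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IcebergSinkSpotCheck {
    public static void main(String[] args) {
        // Batch mode is sufficient for a one-off read of the sink tables.
        TableEnvironment tblEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Re-register the Hadoop-backed Iceberg catalog exactly as DataGeneratorApp does.
        tblEnv.executeSql(
            "CREATE CATALOG apache_kickstarter WITH ("
            + "'type' = 'iceberg',"
            + "'catalog-type' = 'hadoop',"
            + "'warehouse' = 's3a://<YOUR_BUCKET>/warehouse',"
            + "'property-version' = '1',"
            + "'io-impl' = 'org.apache.iceberg.hadoop.HadoopFileIO')");
        tblEnv.useCatalog("apache_kickstarter");
        tblEnv.useDatabase("airlines");

        // Print a handful of rows from one of the sink tables.
        tblEnv.executeSql("SELECT * FROM skyone_airline LIMIT 10").print();
    }
}
```
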
23 changes: 8 additions & 15 deletions linux-Dockerfile
@@ -2,14 +2,12 @@
FROM flink:1.19.1-scala_2.12-java17

# Build argument(s)
ARG FLINK_LANGUAGE
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY
ARG AWS_REGION
ARG AWS_S3_BUCKET

# Map build argument(s) to container environment variable(s)
ENV FLINK_LANGUAGE=${FLINK_LANGUAGE}
ENV AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
ENV AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
ENV AWS_REGION=${AWS_REGION}
@@ -24,7 +22,6 @@ ENV PYTHON_PATCH=.9
ENV PYTHON_VERSION_WITH_PATCH=${PYTHON_VERSION}${PYTHON_PATCH}
ENV HADOOP_VERSION=3.4.0
ENV ICEBERG_VERSION=1.6.1
ENV AWS_SDK_VERSION=1.12.772

# Create/set folder container environment variable(s)
ENV FLINK_CONF_DIR=/opt/flink/conf
@@ -63,20 +60,16 @@ RUN apt-get update && apt-get install -y \
python3-venv \
zip

# Download and install the Flink plugins
RUN curl -L "https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" -o "/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar"
RUN curl -L "${MAVEN_ROOT_URL}flink/flink-s3-fs-hadoop/${APACHE_FLINK_VERSION_WITH_PATCH}/flink-s3-fs-hadoop-${APACHE_FLINK_VERSION_WITH_PATCH}.jar" -o "${FLINK_LIB_DIR}flink-s3-fs-hadoop-${APACHE_FLINK_VERSION_WITH_PATCH}.jar"
RUN curl -L "${MAVEN_ROOT_URL}iceberg/iceberg-flink-runtime-${APACHE_FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${APACHE_FLINK_VERSION}-${ICEBERG_VERSION}.jar" -o "${FLINK_LIB_DIR}iceberg-flink-runtime-${APACHE_FLINK_VERSION}-${ICEBERG_VERSION}.jar"
RUN curl -L "${MAVEN_ROOT_URL}flink/flink-sql-connector-kafka/3.2.0-${APACHE_FLINK_VERSION}/flink-sql-connector-kafka-3.2.0-${APACHE_FLINK_VERSION}.jar" -o "${FLINK_LIB_DIR}flink-sql-connector-kafka-3.2.0-${APACHE_FLINK_VERSION}.jar"
RUN curl -L "${MAVEN_ROOT_URL}flink/flink-connector-kafka/3.2.0-${APACHE_FLINK_VERSION}/flink-connector-kafka-3.2.0-${APACHE_FLINK_VERSION}.jar" -o "${FLINK_LIB_DIR}flink-connector-kafka-3.2.0-${APACHE_FLINK_VERSION}.jar"
RUN curl -L "${MAVEN_ROOT_URL}flink/flink-json/${APACHE_FLINK_VERSION_WITH_PATCH}/flink-json-${APACHE_FLINK_VERSION_WITH_PATCH}.jar" -o "${FLINK_LIB_DIR}flink-json-${APACHE_FLINK_VERSION_WITH_PATCH}.jar"

# Download and install the Flink libraries that work with your Python-based Flink Apps
RUN if [ "$FLINK_LANGUAGE" = "python" ]; then \
# Download and install the Flink plugins and libraries
RUN curl -L "https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" -o "/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" && \
curl -L "${MAVEN_ROOT_URL}flink/flink-s3-fs-hadoop/${APACHE_FLINK_VERSION_WITH_PATCH}/flink-s3-fs-hadoop-${APACHE_FLINK_VERSION_WITH_PATCH}.jar" -o "${FLINK_LIB_DIR}flink-s3-fs-hadoop-${APACHE_FLINK_VERSION_WITH_PATCH}.jar" && \
curl -L "${MAVEN_ROOT_URL}iceberg/iceberg-flink-runtime-${APACHE_FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${APACHE_FLINK_VERSION}-${ICEBERG_VERSION}.jar" -o "${FLINK_LIB_DIR}iceberg-flink-runtime-${APACHE_FLINK_VERSION}-${ICEBERG_VERSION}.jar" && \
curl -L "${MAVEN_ROOT_URL}flink/flink-sql-connector-kafka/3.2.0-${APACHE_FLINK_VERSION}/flink-sql-connector-kafka-3.2.0-${APACHE_FLINK_VERSION}.jar" -o "${FLINK_LIB_DIR}flink-sql-connector-kafka-3.2.0-${APACHE_FLINK_VERSION}.jar" && \
curl -L "${MAVEN_ROOT_URL}flink/flink-connector-kafka/3.2.0-${APACHE_FLINK_VERSION}/flink-connector-kafka-3.2.0-${APACHE_FLINK_VERSION}.jar" -o "${FLINK_LIB_DIR}flink-connector-kafka-3.2.0-${APACHE_FLINK_VERSION}.jar" && \
curl -L "${MAVEN_ROOT_URL}flink/flink-json/${APACHE_FLINK_VERSION_WITH_PATCH}/flink-json-${APACHE_FLINK_VERSION_WITH_PATCH}.jar" -o "${FLINK_LIB_DIR}flink-json-${APACHE_FLINK_VERSION_WITH_PATCH}.jar" && \
curl -L "${MAVEN_ROOT_URL}iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar" -o "${FLINK_LIB_DIR}iceberg-aws-bundle-${ICEBERG_VERSION}.jar" && \
curl -L "${MAVEN_ROOT_URL}hadoop/hadoop-common/${HADOOP_VERSION}/hadoop-common-${HADOOP_VERSION}.jar" -o "${FLINK_LIB_DIR}hadoop-common-${HADOOP_VERSION}.jar" && \
curl -L "${MAVEN_ROOT_URL}kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" -o "${FLINK_LIB_DIR}kafka-clients-3.7.0.jar"; \
fi
curl -L "${MAVEN_ROOT_URL}kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.jar" -o "${FLINK_LIB_DIR}kafka-clients-3.7.0.jar";

# Set CLASSPATH environment variable
ENV CLASSPATH="${FLINK_LIB_DIR}*"
8 changes: 4 additions & 4 deletions linux-docker-compose.yml
@@ -5,11 +5,11 @@ services:
context: .
dockerfile: linux-Dockerfile
args:
FLINK_LANGUAGE: ${FLINK_LANGUAGE}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_REGION: ${AWS_REGION}
AWS_S3_BUCKET: ${AWS_S3_BUCKET}
FLINK_LANGUAGE: ${FLINK_LANGUAGE}
ports:
- 8081:8081
- 9092:9092
@@ -33,18 +33,18 @@
- AWS_REGION=$AWS_REGION
- AWS_DEFAULT_REGION=$AWS_DEFAULT_REGION
- AWS_S3_BUCKET=${AWS_S3_BUCKET}
- FLINK_LANGUAGE=$FLINK_LANGUAGE
- FLINK_LANGUAGE=${FLINK_LANGUAGE}
# Apache Flink Task Manager
taskmanager:
build:
context: .
dockerfile: linux-Dockerfile
args:
FLINK_LANGUAGE: ${FLINK_LANGUAGE}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_REGION: ${AWS_REGION}
AWS_S3_BUCKET: ${AWS_S3_BUCKET}
FLINK_LANGUAGE: ${FLINK_LANGUAGE}
depends_on:
- jobmanager
command: taskmanager
@@ -70,4 +70,4 @@
- AWS_REGION=$AWS_REGION
- AWS_DEFAULT_REGION=$AWS_DEFAULT_REGION
- AWS_S3_BUCKET=${AWS_S3_BUCKET}
- FLINK_LANGUAGE=$FLINK_LANGUAGE
- FLINK_LANGUAGE=${FLINK_LANGUAGE}