Skip to content

Commit

Permalink
update flink migration impl and log msgs
Browse files Browse the repository at this point in the history
  • Loading branch information
izhangzhihao committed Jan 7, 2024
1 parent 9ad99ca commit c44f10b
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 6 deletions.
3 changes: 1 addition & 2 deletions flink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ dependencies {
implementation "org.apache.flink:flink-clients:${flinkVersion}"
implementation "org.apache.flink:flink-connector-files:${flinkVersion}"
implementation "org.apache.flink:flink-table-planner_${scalaVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-table-api-java-uber:${flinkVersion}"
implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"

// --------------------------------------------------------------
// Dependencies that should be part of the shadow jar, e.g.
Expand Down Expand Up @@ -254,7 +254,6 @@ shadowJar {
relocate "com.google", "com.github.sharpdata.sharpetl.google"
relocate "org.apache.commons.net", "com.github.sharpdata.sharpetl.commons.net"
relocate "com.zaxxer.hikari", "com.github.sharpdata.sharpetl.hikari"
//from '../hadoop'
archiveFileName = "sharp-etl-flink-standalone-${flinkVersion}_${scalaVersion}-${version}.jar"
mergeServiceFiles {
// https://github.com/flyway/flyway/issues/3482#issuecomment-1493367875
Expand Down
8 changes: 6 additions & 2 deletions flink/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ etl.default.jobTime.column=job_time
flyway.url=jdbc:flink_sharp_etl://localhost/sharp_etl
flyway.catalog=paimon
flyway.database=sharp_etl
flyway.warehouse=file:///Users/izhangzhihao/Downloads/sharp-etl/paimon-warehouse
flyway.driver=com.github.sharpdata.sharpetl.flink.extra.driver.FlinkJdbcDriver
flyway.warehouse=oss://sh-flink/warehouse
flyway.endpoint=oss-cn-shanghai-internal.aliyuncs.com
# NOTE(review): access key id/secret are committed in plaintext here — even as placeholders,
# real credentials must not live in this file; inject via environment variables or a secrets
# manager instead (confirm how ETLConfig resolves these properties).
flyway.ak=AKAKAKAKAKAKA
flyway.sk=SKSKSKSKSKSKSKSKSK

flink.default.__table.exec.sort.non-temporal.enabled__=true
flink.default.execution.runtime-mode=batch
flink.default.sql-client.execution.result-mode=tableau
flink.default.table.dml-sync=true
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ object Entrypoint {
val errorHandler: CommandLine.IExecutionExceptionHandler =
new CommandLine.IExecutionExceptionHandler() {
def handleExecutionException(ex: Exception, commandLine: CommandLine, parseResult: CommandLine.ParseResult): Int = {
println("Failed to execute job, exiting with error: " + ex.getMessage)
ex.printStackTrace()
commandLine.getCommandSpec.exitCodeOnExecutionException
}
Expand All @@ -18,6 +19,7 @@ object Entrypoint {
args: _*
)
if (!succeed(code)) {
println("Failed to execute job, exiting with code " + code)
System.exit(code)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class SingleFlinkJobCommand extends SingleJobCommand {
ETLConfig.setPropertyPath(propertyPath, env)
val etlDatabaseType = JDBCUtil.dbType
val interpreter = getFlinkInterpreter(local, wfName, releaseResource, etlDatabaseType, readQualityCheckRules())
migrate()
//JavaVersionChecker.checkJavaVersion()
try {
migrate()
val wfInterpretingResult: WfEvalResult = LogDrivenInterpreter(
WorkflowReader.readWorkflow(wfName),
interpreter,
Expand All @@ -32,6 +32,10 @@ class SingleFlinkJobCommand extends SingleJobCommand {
).eval()
new NotificationUtil(jobLogAccessor).notify(Seq(wfInterpretingResult))
throwFirstException(Seq(wfInterpretingResult))
} catch {
case e: Exception =>
ETLLogger.error("Failed to execute job", e)
throw e
} finally {
interpreter.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,16 @@ object ETLFlinkSession {
if (!catalog.isPresent) {
if (local) {
ETLLogger.info(s"catalog $catalogName not found, create it")
session.executeSql(s"CREATE CATALOG $catalogName WITH ('type' = 'paimon', 'warehouse' = '${ETLConfig.getProperty("flyway.warehouse")}')")
session.executeSql(
s"""
|CREATE CATALOG $catalogName
|WITH (
| 'type' = 'paimon',
| 'warehouse' = '${ETLConfig.getProperty("flyway.warehouse")}',
| 'fs.oss.endpoint' = '${ETLConfig.getProperty("flyway.endpoint")}',
| 'fs.oss.accessKeyId' = '${ETLConfig.getProperty("flyway.ak")}',
| 'fs.oss.accessKeySecret' = '${ETLConfig.getProperty("flyway.sk")}'
|)""".stripMargin)
ETLFlinkSession.batchEnv.useCatalog(catalogName)
session.executeSql(s"CREATE DATABASE IF NOT EXISTS ${ETLConfig.getProperty("flyway.database")}")
} else {
Expand Down

0 comments on commit c44f10b

Please sign in to comment.