Skip to content

Commit

Permalink
Feat solid split n (#3098)
Browse files Browse the repository at this point in the history
Signed-off-by: Lichao <lecho.sun@gmail.com>
Co-authored-by: leechor <leechor@users.noreply.github.com>
  • Loading branch information
leechor and leechor authored Feb 17, 2024
1 parent d54f19d commit 3d753f1
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.dinky.service.DataBaseService;
import org.dinky.service.StudioService;
import org.dinky.service.TaskService;
import org.dinky.sql.FlinkQuery;
import org.dinky.utils.FlinkTableMetadataUtil;
import org.dinky.utils.RunTimeUtil;

Expand Down Expand Up @@ -74,6 +73,7 @@ public class StudioServiceImpl implements StudioService {
private final DataBaseService dataBaseService;
private final TaskService taskService;
private final Cache<String, JobManager> jobManagerCache = CacheUtil.newTimedCache(1000 * 60 * 2);
private final String DEFAULT_CATALOG = "default_catalog";

private IResult executeMSFlinkSql(StudioMetaStoreDTO studioMetaStoreDTO) {
String envSql = taskService.buildEnvSql(studioMetaStoreDTO);
Expand Down Expand Up @@ -142,7 +142,7 @@ public List<Catalog> getMSCatalogs(StudioMetaStoreDTO studioMetaStoreDTO) {
if (Dialect.isCommonSql(studioMetaStoreDTO.getDialect())) {
DataBase dataBase = dataBaseService.getById(studioMetaStoreDTO.getDatabaseId());
if (!Asserts.isNull(dataBase)) {
Catalog defaultCatalog = Catalog.build(FlinkQuery.defaultCatalog());
Catalog defaultCatalog = Catalog.build(DEFAULT_CATALOG);
Driver driver = Driver.build(dataBase.getDriverConfig());
defaultCatalog.setSchemas(driver.listSchemas());
catalogs.add(defaultCatalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.assertion.DinkyAssert;
import org.dinky.config.Dialect;
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.context.TenantContextHolder;
import org.dinky.data.annotations.ProcessStep;
import org.dinky.data.app.AppParamConfig;
Expand Down Expand Up @@ -644,8 +645,6 @@ public List<Task> listFlinkSQLEnv() {
@Transactional(rollbackFor = Exception.class)
public Task initDefaultFlinkSQLEnv(Integer tenantId) {
TenantContextHolder.set(tenantId);
String separator = SystemConfiguration.getInstances().getSqlSeparator();
separator = separator.replace("\\r", "\r").replace("\\n", "\n");
String name = "DefaultCatalog";

Task defaultFlinkSQLEnvTask = getTaskByNameAndTenantId(name, tenantId);
Expand All @@ -658,7 +657,11 @@ public Task initDefaultFlinkSQLEnv(Integer tenantId) {
+ "'password' = '%s',\n"
+ " 'url' = '%s'\n"
+ ")%suse catalog my_catalog%s",
dsProperties.getUsername(), dsProperties.getPassword(), dsProperties.getUrl(), separator, separator);
dsProperties.getUsername(),
dsProperties.getPassword(),
dsProperties.getUrl(),
FlinkSQLConstant.SEPARATOR,
FlinkSQLConstant.SEPARATOR);

if (null != defaultFlinkSQLEnvTask) {
defaultFlinkSQLEnvTask.setStatement(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ public static void submit(AppParamConfig config) throws SQLException {
loadDep(appTask.getType(), config.getTaskId(), executorConfig);
log.info("The job configuration is as follows: {}", executorConfig);

String[] statements =
SqlUtil.getStatements(sql, SystemConfiguration.getInstances().getSqlSeparator());
String[] statements = SqlUtil.getStatements(sql);
Optional<JobClient> jobClient = Optional.empty();
try {
if (Dialect.FLINK_JAR == appTask.getDialect()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,6 @@ public boolean isUseRestAPI() {
return useRestAPI.getValue();
}

public String getSqlSeparator() {
return sqlSeparator.getValue();
}

public int getJobIdWait() {
return jobIdWait.getValue();
}
Expand Down
4 changes: 2 additions & 2 deletions dinky-common/src/main/java/org/dinky/utils/SqlUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.dinky.utils;

import org.dinky.assertion.Asserts;
import org.dinky.data.model.SystemConfiguration;

import java.util.Map;
import java.util.regex.Pattern;
Expand All @@ -33,11 +32,12 @@
public class SqlUtil {

private static final String SEMICOLON = ";";
private static final String SQL_SEPARATOR = ";\\s*(?:\\n|--.*)";

private SqlUtil() {}

public static String[] getStatements(String sql) {
return getStatements(sql, SystemConfiguration.getInstances().getSqlSeparator());
return getStatements(sql, SQL_SEPARATOR);
}

public static String[] getStatements(String sql, String sqlSeparator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.dinky.constant;

import org.dinky.sql.FlinkQuery;

/**
* FlinkSQLConstant
*
Expand All @@ -30,13 +28,7 @@ public class FlinkSQLConstant {
private FlinkSQLConstant() {}

/** 分隔符 */
public static final String SEPARATOR = FlinkQuery.separator();
/** DDL 类型 */
public static final String DDL = "DDL";
/** DML 类型 */
public static final String DML = "DML";
/** DATASTREAM 类型 */
public static final String DATASTREAM = "DATASTREAM";
public static final String SEPARATOR = ";\\n";

/** The define identifier of FlinkSQL Variable */
public static final String VARIABLES = ":=";
Expand Down
28 changes: 7 additions & 21 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package org.dinky.explainer;

import org.dinky.assertion.Asserts;
import org.dinky.constant.FlinkSQLConstant;
import org.dinky.data.model.LineageRel;
import org.dinky.data.model.SystemConfiguration;
import org.dinky.data.result.ExplainResult;
import org.dinky.data.result.SqlExplainResult;
import org.dinky.executor.CustomTableEnvironment;
Expand Down Expand Up @@ -77,34 +75,22 @@ public class Explainer {

private Executor executor;
private boolean useStatementSet;
private String sqlSeparator;
private ObjectMapper mapper = new ObjectMapper();
private JobManager jobManager;

public Explainer(Executor executor, boolean useStatementSet, JobManager jobManager) {
this(executor, useStatementSet, FlinkSQLConstant.SEPARATOR, jobManager);
init();
}

public Explainer(Executor executor, boolean useStatementSet, String sqlSeparator, JobManager jobManager) {
this.executor = executor;
this.useStatementSet = useStatementSet;
this.sqlSeparator = sqlSeparator;
this.jobManager = jobManager;
}

public void init() {
sqlSeparator = SystemConfiguration.getInstances().getSqlSeparator();
}

public static Explainer build(
Executor executor, boolean useStatementSet, String sqlSeparator, JobManager jobManager) {
return new Explainer(executor, useStatementSet, sqlSeparator, jobManager);
public static Explainer build(Executor executor, boolean useStatementSet, JobManager jobManager) {
return new Explainer(executor, useStatementSet, jobManager);
}

public Explainer initialize(JobConfig config, String statement) {
DinkyClassLoaderUtil.initClassLoader(config, jobManager.getDinkyClassLoader());
String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(statement), sqlSeparator);
String[] statements = SqlUtil.getStatements(SqlUtil.removeNote(statement));
List<UDF> udfs = parseUDFFromStatements(statements);
jobManager.setJobParam(new JobParam(udfs));
try {
Expand Down Expand Up @@ -199,7 +185,7 @@ public List<UDF> parseUDFFromStatements(String[] statements) {

public ExplainResult explainSql(String statement) {
log.info("Start explain FlinkSQL...");
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
List<SqlExplainResult> sqlExplainRecords = new ArrayList<>();
int index = 1;
boolean correct = true;
Expand Down Expand Up @@ -335,7 +321,7 @@ public ExplainResult explainSql(String statement) {
}

public ObjectNode getStreamGraph(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
jobParam.getDdl().forEach(statementParam -> executor.executeSql(statementParam.getValue()));

if (!jobParam.getTrans().isEmpty()) {
Expand All @@ -351,7 +337,7 @@ public ObjectNode getStreamGraph(String statement) {
}

public JobPlanInfo getJobPlanInfo(String statement) {
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement, sqlSeparator));
JobParam jobParam = pretreatStatements(SqlUtil.getStatements(statement));
jobParam.getDdl().forEach(statementParam -> executor.executeSql(statementParam.getValue()));

if (!jobParam.getTrans().isEmpty()) {
Expand Down Expand Up @@ -380,7 +366,7 @@ public List<LineageRel> getLineage(String statement) {
this.initialize(jobConfig, statement);

List<LineageRel> lineageRelList = new ArrayList<>();
for (String item : SqlUtil.getStatements(statement, sqlSeparator)) {
for (String item : SqlUtil.getStatements(statement)) {
try {
String sql = FlinkInterceptor.pretreatStatement(executor, item);
if (Asserts.isNullString(sql)) {
Expand Down
2 changes: 0 additions & 2 deletions dinky-core/src/main/java/org/dinky/job/JobBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public abstract class JobBuilder {
protected Executor executor;
protected boolean useStatementSet;
protected boolean useGateway;
protected String sqlSeparator;
protected Job job;

public JobBuilder(JobManager jobManager) {
Expand All @@ -42,7 +41,6 @@ public JobBuilder(JobManager jobManager) {
this.executor = jobManager.getExecutor();
this.useStatementSet = jobManager.isUseStatementSet();
this.useGateway = jobManager.isUseGateway();
this.sqlSeparator = jobManager.getSqlSeparator();
this.job = jobManager.getJob();
}

Expand Down
Loading

0 comments on commit 3d753f1

Please sign in to comment.