Skip to content

Commit

Permalink
Support Streaming JOB
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs committed Jul 28, 2023
1 parent 6e37a95 commit 3ad3f98
Show file tree
Hide file tree
Showing 25 changed files with 496 additions and 111 deletions.
36 changes: 0 additions & 36 deletions Event.md

This file was deleted.

80 changes: 44 additions & 36 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ terminal String
KW_ENGINES,
KW_ENTER,
KW_ERRORS,
KW_EVENT,
KW_EVENTS,
KW_EVERY,
KW_EXCEPT,
Expand Down Expand Up @@ -428,6 +427,7 @@ terminal String
KW_ISOLATION,
KW_INVERTED,
KW_JOB,
KW_JOBS,
KW_JOIN,
KW_JSON,
KW_JSONB,
Expand Down Expand Up @@ -581,6 +581,7 @@ terminal String
KW_STOP,
KW_STORAGE,
KW_STREAM,
KW_STREAMING,
KW_STRING,
KW_STRUCT,
KW_SUM,
Expand All @@ -594,6 +595,7 @@ terminal String
KW_TABLET,
KW_TABLETS,
KW_TASK,
KW_TASKS,
KW_TEMPORARY,
KW_TERMINATED,
KW_TEXT,
Expand Down Expand Up @@ -668,7 +670,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt,
show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, show_create_load_stmt, show_create_reporitory_stmt,
describe_stmt, alter_stmt,
create_event_stmt,pause_event_stmt,resume_event_stmt,stop_event_stmt,show_event_stmt,
create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
Expand Down Expand Up @@ -803,9 +805,8 @@ nonterminal Qualifier opt_set_qualifier;
nonterminal Operation set_op;
nonterminal ArrayList<String> opt_common_hints;
nonterminal String optional_on_ident;
nonterminal String opt_event_starts;
nonterminal String opt_event_ends;
nonterminal String event_do;
nonterminal String opt_job_starts;
nonterminal String opt_job_ends;

nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;

Expand Down Expand Up @@ -1138,16 +1139,16 @@ stmt ::=
{: RESULT = stmt; :}
| show_create_routine_load_stmt : stmt
{: RESULT = stmt; :}
| create_event_stmt : stmt
| create_job_stmt : stmt
{: RESULT = stmt; :}
| pause_event_stmt : stmt
| pause_job_stmt : stmt
{: RESULT = stmt; :}
| show_event_stmt : stmt
| show_job_stmt : stmt
{: RESULT = stmt; :}
| stop_event_stmt : stmt
{: RESULT = stmt; :}
| resume_event_stmt : stmt
| stop_job_stmt : stmt
{: RESULT = stmt; :}
| resume_job_stmt : stmt
{: RESULT = stmt; :}
| show_create_load_stmt : stmt
{: RESULT = stmt; :}
| show_create_reporitory_stmt : stmt
Expand Down Expand Up @@ -2630,19 +2631,24 @@ resource_desc ::=
RESULT = new ResourceDesc(resourceName, properties);
:}
;
create_event_stmt ::=
KW_CREATE KW_EVENT job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY INTEGER_LITERAL:time_interval ident:time_unit opt_event_starts:startsTime opt_event_ends:endsTime opt_comment:comment KW_DO stmt:executeSql
create_job_stmt ::=
KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY INTEGER_LITERAL:time_interval ident:time_unit opt_job_starts:startsTime opt_job_ends:endsTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,null,time_interval,time_unit, startsTime, endsTime,comment,executeSql);
CreateJobStmt stmt = new CreateJobStmt(jobLabel,null,false,time_interval,time_unit, startsTime, endsTime,comment,executeSql);
RESULT = stmt;
:}
| KW_CREATE KW_EVENT job_label:jobLabel KW_ON KW_SCHEDULER KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
| KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_STREAMING KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,null,null,null,null,comment,executeSql);
CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,true,null,null,null,null,comment,executeSql);
RESULT = stmt;
:}
| KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,false,null,null,null,null,comment,executeSql);
RESULT = stmt;
:}
;
opt_event_starts ::=
opt_job_starts ::=
{:
RESULT = null;
:}
Expand All @@ -2652,50 +2658,48 @@ create_event_stmt ::=
:}
;

opt_event_ends ::=
opt_job_ends ::=
{:
RESULT = null;
:}
| KW_ENDS STRING_LITERAL:endTime
{:
RESULT = endTime;
:}
;
event_do ::=
KW_DO STRING_LITERAL:sqlString
{:
RESULT = sqlString;
:}
;
resume_event_stmt ::=
KW_RESUME KW_EVENT job_label:jobLabel
;
resume_job_stmt ::=
KW_RESUME KW_JOB job_label:jobLabel
{:
RESULT = new ResumeJobStmt(jobLabel);
:}
;
show_event_stmt ::=
KW_SHOW KW_EVENT
show_job_stmt ::=
KW_SHOW KW_JOBS
{:
RESULT = new ShowJobStmt(null,null);
:}
| KW_SHOW KW_EVENT KW_FOR job_label:jobLabel
| KW_SHOW KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobStmt(jobLabel,null);
:}
| KW_SHOW KW_JOB KW_TASKS KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobTaskStmt(jobLabel);
:}
;
pause_event_stmt ::=
KW_PAUSE KW_EVENT KW_FOR job_label:jobLabel
pause_job_stmt ::=
KW_PAUSE KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new PauseJobStmt(jobLabel);
:}
;

stop_event_stmt ::=
KW_STOP KW_EVENT KW_FOR job_label:jobLabel
stop_job_stmt ::=
KW_STOP KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new StopJobStmt(jobLabel);
:}
;
;
// Routine load statement
create_routine_load_stmt ::=
KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel optional_on_ident:tableName
Expand Down Expand Up @@ -7491,6 +7495,8 @@ keyword ::=
{: RESULT = id; :}
| KW_JOB:id
{: RESULT = id; :}
| KW_JOBS:id
{: RESULT = id; :}
| KW_JSON:id
{: RESULT = id; :}
| KW_JSONB:id
Expand Down Expand Up @@ -7629,6 +7635,8 @@ keyword ::=
{: RESULT = id; :}
| KW_STREAM:id
{: RESULT = id; :}
| KW_STREAMING:id
{: RESULT = id; :}
| KW_STRUCT:id
{: RESULT = id; :}
| KW_STRING:id
Expand Down Expand Up @@ -7695,7 +7703,7 @@ keyword ::=
{: RESULT = id; :}
| KW_TASK:id
{: RESULT = id; :}
| KW_EVENT:id
| KW_TASKS:id
{: RESULT = id; :}
| KW_ROUTINE:id
{: RESULT = id; :}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public class CreateJobStmt extends DdlStmt {
private static final ImmutableSet<String> supportStmtClassName = new ImmutableSet.Builder<String>()
.add(NativeInsertStmt.class.getName()).build();

public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, Long interval, String intervalTimeUnit,
public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, Boolean isStreamingJob,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) {
this.labelName = labelName;
this.onceJobStartTimestamp = onceJobStartTimestamp;
Expand All @@ -93,6 +94,7 @@ public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, Long int
this.comment = comment;
this.stmt = doStmt;
this.job = new Job();
job.setStreamingJob(isStreamingJob);
}

private String parseExecuteSql(String sql) throws AnalysisException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.List;

/**
* SHOW EVENT [FOR JobName]
* SHOW JOB [FOR JobName]
* eg: show event
* return all job in connection db
* eg: show event for test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;

import java.util.List;

/**
* SHOW JOB TASKS [FOR JobName]
*/
public class ShowJobTaskStmt extends ShowStmt {

private static final ImmutableList<String> TITLE_NAMES =
new ImmutableList.Builder<String>()
.add("JobId")
.add("TaskId")
.add("StartTime")
.add("EndTime")
.add("Status")
.add("ErrorMsg")
.build();

private final LabelName labelName;
private String dbFullName; // optional
private String name; // optional

public ShowJobTaskStmt(LabelName labelName) {
this.labelName = labelName;
}

public String getDbFullName() {
return dbFullName;
}

public String getName() {
return name;
}

@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
checkAuth();
checkLabelName(analyzer);
}

private void checkAuth() throws AnalysisException {
UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
if (!userIdentity.isRootUser()) {
throw new AnalysisException("only root user can operate");
}
}

private void checkLabelName(Analyzer analyzer) throws AnalysisException {
String dbName = labelName == null ? null : labelName.getDbName();
if (Strings.isNullOrEmpty(dbName)) {
dbFullName = analyzer.getContext().getDatabase();
if (Strings.isNullOrEmpty(dbFullName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
} else {
dbFullName = ClusterNamespace.getFullName(getClusterName(), dbName);
}
if (null == labelName) {
throw new AnalysisException("Job name is null");
}
name = labelName.getLabelName();
}

public static List<String> getTitleNames() {
return TITLE_NAMES;
}

@Override
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();

for (String title : TITLE_NAMES) {
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();
}

@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
}
Loading

0 comments on commit 3ad3f98

Please sign in to comment.