Skip to content

Commit

Permalink
chore: rename to CreateFlowProcedure
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Apr 28, 2024
1 parent 330dab5 commit 7b663e5
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, Procedu
pub mod alter_logical_tables;
pub mod alter_table;
pub mod create_database;
pub mod create_flow_task;
pub mod create_flow;
pub mod create_logical_tables;
pub mod create_table;
mod create_table_template;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ use crate::rpc::ddl::CreateFlowTask;
use crate::{metrics, ClusterId};

/// The procedure of flow task creation.
pub struct CreateFlowTaskProcedure {
pub struct CreateFlowProcedure {
pub context: DdlContext,
pub data: CreateFlowTaskData,
}

impl CreateFlowTaskProcedure {
impl CreateFlowProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlowTask";

/// Returns a new [CreateFlowTaskProcedure].
/// Returns a new [CreateFlowProcedure].
pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self {
Self {
context,
Expand All @@ -70,7 +70,7 @@ impl CreateFlowTaskProcedure {
/// Deserializes from `json`.
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(CreateFlowTaskProcedure { context, data })
Ok(CreateFlowProcedure { context, data })
}

async fn on_prepare(&mut self) -> Result<Status> {
Expand All @@ -87,7 +87,7 @@ impl CreateFlowTaskProcedure {
// Safety: must be allocated.
let mut create_flow_task = Vec::with_capacity(self.data.peers.len());
for peer in &self.data.peers {
let requester = self.context.datanode_manager.flownode(peer).await;
let requester = self.context.node_manager.flownode(peer).await;
let request = FlowRequest {
body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())),
};
Expand Down Expand Up @@ -126,7 +126,7 @@ impl CreateFlowTaskProcedure {
}

#[async_trait]
impl Procedure for CreateFlowTaskProcedure {
impl Procedure for CreateFlowProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

use snafu::ensure;

use crate::ddl::create_flow_task::CreateFlowTaskProcedure;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::error::{self, Result};

impl CreateFlowTaskProcedure {
impl CreateFlowProcedure {
/// Checks:
/// - The new task name doesn't exist.
/// - The source tables exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::ddl::create_flow_task::CreateFlowTaskProcedure;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;

impl CreateFlowTaskProcedure {
impl CreateFlowProcedure {
/// Allocates the [FlowTaskId].
pub(crate) async fn allocate_flow_task_id(&mut self) -> Result<()> {
//TODO(weny, ruihang): We doesn't support the partitions. It's always be 1, now.
Expand Down

0 comments on commit 7b663e5

Please sign in to comment.