Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: rename flow task to flow #3833

Merged
merged 25 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
eca3d3d
refactor: rename to `MIN_USER_FLOW_ID`
WenyXu Apr 28, 2024
4e8ffd7
refactor: rename to `FLOW_ID_SEQ`
WenyXu Apr 28, 2024
65d2746
refactor: rename to `flow_id_sequence`
WenyXu Apr 28, 2024
064d98d
refactor: rename to `FlowMetadataManager`
WenyXu Apr 28, 2024
e1b73c9
refactor: rename flow_task.rs to flow.rs
WenyXu Apr 28, 2024
a8f71ea
refactor: rename to FlowInfoManager
WenyXu Apr 28, 2024
6ead3ae
refactor: rename to FlowName
WenyXu Apr 28, 2024
b9a38a1
refactor: rename to FlownodeFlow
WenyXu Apr 28, 2024
d392432
refactor: rename to TableFlow
WenyXu Apr 28, 2024
7f91274
refactor: remove TASK
WenyXu Apr 28, 2024
1d6fd8a
refactor: rename to __flow
WenyXu Apr 28, 2024
6ca8e61
refactor: rename to flow_id
WenyXu Apr 28, 2024
a106ffb
refactor: rename to flow_name
WenyXu Apr 28, 2024
4202560
refactor: update comments
WenyXu Apr 28, 2024
85e7c00
refactor: rename to flow_metadata_manager
WenyXu Apr 28, 2024
4727159
refactor: rename to flow_metadata_allocator
WenyXu Apr 28, 2024
6aee746
refactor: rename to FlowMetadataAllocator
WenyXu Apr 28, 2024
c759957
refactor: rename task suffix
WenyXu Apr 28, 2024
0db6d5e
refactor: rename FlowTask to FlowInfo
WenyXu Apr 29, 2024
247d1a9
refactor: rename FlowTaskScoped to FlowScoped
WenyXu Apr 29, 2024
1e5c9d0
refactor: rename FlowTaskId to FlowId
WenyXu Apr 29, 2024
d63686a
chore: bump proto to b5412f7
WenyXu Apr 29, 2024
cb27452
chore: apply suggestions from CR
WenyXu Apr 29, 2024
4a66a12
chore: apply suggestions from CR
WenyXu Apr 29, 2024
dec1360
chore: apply suggestions from CR
WenyXu Apr 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aba235025ac5643c12bfdcefd656af11ad58ea8e" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b5412f72257c18410fdccbb893fa5d245b846141" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
4 changes: 2 additions & 2 deletions src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlowTask(_)) => "ddl.create_flow",
Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task",
Some(Expr::CreateFlow(_)) => "ddl.create_flow",
Some(Expr::DropFlow(_)) => "ddl.drop_flow",
None => "ddl.empty",
}
}
Expand Down
34 changes: 17 additions & 17 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use std::{fs, path};
use async_trait::async_trait;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_catalog::consts::{MIN_USER_FLOW_TASK_ID, MIN_USER_TABLE_ID};
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::task_meta::{FlowTaskMetadataAllocator, FlowTaskMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow_task::{FlowTaskMetadataManager, FlowTaskMetadataManagerRef};
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
Expand All @@ -47,7 +47,7 @@ use frontend::server::Services;
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_TASK_ID_SEQ, TABLE_ID_SEQ};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
Expand Down Expand Up @@ -419,9 +419,9 @@ impl StartCommand {
.step(10)
.build(),
);
let flow_task_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_TASK_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_TASK_ID as u64)
let flow_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_ID as u64)
.step(10)
.build(),
);
Expand All @@ -431,23 +431,23 @@ impl StartCommand {
));
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let flow_task_metadata_manager = Arc::new(FlowTaskMetadataManager::new(kv_backend.clone()));
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let flow_task_meta_allocator = Arc::new(
FlowTaskMetadataAllocator::with_noop_peer_allocator(flow_task_id_sequence),
);
let flow_meta_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
flow_id_sequence,
));

let ddl_task_executor = Self::create_ddl_task_executor(
procedure_manager.clone(),
node_manager.clone(),
multi_cache_invalidator,
table_metadata_manager,
table_meta_allocator,
flow_task_metadata_manager,
flow_task_meta_allocator,
flow_metadata_manager,
flow_meta_allocator,
)
.await?;

Expand Down Expand Up @@ -480,8 +480,8 @@ impl StartCommand {
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
flow_task_metadata_manager: FlowTaskMetadataManagerRef,
flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
flow_metadata_manager: FlowMetadataManagerRef,
flow_metadata_allocator: FlowMetadataAllocatorRef,
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
Expand All @@ -491,8 +491,8 @@ impl StartCommand {
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
table_metadata_manager,
table_metadata_allocator,
flow_task_metadata_manager,
flow_task_metadata_allocator,
flow_metadata_manager,
flow_metadata_allocator,
},
procedure_manager,
true,
Expand Down
4 changes: 2 additions & 2 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ pub const DEFAULT_CATALOG_NAME: &str = "greptime";
pub const DEFAULT_SCHEMA_NAME: &str = "public";
pub const DEFAULT_PRIVATE_SCHEMA_NAME: &str = "greptime_private";

/// Reserves [0,MIN_USER_FLOW_TASK_ID) for internal usage.
/// Reserves [0,MIN_USER_FLOW_ID) for internal usage.
/// User defined table id starts from this value.
pub const MIN_USER_FLOW_TASK_ID: u32 = 1024;
pub const MIN_USER_FLOW_ID: u32 = 1024;
/// Reserves [0,MIN_USER_TABLE_ID) for internal usage.
/// User defined table id starts from this value.
pub const MIN_USER_TABLE_ID: u32 = 1024;
Expand Down
14 changes: 7 additions & 7 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::task_meta::FlowTaskMetadataAllocatorRef;
use crate::error::Result;
use crate::key::flow_task::FlowTaskMetadataManagerRef;
use crate::key::flow::FlowMetadataManagerRef;
use crate::key::table_route::PhysicalTableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
Expand All @@ -39,9 +39,9 @@ pub mod create_table;
mod create_table_template;
pub mod drop_database;
pub mod drop_table;
pub mod flow_meta;
mod physical_table_metadata;
pub mod table_meta;
pub mod task_meta;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
#[cfg(test)]
Expand Down Expand Up @@ -110,8 +110,8 @@ pub struct DdlContext {
pub table_metadata_manager: TableMetadataManagerRef,
/// Allocator for table metadata.
pub table_metadata_allocator: TableMetadataAllocatorRef,
/// Flow task metadata manager.
pub flow_task_metadata_manager: FlowTaskMetadataManagerRef,
/// Allocator for flow task metadata.
pub flow_task_metadata_allocator: FlowTaskMetadataAllocatorRef,
/// Flow metadata manager.
pub flow_metadata_manager: FlowMetadataManagerRef,
/// Allocator for flow metadata.
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
}
86 changes: 43 additions & 43 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ use super::utils::add_peer_context_if_needed;
use crate::ddl::utils::handle_retry_error;
use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::flow_task::flow_task_info::FlowTaskInfoValue;
use crate::key::FlowTaskId;
use crate::lock_key::{CatalogLock, FlowTaskNameLock, TableNameLock};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::FlowId;
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateFlowTask;
use crate::{metrics, ClusterId};

/// The procedure of flow task creation.
/// The procedure of flow creation.
pub struct CreateFlowProcedure {
pub context: DdlContext,
pub data: CreateFlowTaskData,
pub data: CreateFlowData,
}

impl CreateFlowProcedure {
Expand All @@ -56,13 +56,13 @@ impl CreateFlowProcedure {
pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self {
Self {
context,
data: CreateFlowTaskData {
data: CreateFlowData {
cluster_id,
task,
flow_task_id: None,
flow_id: None,
peers: vec![],
source_table_ids: vec![],
state: CreateFlowTaskState::CreateMetadata,
state: CreateFlowState::CreateMetadata,
},
}
}
Expand All @@ -76,51 +76,51 @@ impl CreateFlowProcedure {
async fn on_prepare(&mut self) -> Result<Status> {
self.check_creation().await?;
self.collect_source_tables().await?;
self.allocate_flow_task_id().await?;
self.data.state = CreateFlowTaskState::CreateFlows;
self.allocate_flow_id().await?;
self.data.state = CreateFlowState::CreateFlows;

Ok(Status::executing(true))
}

async fn on_flownode_create_flows(&mut self) -> Result<Status> {
// Safety: must be allocated.
let mut create_flow_task = Vec::with_capacity(self.data.peers.len());
let mut create_flow = Vec::with_capacity(self.data.peers.len());
for peer in &self.data.peers {
let requester = self.context.node_manager.flownode(peer).await;
let request = FlowRequest {
body: Some(PbFlowRequest::Create((&self.data).into())),
};
create_flow_task.push(async move {
create_flow.push(async move {
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(peer.clone()))
});
}

join_all(create_flow_task)
join_all(create_flow)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

self.data.state = CreateFlowTaskState::CreateMetadata;
self.data.state = CreateFlowState::CreateMetadata;
Ok(Status::executing(true))
}

/// Creates flow task metadata.
/// Creates flow metadata.
///
/// Abort(not-retry):
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self) -> Result<Status> {
// Safety: The flow task id must be allocated.
let flow_task_id = self.data.flow_task_id.unwrap();
// Safety: The flow id must be allocated.
let flow_id = self.data.flow_id.unwrap();
// TODO(weny): Support `or_replace`.
self.context
.flow_task_metadata_manager
.create_flow_task_metadata(flow_task_id, (&self.data).into())
.flow_metadata_manager
.create_flow_metadata(flow_id, (&self.data).into())
.await?;
info!("Created flow task metadata for flow {flow_task_id}");
Ok(Status::done_with_output(flow_task_id))
info!("Created flow metadata for flow {flow_id}");
Ok(Status::done_with_output(flow_id))
}
}

Expand All @@ -133,14 +133,14 @@ impl Procedure for CreateFlowProcedure {
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;

let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK
let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW
.with_label_values(&[state.as_ref()])
.start_timer();

match state {
CreateFlowTaskState::Prepare => self.on_prepare().await,
CreateFlowTaskState::CreateFlows => self.on_flownode_create_flows().await,
CreateFlowTaskState::CreateMetadata => self.on_create_metadata().await,
CreateFlowState::Prepare => self.on_prepare().await,
CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
CreateFlowState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
}
Expand All @@ -151,7 +151,7 @@ impl Procedure for CreateFlowProcedure {

fn lock_key(&self) -> LockKey {
let catalog_name = &self.data.task.catalog_name;
let task_name = &self.data.task.task_name;
let flow_name = &self.data.task.flow_name;
let sink_table_name = &self.data.task.sink_table_name;

LockKey::new(vec![
Expand All @@ -162,14 +162,14 @@ impl Procedure for CreateFlowProcedure {
&sink_table_name.catalog_name,
)
.into(),
FlowTaskNameLock::new(catalog_name, task_name).into(),
FlowNameLock::new(catalog_name, flow_name).into(),
])
}
}

/// The state of [CreateFlowTaskProcedure].
/// The state of [CreateFlowProcedure].
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowTaskState {
pub enum CreateFlowState {
/// Prepares to create the flow.
Prepare,
/// Creates flows on the flownode.
Expand All @@ -180,22 +180,22 @@ pub enum CreateFlowTaskState {

/// The serializable data.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateFlowTaskData {
pub struct CreateFlowData {
pub(crate) cluster_id: ClusterId,
pub(crate) state: CreateFlowTaskState,
pub(crate) state: CreateFlowState,
pub(crate) task: CreateFlowTask,
pub(crate) flow_task_id: Option<FlowTaskId>,
pub(crate) flow_id: Option<FlowId>,
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
}

impl From<&CreateFlowTaskData> for CreateRequest {
fn from(value: &CreateFlowTaskData) -> Self {
let flow_task_id = value.flow_task_id.unwrap();
impl From<&CreateFlowData> for CreateRequest {
fn from(value: &CreateFlowData) -> Self {
let flow_id = value.flow_id.unwrap();
let source_table_ids = &value.source_table_ids;

CreateRequest {
task_id: Some(api::v1::flow::TaskId { id: flow_task_id }),
flow_id: Some(api::v1::flow::TaskId { id: flow_id }),
source_table_ids: source_table_ids
.iter()
.map(|table_id| api::v1::TableId { id: *table_id })
Expand All @@ -206,21 +206,21 @@ impl From<&CreateFlowTaskData> for CreateRequest {
expire_when: value.task.expire_when.clone(),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
task_options: value.task.options.clone(),
flow_options: value.task.flow_options.clone(),
}
}
}

impl From<&CreateFlowTaskData> for FlowTaskInfoValue {
fn from(value: &CreateFlowTaskData) -> Self {
impl From<&CreateFlowData> for FlowInfoValue {
fn from(value: &CreateFlowData) -> Self {
let CreateFlowTask {
catalog_name,
task_name,
flow_name,
sink_table_name,
expire_when,
comment,
sql,
options,
flow_options: options,
..
} = value.task.clone();

Expand All @@ -231,12 +231,12 @@ impl From<&CreateFlowTaskData> for FlowTaskInfoValue {
.map(|(idx, peer)| (idx as u32, peer.id))
.collect::<BTreeMap<_, _>>();

FlowTaskInfoValue {
FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
sink_table_name,
flownode_ids,
catalog_name,
task_name,
flow_name,
raw_sql: sql,
expire_when,
comment,
Expand Down
Loading
Loading