diff --git a/src/common/meta/src/lock_key.rs b/src/common/meta/src/lock_key.rs index ad09c064d31d..456d1ccffad7 100644 --- a/src/common/meta/src/lock_key.rs +++ b/src/common/meta/src/lock_key.rs @@ -22,6 +22,7 @@ const CATALOG_LOCK_PREFIX: &str = "__catalog_lock"; const SCHEMA_LOCK_PREFIX: &str = "__schema_lock"; const TABLE_LOCK_PREFIX: &str = "__table_lock"; const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock"; +const FLOW_TASK_NAME_LOCK_PREFIX: &str = "__flow_task_name_lock"; const REGION_LOCK_PREFIX: &str = "__region_lock"; /// [CatalogLock] acquires the lock on the tenant level. @@ -110,6 +111,32 @@ impl From for StringKey { } } +/// [FlowTaskNameLock] prevents any procedures trying to create a flow task named it. +pub enum FlowTaskNameLock { + Write(String), +} + +impl FlowTaskNameLock { + pub fn new(catalog: &str, table: &str) -> Self { + Self::Write(format!("{catalog}.{table}")) + } +} + +impl Display for FlowTaskNameLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let FlowTaskNameLock::Write(name) = self; + write!(f, "{}/{}", FLOW_TASK_NAME_LOCK_PREFIX, name) + } +} + +impl From for StringKey { + fn from(value: FlowTaskNameLock) -> Self { + match value { + FlowTaskNameLock::Write(_) => StringKey::Exclusive(value.to_string()), + } + } +} + /// [TableLock] acquires the lock on the table level. /// /// Note: Allows to read/modify the corresponding table's [TableInfoValue](crate::key::table_info::TableInfoValue),