feat(meta): ensure each command is applied to exactly one database (#…
wenym1 authored Nov 1, 2024
1 parent 9dd7fba commit ca84504
Showing 23 changed files with 468 additions and 272 deletions.
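The change in one picture: FLUSH requests and barrier commands now carry a database id, and cluster-wide operations (pause, resume) fan out into one command per database. A minimal sketch of the pattern, using only names that appear in the diffs below; `metadata_manager` and `barrier_scheduler` stand in for the meta service's fields and are assumptions here:

// Pause the cluster by issuing one Pause command per active database.
for database_id in metadata_manager.list_active_database_ids().await? {
    barrier_scheduler
        .run_command(
            DatabaseId::new(database_id as _),
            Command::pause(PausedReason::Manual),
        )
        .await?;
}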
2 changes: 1 addition & 1 deletion proto/meta.proto
@@ -156,7 +156,7 @@ message MigrationPlan {
}

message FlushRequest {
bool checkpoint = 1;
uint32 database_id = 1;
}

message FlushResponse {
8 changes: 7 additions & 1 deletion src/frontend/src/handler/flush.rs
@@ -26,7 +26,13 @@ pub(super) async fn handle_flush(handler_args: HandlerArgs) -> Result<RwPgRespon

pub(crate) async fn do_flush(session: &SessionImpl) -> Result<()> {
let client = session.env().meta_client();
let version_id = client.flush(true).await?;
let database_id = session
.env()
.catalog_reader()
.read_guard()
.get_database_by_name(session.database())?
.id();
let version_id = client.flush(database_id).await?;

// Wait for the snapshot to be synchronized, so that future reads in this session can see
// previous writes.
8 changes: 5 additions & 3 deletions src/frontend/src/meta_client.rs
@@ -39,6 +39,8 @@ use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus};
use risingwave_rpc_client::error::Result;
use risingwave_rpc_client::{HummockMetaClient, MetaClient};

use crate::catalog::DatabaseId;

/// A wrapper around the `MetaClient` that only provides a minor set of meta rpc.
/// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`,
/// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly.
@@ -48,7 +50,7 @@ use risingwave_rpc_client::{HummockMetaClient, MetaClient};
pub trait FrontendMetaClient: Send + Sync {
async fn try_unregister(&self);

async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId>;
async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId>;

async fn wait(&self) -> Result<()>;

@@ -133,8 +135,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.try_unregister().await;
}

async fn flush(&self, checkpoint: bool) -> Result<HummockVersionId> {
self.0.flush(checkpoint).await
async fn flush(&self, database_id: DatabaseId) -> Result<HummockVersionId> {
self.0.flush(database_id).await
}

async fn wait(&self) -> Result<()> {
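For callers, the trait change above boils down to passing the session's database id instead of a `checkpoint` flag. A sketch of an assumed helper (not part of this commit), built only on the `FrontendMetaClient` signature shown above:

// Hypothetical helper: flush exactly one database and return the resulting
// hummock version id, which the session can then wait on.
async fn flush_one_database(
    client: &dyn FrontendMetaClient,
    database_id: DatabaseId,
) -> Result<HummockVersionId> {
    client.flush(database_id).await
}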
2 changes: 1 addition & 1 deletion src/frontend/src/test_utils.rs
@@ -922,7 +922,7 @@ pub struct MockFrontendMetaClient {}
impl FrontendMetaClient for MockFrontendMetaClient {
async fn try_unregister(&self) {}

async fn flush(&self, _checkpoint: bool) -> RpcResult<HummockVersionId> {
async fn flush(&self, _database_id: DatabaseId) -> RpcResult<HummockVersionId> {
Ok(INVALID_VERSION_ID)
}

89 changes: 48 additions & 41 deletions src/meta/service/src/scale_service.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{RescheduleOptions, ScaleControllerRef, WorkerReschedule};
@@ -118,47 +118,54 @@ impl ScaleService for ScaleServiceImpl {
} = request.into_inner();

let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;

let streaming_job_ids = self
for (database_id, worker_reschedules) in self
.metadata_manager
.catalog_controller
.get_fragment_job_id(
worker_reschedules
.keys()
.map(|id| *id as FragmentId)
.collect(),
)
.await?;

let table_parallelisms = streaming_job_ids
.into_iter()
.map(|id| (TableId::new(id as _), TableParallelism::Custom))
.collect();

self.stream_manager
.reschedule_actors(
worker_reschedules
.into_iter()
.map(|(fragment_id, reschedule)| {
let PbWorkerReschedule { worker_actor_diff } = reschedule;
(
fragment_id,
WorkerReschedule {
worker_actor_diff: worker_actor_diff
.into_iter()
.map(|(worker_id, diff)| (worker_id as _, diff as _))
.collect(),
},
)
})
.collect(),
RescheduleOptions {
resolve_no_shuffle_upstream,
skip_create_new_actors: false,
},
Some(table_parallelisms),
)
.await?;
.split_fragment_map_by_database(worker_reschedules)
.await?
{
let database_id = DatabaseId::new(database_id as _);
let streaming_job_ids = self
.metadata_manager
.catalog_controller
.get_fragment_job_id(
worker_reschedules
.keys()
.map(|id| *id as FragmentId)
.collect(),
)
.await?;

let table_parallelisms = streaming_job_ids
.into_iter()
.map(|id| (TableId::new(id as _), TableParallelism::Custom))
.collect();

self.stream_manager
.reschedule_actors(
database_id,
worker_reschedules
.into_iter()
.map(|(fragment_id, reschedule)| {
let PbWorkerReschedule { worker_actor_diff } = reschedule;
(
fragment_id,
WorkerReschedule {
worker_actor_diff: worker_actor_diff
.into_iter()
.map(|(worker_id, diff)| (worker_id as _, diff as _))
.collect(),
},
)
})
.collect(),
RescheduleOptions {
resolve_no_shuffle_upstream,
skip_create_new_actors: false,
},
Some(table_parallelisms),
)
.await?;
}

Ok(Response::new(RescheduleResponse {
success: true,
36 changes: 26 additions & 10 deletions src/meta/service/src/stream_service.rs
@@ -15,14 +15,14 @@
use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_connector::source::SplitMetaData;
use risingwave_meta::controller::fragment::StreamingJobInfo;
use risingwave_meta::manager::{LocalNotification, MetadataManager};
use risingwave_meta::model;
use risingwave_meta::model::ActorId;
use risingwave_meta::stream::{SourceManagerRunningInfo, ThrottleConfig};
use risingwave_meta_model::{SourceId, StreamingParallelism};
use risingwave_meta_model::{ObjectId, SourceId, StreamingParallelism};
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_actor_splits_response::FragmentType;
use risingwave_pb::meta::list_table_fragments_response::{
@@ -72,7 +72,7 @@ impl StreamManagerService for StreamServiceImpl {
self.env.idle_manager().record_activity();
let req = request.into_inner();

let version_id = self.barrier_scheduler.flush(req.checkpoint).await?;
let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?;
Ok(Response::new(FlushResponse {
status: None,
hummock_version_id: version_id.to_u64(),
@@ -81,17 +81,27 @@

#[cfg_attr(coverage, coverage(off))]
async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
self.barrier_scheduler
.run_command(Command::pause(PausedReason::Manual))
.await?;
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(
DatabaseId::new(database_id as _),
Command::pause(PausedReason::Manual),
)
.await?;
}
Ok(Response::new(PauseResponse {}))
}

#[cfg_attr(coverage, coverage(off))]
async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
self.barrier_scheduler
.run_command(Command::resume(PausedReason::Manual))
.await?;
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(
DatabaseId::new(database_id as _),
Command::resume(PausedReason::Manual),
)
.await?;
}
Ok(Response::new(ResumeResponse {}))
}

@@ -123,6 +133,12 @@ impl StreamManagerService for StreamServiceImpl {
}
};

let database_id = self
.metadata_manager
.catalog_controller
.get_object_database_id(request.id as ObjectId)
.await?;
let database_id = DatabaseId::new(database_id as _);
// TODO: check whether shared source is correct
let mutation: ThrottleConfig = actor_to_apply
.iter()
@@ -138,7 +154,7 @@
.collect();
let _i = self
.barrier_scheduler
.run_command(Command::Throttle(mutation))
.run_command(database_id, Command::Throttle(mutation))
.await?;

Ok(Response::new(ApplyThrottleResponse { status: None }))
37 changes: 17 additions & 20 deletions src/meta/src/barrier/command.rs
@@ -209,11 +209,9 @@ pub enum CreateStreamingJobType {
/// collected.
#[derive(Debug, Clone, strum::Display)]
pub enum Command {
/// `Plain` command generates a barrier with the mutation it carries.
///
/// Barriers from all actors marked as `Created` state will be collected.
/// After the barrier is collected, it does nothing.
Plain(Option<Mutation>),
/// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed,
/// all messages before the checkpoint barrier should have been committed.
Flush,

/// `Pause` command generates a `Pause` barrier with the provided [`PausedReason`] **only if**
/// the cluster is not already paused. Otherwise, a barrier with no mutation will be generated.
@@ -305,10 +303,6 @@ pub enum Command {
}

impl Command {
pub fn barrier() -> Self {
Self::Plain(None)
}

pub fn pause(reason: PausedReason) -> Self {
Self::Pause(reason)
}
@@ -319,7 +313,7 @@ impl Command {

pub(crate) fn fragment_changes(&self) -> Option<HashMap<FragmentId, CommandFragmentChanges>> {
match self {
Command::Plain(_) => None,
Command::Flush => None,
Command::Pause(_) => None,
Command::Resume(_) => None,
Command::DropStreamingJobs {
@@ -405,7 +399,7 @@

pub fn need_checkpoint(&self) -> bool {
// todo! Reviewing the flow of different command to reduce the amount of checkpoint
!matches!(self, Command::Plain(None) | Command::Resume(_))
!matches!(self, Command::Resume(_))
}
}

@@ -450,7 +444,7 @@ pub struct CommandContext {

pub table_ids_to_commit: HashSet<TableId>,

pub command: Command,
pub command: Option<Command>,

/// The tracing span of this command.
///
@@ -475,7 +469,7 @@
barrier_info: BarrierInfo,
subscription_info: InflightSubscriptionInfo,
table_ids_to_commit: HashSet<TableId>,
command: Command,
command: Option<Command>,
span: tracing::Span,
) -> Self {
Self {
@@ -494,7 +488,7 @@ impl Command {
pub fn to_mutation(&self, current_paused_reason: Option<PausedReason>) -> Option<Mutation> {
let mutation =
match self {
Command::Plain(mutation) => mutation.clone(),
Command::Flush => None,

Command::Pause(_) => {
// Only pause when the cluster is not already paused.
@@ -896,11 +890,11 @@ impl Command {

/// Returns the paused reason after executing the current command.
pub fn next_paused_reason(
&self,
this: Option<&Self>,
current_paused_reason: Option<PausedReason>,
) -> Option<PausedReason> {
match self {
Command::Pause(reason) => {
match this {
Some(Command::Pause(reason)) => {
// Only pause when the cluster is not already paused.
if current_paused_reason.is_none() {
Some(*reason)
@@ -909,7 +903,7 @@
}
}

Command::Resume(reason) => {
Some(Command::Resume(reason)) => {
// Only resume when the cluster is paused with the same reason.
if current_paused_reason == Some(*reason) {
None
@@ -966,8 +960,11 @@ impl CommandContext {
&self,
barrier_manager_context: &GlobalBarrierWorkerContextImpl,
) -> MetaResult<()> {
match &self.command {
Command::Plain(_) => {}
let Some(command) = &self.command else {
return Ok(());
};
match command {
Command::Flush => {}

Command::Throttle(_) => {}

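The net effect of the command.rs changes above: a barrier that carries no work is represented as `None` rather than `Command::Plain(None)`, and an explicit user FLUSH becomes `Command::Flush`. A hypothetical caller-side fragment (not from this commit) showing the reshaped API; `current_paused_reason` is assumed to be an `Option<PausedReason>` already in scope:

// An explicit FLUSH; a bare barrier would simply be `None`.
let command: Option<Command> = Some(Command::Flush);

// `next_paused_reason` is now an associated function over `Option<&Command>`,
// so it can also be evaluated for barriers that carry no command.
let paused_after = Command::next_paused_reason(command.as_ref(), current_paused_reason);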
4 changes: 2 additions & 2 deletions src/meta/src/barrier/creating_job/mod.rs
@@ -200,12 +200,12 @@ impl CreatingStreamingJobControl {
pub(super) fn on_new_command(
&mut self,
control_stream_manager: &mut ControlStreamManager,
command: &Command,
command: Option<&Command>,
barrier_info: &BarrierInfo,
) -> MetaResult<()> {
let table_id = self.info.table_fragments.table_id();
let start_consume_upstream =
if let Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge) = command {
if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
jobs_to_merge.contains_key(&table_id)
} else {
false
