Skip to content

Commit

Permalink
refactor: move mirror insertion tasks to the background runtime (#3879)
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored May 8, 2024
1 parent 9d8f72d commit d997463
Showing 1 changed file with 45 additions and 33 deletions.
78 changes: 45 additions & 33 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info};
use common_telemetry::{error, info, warn};
use datatypes::schema::Schema;
use futures_util::{future, TryStreamExt};
use meter_macros::write_meter;
Expand Down Expand Up @@ -194,6 +194,41 @@ impl Inserter {
}

impl Inserter {
/// Mirrors the given insert requests to the flownodes without blocking the caller.
///
/// The work is handed to `common_runtime::spawn_bg` and the returned handle is
/// dropped, so nothing is reported back to the caller. Inside the spawned task
/// the requests are grouped per flownode peer by
/// `Self::mirror_flow_node_requests`, and one task per peer is spawned with
/// `common_runtime::spawn_write` to deliver that peer's share through
/// `handle_inserts`.
///
/// Mirroring is best-effort: every failure (building the mirror requests,
/// joining a spawned task, or an insert rejected by a flownode) is only logged
/// as a warning.
fn post_request(&self, requests: RegionInsertRequests) {
    // Clone the handles so the spawned future owns them instead of borrowing `self`.
    let node_manager = self.node_manager.clone();
    let table_flow_manager = self.table_flow_manager.clone();
    // Spawn all tasks that mirror insert requests to the flownodes.
    common_runtime::spawn_bg(async move {
        match Self::mirror_flow_node_requests(table_flow_manager, requests).await {
            Ok(flow_tasks) => {
                let flow_tasks = flow_tasks.into_iter().map(|(peer, inserts)| {
                    let node_manager = node_manager.clone();
                    common_runtime::spawn_write(async move {
                        node_manager
                            .flownode(&peer)
                            .await
                            .handle_inserts(inserts)
                            .await
                            .map(|flow_response| RegionResponse {
                                affected_rows: flow_response.affected_rows as AffectedRows,
                                extension: flow_response.extension,
                            })
                            .context(RequestInsertsSnafu)
                    })
                });

                match future::try_join_all(flow_tasks)
                    .await
                    .context(JoinTaskSnafu)
                {
                    // Every task was joined, but each one still carries its own
                    // request result; surface those failures instead of dropping them.
                    Ok(responses) => {
                        for err in responses.into_iter().filter_map(|resp| resp.err()) {
                            warn!(err; "Failed to insert data into flownode");
                        }
                    }
                    // A spawned task could not be joined.
                    Err(err) => warn!(err; "Failed to insert data into flownode"),
                }
            }
            Err(err) => warn!(err; "Failed to mirror request to flownode"),
        }
    });
}

async fn do_request(
&self,
requests: RegionInsertRequests,
Expand All @@ -206,29 +241,8 @@ impl Inserter {
..Default::default()
});

// spawn all tasks that do job for mirror insert requests for flownode
let flow_tasks = self
.mirror_flow_node_requests(&requests)
.await?
.into_iter()
.map(|(peer, inserts)| {
let node_manager = self.node_manager.clone();
common_runtime::spawn_write(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
.map(|flow_response| RegionResponse {
affected_rows: flow_response.affected_rows as AffectedRows,
extension: flow_response.extension,
})
.context(RequestInsertsSnafu)
})
});

let tasks = self
.group_requests_by_peer(requests)
.group_requests_by_peer(requests.clone())
.await?
.into_iter()
.map(|(peer, inserts)| {
Expand All @@ -242,10 +256,9 @@ impl Inserter {
.await
.context(RequestInsertsSnafu)
})
})
.chain(flow_tasks);
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;

self.post_request(requests);
let affected_rows = results
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
Expand All @@ -259,21 +272,20 @@ impl Inserter {

/// Mirror requests for source table to flownode
async fn mirror_flow_node_requests(
&self,
requests: &RegionInsertRequests,
table_flow_manager: TableFlowManagerRef,
requests: RegionInsertRequests,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
// store partial source table requests used by flow node (only store what's used)
let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
HashMap::new();
for req in &requests.requests {
for req in requests.requests {
match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) {
Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
Some(Some((_peers, reqs))) => reqs.requests.push(req),
// already know this is not source table
Some(None) => continue,
_ => {
let table_id = RegionId::from_u64(req.region_id).table_id();
let peers = self
.table_flow_manager
let peers = table_flow_manager
.flows(table_id)
// TODO(discord9): determine where to store the flow node address in distributed mode
.map_ok(|key| Peer::new(key.flownode_id(), ""))
Expand All @@ -287,7 +299,7 @@ impl Inserter {

if !peers.is_empty() {
let mut reqs = RegionInsertRequests::default();
reqs.requests.push(req.clone());
reqs.requests.push(req);
src_table_reqs.insert(table_id, Some((peers, reqs)));
} else {
// insert an empty entry to avoid repeated queries
Expand Down

0 comments on commit d997463

Please sign in to comment.