feat(flow): query table schema&refactor #3943

Merged: 4 commits, May 15, 2024
Changes from all commits
1 change: 1 addition & 0 deletions src/common/meta/src/key/table_info.rs
@@ -128,6 +128,7 @@ impl TableInfoValue {
}

pub type TableInfoManagerRef = Arc<TableInfoManager>;
#[derive(Clone)]
pub struct TableInfoManager {
kv_backend: KvBackendRef,
}
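Context note (not part of the diff): adding #[derive(Clone)] lets the flow node keep its own handle to TableInfoManager; since KvBackendRef is an Arc-based alias, cloning the manager only bumps a reference count. A minimal sketch of the pattern, with a hypothetical KvBackend trait standing in for the real one:

use std::sync::Arc;

// Hypothetical stand-in for the real kv backend trait in common_meta.
trait KvBackend: Send + Sync {}
type KvBackendRef = Arc<dyn KvBackend>;

// Cloning the manager only clones the inner Arc, so every clone shares
// the same underlying kv backend.
#[derive(Clone)]
struct TableInfoManager {
    kv_backend: KvBackendRef,
}

impl TableInfoManager {
    fn new(kv_backend: KvBackendRef) -> Self {
        Self { kv_backend }
    }
}

struct MemoryBackend;
impl KvBackend for MemoryBackend {}

fn main() {
    let manager = TableInfoManager::new(Arc::new(MemoryBackend));
    let _for_flownode = manager.clone(); // cheap: just an Arc clone
}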
9 changes: 8 additions & 1 deletion src/flow/src/adapter.rs
@@ -17,9 +17,16 @@

pub(crate) mod error;
pub(crate) mod node_context;
mod table_source;
mod util;

pub(crate) use node_context::{FlowId, FlownodeContext, TableName};
pub(crate) use node_context::FlownodeContext;
pub(crate) use table_source::TableSource;

mod worker;

pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u32;
pub type TableName = [String; 3];
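Illustration (not part of the diff): TableName = [String; 3] matches the fully qualified catalog/schema/table triple used elsewhere in GreptimeDB; the exact ordering below is an assumption. A tiny sketch of how the moved aliases read at a call site:

// Assumed ordering of the triple: [catalog, schema, table].
type TableName = [String; 3];
type FlowId = u32;

fn qualified(name: &TableName) -> String {
    name.join(".")
}

fn main() {
    let name: TableName = [
        "greptime".to_string(),
        "public".to_string(),
        "numbers".to_string(),
    ];
    let flow: FlowId = 1;
    println!("flow {flow} reads {}", qualified(&name));
}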
18 changes: 1 addition & 17 deletions src/flow/src/adapter/error.rs
Expand Up @@ -98,27 +98,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid query plan: {source}"))]
InvalidQueryPlan {
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid query: prost can't decode substrait plan: {inner}"))]
InvalidQueryProst {
inner: api::DecodeError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid query, can't transform to substrait: {source}"))]
InvalidQuerySubstrait {
source: substrait::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Invalid query: {reason}"))]
InvalidQuery {
reason: String,
@@ -193,9 +179,7 @@ impl ErrorExt for Error {
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }
| Self::FlowNotFound { .. } => StatusCode::TableNotFound,
Self::InvalidQueryPlan { .. }
| Self::InvalidQuerySubstrait { .. }
| Self::InvalidQueryProst { .. }
Self::InvalidQueryProst { .. }
| &Self::InvalidQuery { .. }
| &Self::Plan { .. }
| &Self::Datatypes { .. } => StatusCode::PlanQuery,
134 changes: 84 additions & 50 deletions src/flow/src/adapter/node_context.rs
@@ -17,32 +17,18 @@
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;

use common_telemetry::debug;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc};

use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::adapter::{FlowId, TableName, TableSource};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};

// TODO: refactor common types for flow to a separate module
/// FlowId is a unique identifier for a flow task
pub type FlowId = u64;
pub type TableName = [String; 3];

pub struct TableSource {}

impl TableSource {
pub async fn get_table_name_schema(
&self,
_table_id: &TableId,
) -> Result<(TableName, RelationType), Error> {
todo!()
}
}

/// A context that holds the information of the dataflow
#[derive(Default)]
pub struct FlownodeContext {
@@ -53,7 +39,7 @@ pub struct FlownodeContext {
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
///
/// Note that we are getting insert requests with table id, so we should use table id as the key
pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
pub source_sender: BTreeMap<TableId, SourceSender>,
/// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
///
/// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
@@ -74,38 +60,90 @@ pub struct FlownodeContext {
pub query_context: Option<Arc<QueryContext>>,
}

/// A simple broadcast sender with backpressure and an unbounded send buffer
///
/// The receiver side still uses a tokio broadcast channel, since only the sender side
/// needs to track backpressure and adjust the dataflow's running duration to avoid blocking
pub struct SourceSender {
sender: broadcast::Sender<DiffRow>,
send_buf: VecDeque<DiffRow>,
}

impl Default for SourceSender {
fn default() -> Self {
Self {
sender: broadcast::Sender::new(BROADCAST_CAP),
send_buf: Default::default(),
}
}
}

impl SourceSender {
pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
self.sender.subscribe()
}

/// Send as many rows as possible from the send buffer,
/// until the buffer is empty or the broadcast channel is full
pub fn try_send_all(&mut self) -> Result<usize, Error> {
let mut row_cnt = 0;
loop {
// if the broadcast channel is full or the send buffer is empty,
// there is nothing to do for now, just break
if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() {
break;
}
if let Some(row) = self.send_buf.pop_front() {
self.sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!("Failed to send row, error = {:?}", err),
}
.build()
})
.with_context(|_| EvalSnafu)?;
row_cnt += 1;
}
}
if row_cnt > 0 {
debug!("Send {} rows", row_cnt);
}

Ok(row_cnt)
}

/// Return the number of rows actually sent (including rows already in the buffer)
pub fn send_rows(&mut self, rows: Vec<DiffRow>) -> Result<usize, Error> {
self.send_buf.extend(rows);

let row_cnt = self.try_send_all()?;

Ok(row_cnt)
}
}

impl FlownodeContext {
// return number of rows it actual send(including what's in the buffer)
/// Return the number of rows actually sent (including rows already in the buffer)
///
/// TODO(discord9): make this concurrent
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
let sender = self
.source_sender
.get(&table_id)
.get_mut(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
let send_buffer = self.send_buffer.entry(table_id).or_default();
send_buffer.extend(rows);
let mut row_cnt = 0;
while let Some(row) = send_buffer.pop_front() {
if sender.len() >= BROADCAST_CAP {
break;
}
row_cnt += 1;
sender
.send(row)
.map_err(|err| {
InternalSnafu {
reason: format!(
"Failed to send row to table_id = {:?}, error = {:?}",
table_id, err
),
}
.build()
})
.with_context(|_| EvalSnafu)?;
}
// debug!("FlownodeContext::send: trying to send {} rows", rows.len());
sender.send_rows(rows)
}

Ok(row_cnt)
/// Flush every sender's buffer
pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
self.source_sender
.iter_mut()
.map(|(_table_id, src_sender)| src_sender.try_send_all())
.try_fold(0, |acc, x| x.map(|x| x + acc))
}
}

@@ -120,7 +158,7 @@ impl FlownodeContext {
sink_table_name: TableName,
) {
for source_table_id in source_table_ids {
self.add_source_sender(*source_table_id);
self.add_source_sender_if_not_exist(*source_table_id);
self.source_to_tasks
.entry(*source_table_id)
.or_default()
@@ -131,10 +169,9 @@
self.flow_to_sink.insert(task_id, sink_table_name);
}

pub fn add_source_sender(&mut self, table_id: TableId) {
self.source_sender
.entry(table_id)
.or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
/// Try to add a source sender; if one already exists, do nothing
pub fn add_source_sender_if_not_exist(&mut self, table_id: TableId) {
let _sender = self.source_sender.entry(table_id).or_default();
}

pub fn add_sink_receiver(&mut self, table_name: TableName) {
Expand All @@ -143,10 +180,7 @@ impl FlownodeContext {
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
}

pub fn get_source_by_global_id(
&self,
id: &GlobalId,
) -> Result<&broadcast::Sender<DiffRow>, Error> {
pub fn get_source_by_global_id(&self, id: &GlobalId) -> Result<&SourceSender, Error> {
let table_id = self
.table_repr
.get_by_global_id(id)
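Usage sketch (not part of the diff): send_rows buffers incoming rows without blocking, and the node's run loop is expected to call flush_all_sender to keep draining the buffers into the bounded broadcast channels; exactly where that call lives is outside this diff. A simplified, self-contained model of the same backpressure idea, using a plain tuple in place of the crate's DiffRow:

use std::collections::VecDeque;
use tokio::sync::broadcast;

// Simplified stand-ins for the crate's types (assumptions for illustration).
type DiffRow = (u64, i64, i64); // (key, timestamp, +1/-1 diff)
const BROADCAST_CAP: usize = 1024;

struct SourceSender {
    sender: broadcast::Sender<DiffRow>,
    send_buf: VecDeque<DiffRow>,
}

impl SourceSender {
    fn new() -> Self {
        Self {
            sender: broadcast::Sender::new(BROADCAST_CAP),
            send_buf: VecDeque::new(),
        }
    }

    // Buffer rows, then opportunistically drain them into the channel.
    fn send_rows(&mut self, rows: Vec<DiffRow>) -> usize {
        self.send_buf.extend(rows);
        self.try_send_all()
    }

    // Drain the buffer until it is empty or the channel is full; the real
    // code maps send errors into the flow crate's Error type instead of
    // ignoring them.
    fn try_send_all(&mut self) -> usize {
        let mut sent = 0;
        while self.sender.len() < BROADCAST_CAP {
            match self.send_buf.pop_front() {
                Some(row) => {
                    let _ = self.sender.send(row);
                    sent += 1;
                }
                None => break,
            }
        }
        sent
    }
}

#[tokio::main]
async fn main() {
    let mut source = SourceSender::new();
    let mut rx = source.sender.subscribe();
    // Writes are buffered first ...
    source.send_rows(vec![(1, 0, 1), (2, 0, 1)]);
    // ... and a periodic flush (here inline) keeps draining the buffer.
    source.try_send_all();
    while let Ok(row) = rx.try_recv() {
        println!("got {row:?}");
    }
}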