Skip to content

Commit

Permalink
refactor: remove substrait serde for region query in standalone
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Jun 6, 2024
1 parent 2ade511 commit 21cfdf3
Show file tree
Hide file tree
Showing 23 changed files with 253 additions and 212 deletions.
156 changes: 77 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
prometheus.workspace = true
prost.workspace = true
query.workspace = true
rand.workspace = true
serde_json.workspace = true
snafu.workspace = true
substrait.workspace = true
tokio.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
tonic.workspace = true
Expand All @@ -42,7 +44,6 @@ tonic.workspace = true
common-grpc-expr.workspace = true
datanode.workspace = true
derive-new = "0.5"
substrait.workspace = true
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand Down
17 changes: 15 additions & 2 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
Expand All @@ -25,13 +25,15 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::node_manager::Datanode;
use common_meta::node_manager::{Datanode, QueryRequest};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use query::query_engine::DefaultSerializer;
use snafu::{location, Location, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio_stream::StreamExt;

use crate::error::{
Expand Down Expand Up @@ -63,6 +65,17 @@ impl Datanode for RegionRequester {
}

async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let plan = DFLogicalSubstraitConvertor
.encode(&request.plan, DefaultSerializer)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?
.to_vec();
let request = api::v1::region::QueryRequest {
header: request.header,
region_id: request.region_id.as_u64(),
plan,
};

let ticket = Ticket {
ticket: request.encode_to_vec().into(),
};
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
Expand All @@ -143,6 +143,7 @@ mod tests {
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::error::{self, Error, Result};
use crate::key::datanode_table::DatanodeTableKey;
use crate::node_manager::QueryRequest;
use crate::peer::Peer;
use crate::rpc::router::region_distribution;
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/test_util/datanode_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use common_error::ext::{BoxedError, ErrorExt, StackError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
Expand All @@ -22,6 +22,7 @@ use snafu::{ResultExt, Snafu};
use tokio::sync::mpsc;

use crate::error::{self, Error, Result};
use crate::node_manager::QueryRequest;
use crate::peer::Peer;
use crate::test_util::MockDatanodeHandler;

Expand Down
16 changes: 15 additions & 1 deletion src/common/meta/src/node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,27 @@ use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
use api::v1::region::{InsertRequests, RegionRequest, RegionRequestHeader};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use datafusion_expr::LogicalPlan;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::peer::Peer;

/// The query request to be handled by the RegionServer(Datanode).
pub struct QueryRequest {
/// The header of this request. Often to store some context of the query. None means all to defaults.
pub header: Option<RegionRequestHeader>,

/// The id of the region to be queried.
pub region_id: RegionId,

/// The form of the query: a logical plan.
pub plan: LogicalPlan,
}

/// The trait for handling requests to datanode.
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse};
use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
use api::v1::region::{InsertRequests, RegionRequest};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

Expand All @@ -30,7 +30,7 @@ use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::node_manager::{
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef, QueryRequest,
};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
Expand Down
1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
toml.workspace = true
Expand Down
13 changes: 11 additions & 2 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub enum Error {
DecodeLogicalPlan {
#[snafu(implicit)]
location: Location,
source: common_query::error::Error,
source: substrait::error::Error,
},

#[snafu(display("Incorrect internal state: {}", state))]
Expand Down Expand Up @@ -387,6 +387,14 @@ pub enum Error {
location: Location,
source: BoxedError,
},

#[snafu(display("DataFusion"))]
DataFusion {
#[snafu(source)]
error: datafusion::error::DataFusionError,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -439,7 +447,8 @@ impl ErrorExt for Error {
| IncorrectInternalState { .. }
| ShutdownInstance { .. }
| RegionEngineNotFound { .. }
| UnsupportedOutput { .. } => StatusCode::Internal,
| UnsupportedOutput { .. }
| DataFusion { .. } => StatusCode::Internal,

RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
Expand Down
Loading

0 comments on commit 21cfdf3

Please sign in to comment.