Skip to content

Commit

Permalink
refactor: remove substrait serde for region query in standalone
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Apr 29, 2024
1 parent 336db38 commit 8aabf78
Show file tree
Hide file tree
Showing 21 changed files with 176 additions and 127 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ prost.workspace = true
rand.workspace = true
serde_json.workspace = true
snafu.workspace = true
substrait.workspace = true
tokio.workspace = true
tokio-stream = { workspace = true, features = ["net"] }
tonic.workspace = true
Expand All @@ -42,7 +43,6 @@ tonic.workspace = true
common-grpc-expr.workspace = true
datanode.workspace = true
derive-new = "0.5"
substrait.workspace = true
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

Expand Down
16 changes: 14 additions & 2 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
Expand All @@ -25,13 +25,14 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::node_manager::Datanode;
use common_meta::node_manager::{Datanode, QueryRequest};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use tokio_stream::StreamExt;

use crate::error::{
Expand Down Expand Up @@ -63,6 +64,17 @@ impl Datanode for RegionRequester {
}

async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
let plan = DFLogicalSubstraitConvertor
.encode(&request.plan)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)?
.to_vec();
let request = api::v1::region::QueryRequest {
header: request.header,
region_id: request.region_id.as_u64(),
plan,
};

let ticket = Ticket {
ticket: request.encode_to_vec().into(),
};
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
etcd-client.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod tests {
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
Expand All @@ -134,6 +134,7 @@ mod tests {
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::error::{self, Error, Result};
use crate::key::datanode_table::DatanodeTableKey;
use crate::node_manager::QueryRequest;
use crate::peer::Peer;
use crate::rpc::router::region_distribution;
use crate::table_name::TableName;
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl/test_util/datanode_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
use common_error::ext::{BoxedError, ErrorExt, StackError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
Expand All @@ -22,6 +22,7 @@ use snafu::{ResultExt, Snafu};
use tokio::sync::mpsc;

use crate::error::{self, Error, Result};
use crate::node_manager::QueryRequest;
use crate::peer::Peer;
use crate::test_util::MockDatanodeHandler;

Expand Down
16 changes: 15 additions & 1 deletion src/common/meta/src/node_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,27 @@ use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::flow::{FlowRequest, FlowResponse, InsertRequest};
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::{RegionRequest, RegionRequestHeader};
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;
use datafusion_expr::LogicalPlan;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::peer::Peer;

/// The query request to be handled by the RegionServer(Datanode).
pub struct QueryRequest {
/// The header of this request. Often to store some context of the query. None means all to defaults.
pub header: Option<RegionRequestHeader>,

/// The id of the region to be queried.
pub region_id: RegionId,

/// The form of the query: a logical plan.
pub plan: LogicalPlan,
}

/// The trait for handling requests to datanode.
#[async_trait::async_trait]
pub trait Datanode: Send + Sync {
Expand Down
6 changes: 4 additions & 2 deletions src/common/meta/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::region::{QueryRequest, RegionRequest};
use api::v1::region::RegionRequest;
pub use common_base::AffectedRows;
use common_recordbatch::SendableRecordBatchStream;

Expand All @@ -26,7 +26,9 @@ use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef};
use crate::node_manager::{
Datanode, DatanodeRef, FlownodeRef, NodeManager, NodeManagerRef, QueryRequest,
};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
Expand Down
10 changes: 9 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,13 @@ pub enum Error {
source: mito2::error::Error,
location: Location,
},

#[snafu(display("DataFusion"))]
DataFusion {
#[snafu(source)]
error: datafusion::error::DataFusionError,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -325,7 +332,8 @@ impl ErrorExt for Error {
| IncorrectInternalState { .. }
| ShutdownInstance { .. }
| RegionEngineNotFound { .. }
| UnsupportedOutput { .. } => StatusCode::Internal,
| UnsupportedOutput { .. }
| DataFusion { .. } => StatusCode::Internal,

RegionNotFound { .. } => StatusCode::RegionNotFound,
RegionNotReady { .. } => StatusCode::RegionNotReady,
Expand Down
Loading

0 comments on commit 8aabf78

Please sign in to comment.