Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pagination to query Execution #141

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
265 changes: 239 additions & 26 deletions src/app/app_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,69 @@
//! [`AppExecution`]: Handles executing queries for the TUI application.

use crate::app::state::tabs::sql::Query;
use crate::app::AppEvent;
use crate::app::{AppEvent, ExecutionError, ExecutionResultsBatch};
use crate::execution::ExecutionContext;
use arrow_flight::decode::FlightRecordBatchStream;
use color_eyre::eyre::Result;
use datafusion::arrow::array::RecordBatch;
use datafusion::execution::context::SessionContext;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
use datafusion::physical_plan::{
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
};
use futures::StreamExt;
use log::{error, info};
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;
use tonic::IntoRequest;

#[cfg(feature = "flightsql")]
use {arrow_flight::sql::client::FlightSqlServiceClient, tonic::transport::Channel};

/// Handles executing queries for the TUI application, formatting results
/// and sending them to the UI.
pub(crate) struct AppExecution {
inner: Arc<ExecutionContext>,
// TODO: Store the SQL with the stream
result_stream: Arc<Mutex<Option<SendableRecordBatchStream>>>,
// TODO: Store the SQL with the stream
flight_result_stream: Arc<Mutex<Option<FlightRecordBatchStream>>>,
flight_results_current_row_start: Option<usize>,
}

impl AppExecution {
/// Create a new instance of [`AppExecution`].
pub fn new(inner: Arc<ExecutionContext>) -> Self {
Self { inner }
Self {
inner,
result_stream: Arc::new(Mutex::new(None)),
flight_result_stream: Arc::new(Mutex::new(None)),
flight_results_current_row_start: None,
}
}

pub fn session_ctx(&self) -> &SessionContext {
self.inner.session_ctx()
}

#[cfg(feature = "flightsql")]
pub fn flightsql_client(&self) -> &Mutex<Option<FlightSqlServiceClient<Channel>>> {
self.inner.flightsql_client()
}

pub async fn set_result_stream(&self, stream: SendableRecordBatchStream) {
let mut s = self.result_stream.lock().await;
*s = Some(stream)
}

#[cfg(feature = "flightsql")]
pub async fn set_flight_result_stream(&self, stream: FlightRecordBatchStream) {
let mut s = self.flight_result_stream.lock().await;
*s = Some(stream)
}

/// Run the sequence of SQL queries, sending the results as [`AppEvent::QueryResult`] via the sender.
Expand All @@ -60,33 +104,53 @@ impl AppExecution {
let start = std::time::Instant::now();
if i == statement_count - 1 {
info!("Executing last query and display results");
match self.inner.execute_sql(sql).await {
Ok(mut stream) => {
let mut batches = Vec::new();
while let Some(maybe_batch) = stream.next().await {
match maybe_batch {
Ok(batch) => {
batches.push(batch);
}
Err(e) => {
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
break;
sender.send(AppEvent::NewExecution)?;
match self.inner.create_physical_plan(sql).await {
Ok(plan) => match execute_stream(plan, self.inner.session_ctx().task_ctx()) {
Ok(stream) => {
self.set_result_stream(stream).await;
let mut stream = self.result_stream.lock().await;
if let Some(s) = stream.as_mut() {
if let Some(b) = s.next().await {
match b {
Ok(b) => {
let duration = start.elapsed();
let results = ExecutionResultsBatch {
query: sql.to_string(),
batch: b,
duration,
};
sender.send(AppEvent::ExecutionResultsNextPage(
results,
))?;
}
Err(e) => {
error!("Error getting RecordBatch: {:?}", e);
}
}
}
}
}
Err(stream_err) => {
error!("Error creating physical plan: {:?}", stream_err);
let elapsed = start.elapsed();
let e = ExecutionError {
query: sql.to_string(),
error: stream_err.to_string(),
duration: elapsed,
};
sender.send(AppEvent::ExecutionResultsError(e))?;
}
},
Err(plan_err) => {
error!("Error creating physical plan: {:?}", plan_err);
let elapsed = start.elapsed();
let rows: usize = batches.iter().map(|r| r.num_rows()).sum();
query.set_results(Some(batches));
query.set_num_rows(Some(rows));
query.set_execution_time(elapsed);
}
Err(e) => {
error!("Error creating dataframe: {:?}", e);
let elapsed = start.elapsed();
query.set_error(Some(e.to_string()));
query.set_execution_time(elapsed);
let e = ExecutionError {
query: sql.to_string(),
error: plan_err.to_string(),
duration: elapsed,
};
sender.send(AppEvent::ExecutionResultsError(e))?;
}
}
} else {
Expand All @@ -107,4 +171,153 @@ impl AppExecution {
}
Ok(())
}

pub async fn run_flightsqls(&self, sqls: Vec<&str>, sender: UnboundedSender<AppEvent>) {
info!("Running sqls: {:?}", sqls);
let non_empty_sqls: Vec<&str> = sqls.into_iter().filter(|s| !s.is_empty()).collect();
info!("Non empty SQLs: {:?}", non_empty_sqls);
let statement_count = non_empty_sqls.len();
for (i, sql) in non_empty_sqls.into_iter().enumerate() {
let client = self.flightsql_client();
// let mut query =
// FlightSQLQuery::new(sql.clone(), None, None, None, Duration::default(), None);
let start = Instant::now();
if let Some(ref mut c) = *client.lock().await {
match c.execute(sql.to_string(), None).await {
Ok(flight_info) => {
for endpoint in flight_info.endpoint {
if let Some(ticket) = endpoint.ticket {
match c.do_get(ticket.into_request()).await {
Ok(stream) => {
self.set_flight_result_stream(stream).await;
let mut stream = self.flight_result_stream.lock().await;
if let Some(s) = stream.as_mut() {
if let Some(b) = s.next().await {
match b {
Ok(b) => {
let results = ExecutionResultsBatch {
query: sql.to_string(),
batch: b,
duration: start.elapsed(),
};
let _ = sender.send(AppEvent::FlightSQLExecutionResultsNextPage(results));
}
Err(e) => {
error!(
"Error getting RecordBatch: {:?}",
e
);
let e = ExecutionError {
query: sql.to_string(),
error: e.to_string(),
duration: start.elapsed(),
};
let _ = sender.send(AppEvent::FlightSQLExecutionResultsError(e));
}
}
}
}
}
Err(e) => {
error!("Error getting RecordBatch: {:?}", e);
}
}
} else {
error!("No ticket in endpoint");
}
}
}
Err(e) => {
error!("Error executing FlightSQL query: {:?}", e);
}
}
}
}
}

pub async fn next_batch(&self, sql: String, sender: UnboundedSender<AppEvent>) {
let mut stream = self.result_stream.lock().await;
if let Some(s) = stream.as_mut() {
let start = std::time::Instant::now();
if let Some(b) = s.next().await {
match b {
Ok(b) => {
let duration = start.elapsed();
let results = ExecutionResultsBatch {
query: sql,
batch: b,
duration,
};
let _ = sender.send(AppEvent::ExecutionResultsNextPage(results));
}
Err(e) => {
error!("Error getting RecordBatch: {:?}", e);
}
}
}
}
}
}

// #[derive(Debug, Clone)]
// pub struct ExecMetrics {
// name: String,
// bytes_scanned: usize,
// }

#[derive(Clone, Debug)]
pub struct ExecutionStats {
// bytes_scanned: usize,
// exec_metrics: Vec<ExecMetrics>,
}

// impl ExecutionStats {
// pub fn bytes_scanned(&self) -> usize {
// self.bytes_scanned
// }
// }

#[derive(Default)]
struct PlanVisitor {
total_bytes_scanned: usize,
// exec_metrics: Vec<ExecMetrics>,
}

impl From<PlanVisitor> for ExecutionStats {
fn from(value: PlanVisitor) -> Self {
Self {
// bytes_scanned: value.total_bytes_scanned,
}
}
}

impl ExecutionPlanVisitor for PlanVisitor {
type Error = datafusion_common::DataFusionError;

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
match plan.metrics() {
Some(metrics) => match metrics.sum_by_name("bytes_scanned") {
Some(bytes_scanned) => {
info!("Adding {} to total_bytes_scanned", bytes_scanned.as_usize());
self.total_bytes_scanned += bytes_scanned.as_usize();
}
None => {
info!("No bytes_scanned for {}", plan.name())
}
},
None => {
info!("No MetricsSet for {}", plan.name())
}
}
Ok(true)
}
}

pub fn collect_plan_stats(plan: Arc<dyn ExecutionPlan>) -> Option<ExecutionStats> {
let mut visitor = PlanVisitor::default();
if visit_execution_plan(plan.as_ref(), &mut visitor).is_ok() {
Some(visitor.into())
} else {
None
}
}
Loading
Loading