From fda225325f526b764cd62a6b7c6c3c2af7e18841 Mon Sep 17 00:00:00 2001 From: l1xnan Date: Fri, 2 Feb 2024 15:06:25 +0800 Subject: [PATCH] paginating query results --- src-tauri/src/dialect/clickhouse.rs | 45 +++++++++++++---------------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/src-tauri/src/dialect/clickhouse.rs b/src-tauri/src/dialect/clickhouse.rs index e78efe2..646d250 100644 --- a/src-tauri/src/dialect/clickhouse.rs +++ b/src-tauri/src/dialect/clickhouse.rs @@ -5,6 +5,7 @@ use arrow::array::*; use arrow::datatypes::*; use async_trait::async_trait; use chrono::naive::NaiveDate; +use chrono::offset; use chrono::prelude::*; use chrono::DateTime; use chrono_tz::Tz; @@ -58,6 +59,20 @@ impl Dialect for ClickhouseDialect { } async fn query(&self, sql: &str, limit: usize, offset: usize) -> anyhow::Result { + if limit == 0 && offset == 0 { + self.fetch_all(sql).await + } else { + self.fetch_many(sql, limit, offset).await + } + } +} + +impl ClickhouseDialect { + fn get_schema(&self) -> Vec { + vec![] + } + + async fn fetch_all(&self, sql: &str) -> anyhow::Result { let pool = Pool::new(self.get_url()); let mut client = pool.get_handle().await?; let mut stream = client.query(sql).stream_blocks(); @@ -76,26 +91,6 @@ impl Dialect for ClickhouseDialect { preview: serialize_preview(&batch)?, }) } -} - -impl ClickhouseDialect { - fn get_schema(&self) -> Vec
{ - vec![] - } - - // pub async fn query(&self, sql: &str) -> anyhow::Result { - // let pool = Pool::new(self.get_url()); - // let mut client = pool.get_handle().await?; - // - // let block = client.query(sql).fetch_all().await?; - // - // let batch = block_to_arrow(&block)?; - // - // Ok(ArrowData { - // total_count: batch.num_rows(), - // preview: serialize_preview(&batch)?, - // }) - // } pub async fn query_stream(&self, window: Window, sql: &str) -> anyhow::Result<()> { let pool = Pool::new(self.get_url()); @@ -155,20 +150,20 @@ impl ClickhouseDialect { limit: usize, offset: usize, ) -> anyhow::Result { - println!("sql: {}, limit={}, offset={}", sql, limit, offset); - let pool = Pool::new(self.get_url()); let mut client = pool.get_handle().await?; let mut stream = client.query(sql).stream_blocks(); let mut batchs = vec![]; - let mut offset: usize = offset; + let mut offset = offset; let mut row_count = 0; + let mut total = 0; while let Some(block) = stream.next().await { let block = block?; let count = block.row_count(); - if offset - count >= 0 { + total += count; + if (offset as i64 - count as i64) >= 0 { offset -= count; continue; } @@ -187,7 +182,7 @@ impl ClickhouseDialect { let preview = batch.slice(offset, std::cmp::min(limit, total - offset)); Ok(ArrowData { - total_count: preview.num_rows(), + total_count: total, preview: serialize_preview(&preview)?, }) }