Skip to content

Commit

Permalink
paginating query results
Browse files Browse the repository at this point in the history
  • Loading branch information
l1xnan committed Feb 2, 2024
1 parent 3c74ad3 commit fda2253
Showing 1 changed file with 20 additions and 25 deletions.
45 changes: 20 additions & 25 deletions src-tauri/src/dialect/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use arrow::array::*;
use arrow::datatypes::*;
use async_trait::async_trait;
use chrono::naive::NaiveDate;
use chrono::offset;
use chrono::prelude::*;
use chrono::DateTime;
use chrono_tz::Tz;
Expand Down Expand Up @@ -58,6 +59,20 @@ impl Dialect for ClickhouseDialect {
}

async fn query(&self, sql: &str, limit: usize, offset: usize) -> anyhow::Result<ArrowData> {
if limit == 0 && offset == 0 {
self.fetch_all(sql).await
} else {
self.fetch_many(sql, limit, offset).await
}
}
}

impl ClickhouseDialect {
fn get_schema(&self) -> Vec<Table> {
vec![]
}

async fn fetch_all(&self, sql: &str) -> anyhow::Result<ArrowData> {
let pool = Pool::new(self.get_url());
let mut client = pool.get_handle().await?;
let mut stream = client.query(sql).stream_blocks();
Expand All @@ -76,26 +91,6 @@ impl Dialect for ClickhouseDialect {
preview: serialize_preview(&batch)?,
})
}
}

impl ClickhouseDialect {
fn get_schema(&self) -> Vec<Table> {
vec![]
}

// pub async fn query(&self, sql: &str) -> anyhow::Result<ArrowData> {
// let pool = Pool::new(self.get_url());
// let mut client = pool.get_handle().await?;
//
// let block = client.query(sql).fetch_all().await?;
//
// let batch = block_to_arrow(&block)?;
//
// Ok(ArrowData {
// total_count: batch.num_rows(),
// preview: serialize_preview(&batch)?,
// })
// }

pub async fn query_stream(&self, window: Window, sql: &str) -> anyhow::Result<()> {
let pool = Pool::new(self.get_url());
Expand Down Expand Up @@ -155,20 +150,20 @@ impl ClickhouseDialect {
limit: usize,
offset: usize,
) -> anyhow::Result<ArrowData> {
println!("sql: {}, limit={}, offset={}", sql, limit, offset);

let pool = Pool::new(self.get_url());
let mut client = pool.get_handle().await?;
let mut stream = client.query(sql).stream_blocks();

let mut batchs = vec![];

let mut offset: usize = offset;
let mut offset = offset;
let mut row_count = 0;
let mut total = 0;
while let Some(block) = stream.next().await {
let block = block?;
let count = block.row_count();
if offset - count >= 0 {
total += count;
if (offset as i64 - count as i64) >= 0 {
offset -= count;
continue;
}
Expand All @@ -187,7 +182,7 @@ impl ClickhouseDialect {
let preview = batch.slice(offset, std::cmp::min(limit, total - offset));

Ok(ArrowData {
total_count: preview.num_rows(),
total_count: total,
preview: serialize_preview(&preview)?,
})
}
Expand Down

0 comments on commit fda2253

Please sign in to comment.