Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow json decoding #477

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
353 changes: 172 additions & 181 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tonic-reflection = { version = "0.10" }
arrow = { version = "49.0.0" }
arrow-array = { version = "49.0.0" }
arrow-schema = { version = "49.0.0" }
arrow-json = { version = "49.0.0" }
object_store = { version = "0.8.0" }
parquet = { version = "49.0.0" }

Expand All @@ -52,4 +53,5 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/par
arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'}
arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'}
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/json' }
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '0.8.0/put_part_api'}
3 changes: 2 additions & 1 deletion arroyo-df/src/external.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use arrow_schema::FieldRef;
use arroyo_datastream::logical::LogicalNode;
use std::time::Duration;

Expand All @@ -14,7 +15,7 @@ pub enum ProcessingMode {
#[derive(Clone, Debug)]
pub struct SqlSource {
pub id: Option<i64>,
pub struct_def: StructDef,
pub struct_def: Vec<FieldRef>,
pub config: ConnectorOp,
pub processing_mode: ProcessingMode,
pub idle_time: Option<Duration>,
Expand Down
25 changes: 17 additions & 8 deletions arroyo-df/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use anyhow::{anyhow, bail, Result};
use arrow::array::ArrayRef;
use arrow::datatypes::{self, DataType, Field};
use arrow_schema::{Schema, TimeUnit};
use arrow_schema::{FieldRef, Schema, TimeUnit};
use arroyo_connectors::Connection;
use arroyo_datastream::WindowType;

Expand Down Expand Up @@ -399,10 +399,11 @@ pub fn parse_dependencies(definition: &str) -> Result<String> {
};
}

fn create_table_source(table_name: String, fields: Vec<Field>) -> Arc<dyn TableSource> {
let schema = add_timestamp_field_if_missing_arrow(Arc::new(
datatypes::Schema::new_with_metadata(fields, HashMap::new()),
));
fn create_table_source(table_name: String, fields: Vec<FieldRef>) -> Arc<dyn TableSource> {
let schema = add_timestamp_field_if_missing_arrow(Arc::new(Schema::new_with_metadata(
fields,
HashMap::new(),
)));
let table_provider = LogicalBatchInput { table_name, schema };
let wrapped = Arc::new(table_provider);
let provider = DefaultTableSource::new(wrapped);
Expand Down Expand Up @@ -806,7 +807,11 @@ impl TreeNodeRewriter for QueryToGraphVisitor {
.fields()
.iter()
.map(|field| {
Field::new(field.name(), field.data_type().clone(), field.is_nullable())
Arc::new(Field::new(
field.name(),
field.data_type().clone(),
field.is_nullable(),
))
})
.collect(),
);
Expand Down Expand Up @@ -887,11 +892,11 @@ impl TreeNodeRewriter for QueryToGraphVisitor {
.fields()
.iter()
.map(|field| {
Field::new(
Arc::new(Field::new(
field.name(),
field.data_type().clone(),
field.is_nullable(),
)
))
})
.collect(),
),
Expand Down Expand Up @@ -1027,6 +1032,10 @@ pub async fn parse_and_get_arrow_program(
};
}

if inserts.is_empty() {
bail!("The provided SQL does not contain a query");
}

let mut rewriter = QueryToGraphVisitor::default();
for insert in inserts {
let plan = match insert {
Expand Down
128 changes: 56 additions & 72 deletions arroyo-df/src/tables.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::str::FromStr;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};

use anyhow::{anyhow, bail, Result};
use arrow_schema::{DataType, Field};
use arrow_schema::{DataType, Field, FieldRef};
use arroyo_connectors::{connector_for_type, Connection};
use arroyo_datastream::ConnectorOp;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, SchemaDefinition, SourceField,
};
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_types::ArroyoExtensionType;
use datafusion::sql::sqlparser::ast::Query;
use datafusion::{
optimizer::{analyzer::Analyzer, optimizer::Optimizer, OptimizerContext},
Expand Down Expand Up @@ -53,11 +55,8 @@ pub struct ConnectorTable {

#[derive(Debug, Clone)]
pub enum FieldSpec {
StructField(StructField),
VirtualField {
field: StructField,
expression: Expr,
},
StructField(Field),
VirtualField { field: Field, expression: Expr },
}

impl FieldSpec {
Expand All @@ -67,16 +66,16 @@ impl FieldSpec {
FieldSpec::VirtualField { .. } => true,
}
}
fn struct_field(&self) -> &StructField {
fn struct_field(&self) -> &Field {
match self {
FieldSpec::StructField(f) => f,
FieldSpec::VirtualField { field, .. } => field,
}
}
}

impl From<StructField> for FieldSpec {
fn from(value: StructField) -> Self {
impl From<Field> for FieldSpec {
fn from(value: Field) -> Self {
FieldSpec::StructField(value)
}
}
Expand Down Expand Up @@ -136,7 +135,8 @@ impl From<Connection> for ConnectorTable {
.iter()
.map(|f| {
let struct_field: StructField = f.clone().into();
struct_field.into()
let field: Field = struct_field.into();
field.into()
})
.collect(),
type_name: schema_type(&value.name, &value.schema),
Expand Down Expand Up @@ -165,14 +165,11 @@ impl ConnectorTable {
fields = fields
.into_iter()
.map(|field_spec| match &field_spec {
FieldSpec::StructField(struct_field) => match struct_field.data_type {
TypeDef::DataType(DataType::Timestamp(_, None), nullable) => {
let mut coerced = struct_field.clone();
coerced.data_type = TypeDef::DataType(
FieldSpec::StructField(struct_field) => match struct_field.data_type() {
DataType::Timestamp(_, None) => {
FieldSpec::StructField(struct_field.clone().with_data_type(
DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
nullable,
);
FieldSpec::StructField(coerced)
))
}
_ => field_spec,
},
Expand All @@ -197,8 +194,8 @@ impl ConnectorTable {
struct_field.clone().try_into().map_err(|_| {
anyhow!(
"field '{}' has a type '{:?}' that cannot be used in a connection table",
struct_field.name,
struct_field.data_type
struct_field.name(),
struct_field.data_type()
)
})
})
Expand Down Expand Up @@ -277,11 +274,8 @@ impl ConnectorTable {
.fields
.iter()
.find(|f| {
f.struct_field().name == *field_name
&& matches!(
f.struct_field().data_type,
TypeDef::DataType(DataType::Timestamp(..), _)
)
f.struct_field().name() == field_name
&& matches!(f.struct_field().data_type(), DataType::Timestamp(..))
})
.ok_or_else(|| {
anyhow!(
Expand All @@ -290,8 +284,7 @@ impl ConnectorTable {
)
})?;

Ok(Some(Expr::Column(Column::new(
field.struct_field().alias.as_ref().cloned(),
Ok(Some(Expr::Column(Column::from_name(
field.struct_field().name(),
))))
} else {
Expand All @@ -306,11 +299,8 @@ impl ConnectorTable {
.fields
.iter()
.find(|f| {
f.struct_field().name == *field_name
&& matches!(
f.struct_field().data_type,
TypeDef::DataType(DataType::Timestamp(..), _)
)
f.struct_field().name() == field_name
&& matches!(f.struct_field().data_type(), DataType::Timestamp(..))
})
.ok_or_else(|| {
anyhow!(
Expand All @@ -319,8 +309,7 @@ impl ConnectorTable {
)
})?;

Ok(Some(Expr::Column(Column::new(
field.struct_field().alias.as_ref().cloned(),
Ok(Some(Expr::Column(Column::from_name(
field.struct_field().name(),
))))
} else {
Expand Down Expand Up @@ -362,18 +351,14 @@ impl ConnectorTable {

let source = SqlSource {
id: self.id,
struct_def: StructDef::new(
self.type_name.clone(),
self.type_name.is_none(),
self.fields
.iter()
.filter_map(|field| match field {
FieldSpec::StructField(struct_field) => Some(struct_field.clone()),
FieldSpec::VirtualField { .. } => None,
})
.collect(),
self.format.clone(),
),
struct_def: self
.fields
.iter()
.filter_map(|field| match field {
FieldSpec::StructField(struct_field) => Some(Arc::new(struct_field.clone())),
FieldSpec::VirtualField { .. } => None,
})
.collect(),
config: self.connector_op(),
processing_mode: self.processing_mode(),
idle_time: self.idle_time,
Expand Down Expand Up @@ -469,7 +454,7 @@ pub enum Table {
ConnectorTable(ConnectorTable),
MemoryTable {
name: String,
fields: Vec<StructField>,
fields: Vec<FieldRef>,
},
TableFromQuery {
name: String,
Expand Down Expand Up @@ -499,14 +484,16 @@ impl Table {
.iter()
.map(|column| {
let name = column.name.value.to_string();
let data_type = convert_data_type(&column.data_type)?;
let (data_type, extension) = convert_data_type(&column.data_type)?;
let nullable = !column
.options
.iter()
.any(|option| matches!(option.option, ColumnOption::NotNull));

let struct_field =
StructField::new(name, None, TypeDef::DataType(data_type, nullable));
let struct_field = ArroyoExtensionType::add_metadata(
extension,
Field::new(name, data_type, nullable),
);

let generating_expression = column.options.iter().find_map(|option| {
if let ColumnOption::Generated {
Expand All @@ -522,27 +509,25 @@ impl Table {
})
.collect::<Result<Vec<_>>>()?;

let physical_struct: StructDef = StructDef::for_fields(
struct_field_pairs
.iter()
.filter_map(
|(field, generating_expression)| match generating_expression {
Some(_) => None,
None => Some(field.clone()),
},
)
.collect(),
);
let physical_fields: Vec<_> = struct_field_pairs
.iter()
.filter_map(
|(field, generating_expression)| match generating_expression {
Some(_) => None,
None => Some(field.clone()),
},
)
.collect();

let _physical_schema = DFSchema::new_with_metadata(
physical_struct
.fields
physical_fields
.iter()
.map(|f| {
let TypeDef::DataType(data_type, nullable) = f.data_type.clone() else {
bail!("expect data type for generated column")
};
Ok(DFField::new_unqualified(&f.name, data_type, nullable))
Ok(DFField::new_unqualified(
&f.name(),
f.data_type().clone(),
f.is_nullable(),
))
})
.collect::<Result<Vec<_>>>()?,
HashMap::new(),
Expand Down Expand Up @@ -615,7 +600,7 @@ impl Table {
name,
fields: fields
.into_iter()
.map(|f| f.struct_field().clone())
.map(|f| Arc::new(f.struct_field().clone()))
.collect(),
}))
}
Expand Down Expand Up @@ -701,18 +686,16 @@ impl Table {
Ok(())
}

pub fn get_fields(&self) -> Vec<Field> {
pub fn get_fields(&self) -> Vec<FieldRef> {
match self {
Table::MemoryTable { fields, .. } => {
fields.iter().map(|field| field.clone().into()).collect()
}
Table::MemoryTable { fields, .. } => fields.clone(),
Table::ConnectorTable(ConnectorTable {
fields,
inferred_fields,
..
}) => inferred_fields
.as_ref()
.map(|fs| fs.iter().map(qualified_field).collect())
.map(|fs| fs.iter().map(qualified_field).map(Arc::new).collect())
.unwrap_or_else(|| {
fields
.iter()
Expand All @@ -724,6 +707,7 @@ impl Table {
.fields()
.iter()
.map(qualified_field)
.map(Arc::new)
.collect(),
}
}
Expand Down
Loading
Loading