Skip to content

Commit

Permalink
Arrow JSON decoding (#477)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Jan 3, 2024
1 parent 63c6da5 commit 0ead44e
Show file tree
Hide file tree
Showing 18 changed files with 622 additions and 388 deletions.
353 changes: 172 additions & 181 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ tonic-reflection = { version = "0.10" }
arrow = { version = "49.0.0" }
arrow-array = { version = "49.0.0" }
arrow-schema = { version = "49.0.0" }
arrow-json = { version = "49.0.0" }
object_store = { version = "0.8.0" }
parquet = { version = "49.0.0" }

Expand All @@ -52,4 +53,5 @@ arrow = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/par
arrow-buffer = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'}
arrow-array = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'}
arrow-schema = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/parquet_bytes'}
arrow-json = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '49.0.0/json' }
object_store = {git = 'https://github.com/ArroyoSystems/arrow-rs', branch = '0.8.0/put_part_api'}
3 changes: 2 additions & 1 deletion arroyo-df/src/external.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use arrow_schema::FieldRef;
use arroyo_datastream::logical::LogicalNode;
use std::time::Duration;

Expand All @@ -14,7 +15,7 @@ pub enum ProcessingMode {
#[derive(Clone, Debug)]
pub struct SqlSource {
pub id: Option<i64>,
pub struct_def: StructDef,
pub struct_def: Vec<FieldRef>,
pub config: ConnectorOp,
pub processing_mode: ProcessingMode,
pub idle_time: Option<Duration>,
Expand Down
25 changes: 17 additions & 8 deletions arroyo-df/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use anyhow::{anyhow, bail, Result};
use arrow::array::ArrayRef;
use arrow::datatypes::{self, DataType, Field};
use arrow_schema::{Schema, TimeUnit};
use arrow_schema::{FieldRef, Schema, TimeUnit};
use arroyo_connectors::Connection;
use arroyo_datastream::WindowType;

Expand Down Expand Up @@ -399,10 +399,11 @@ pub fn parse_dependencies(definition: &str) -> Result<String> {
};
}

fn create_table_source(table_name: String, fields: Vec<Field>) -> Arc<dyn TableSource> {
let schema = add_timestamp_field_if_missing_arrow(Arc::new(
datatypes::Schema::new_with_metadata(fields, HashMap::new()),
));
fn create_table_source(table_name: String, fields: Vec<FieldRef>) -> Arc<dyn TableSource> {
let schema = add_timestamp_field_if_missing_arrow(Arc::new(Schema::new_with_metadata(
fields,
HashMap::new(),
)));
let table_provider = LogicalBatchInput { table_name, schema };
let wrapped = Arc::new(table_provider);
let provider = DefaultTableSource::new(wrapped);
Expand Down Expand Up @@ -806,7 +807,11 @@ impl TreeNodeRewriter for QueryToGraphVisitor {
.fields()
.iter()
.map(|field| {
Field::new(field.name(), field.data_type().clone(), field.is_nullable())
Arc::new(Field::new(
field.name(),
field.data_type().clone(),
field.is_nullable(),
))
})
.collect(),
);
Expand Down Expand Up @@ -887,11 +892,11 @@ impl TreeNodeRewriter for QueryToGraphVisitor {
.fields()
.iter()
.map(|field| {
Field::new(
Arc::new(Field::new(
field.name(),
field.data_type().clone(),
field.is_nullable(),
)
))
})
.collect(),
),
Expand Down Expand Up @@ -1027,6 +1032,10 @@ pub async fn parse_and_get_arrow_program(
};
}

if inserts.is_empty() {
bail!("The provided SQL does not contain a query");
}

let mut rewriter = QueryToGraphVisitor::default();
for insert in inserts {
let plan = match insert {
Expand Down
128 changes: 56 additions & 72 deletions arroyo-df/src/tables.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::str::FromStr;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};

use anyhow::{anyhow, bail, Result};
use arrow_schema::{DataType, Field};
use arrow_schema::{DataType, Field, FieldRef};
use arroyo_connectors::{connector_for_type, Connection};
use arroyo_datastream::ConnectorOp;
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, SchemaDefinition, SourceField,
};
use arroyo_rpc::formats::{BadData, Format, Framing};
use arroyo_types::ArroyoExtensionType;
use datafusion::sql::sqlparser::ast::Query;
use datafusion::{
optimizer::{analyzer::Analyzer, optimizer::Optimizer, OptimizerContext},
Expand Down Expand Up @@ -53,11 +55,8 @@ pub struct ConnectorTable {

#[derive(Debug, Clone)]
pub enum FieldSpec {
StructField(StructField),
VirtualField {
field: StructField,
expression: Expr,
},
StructField(Field),
VirtualField { field: Field, expression: Expr },
}

impl FieldSpec {
Expand All @@ -67,16 +66,16 @@ impl FieldSpec {
FieldSpec::VirtualField { .. } => true,
}
}
fn struct_field(&self) -> &StructField {
fn struct_field(&self) -> &Field {
match self {
FieldSpec::StructField(f) => f,
FieldSpec::VirtualField { field, .. } => field,
}
}
}

impl From<StructField> for FieldSpec {
fn from(value: StructField) -> Self {
impl From<Field> for FieldSpec {
fn from(value: Field) -> Self {
FieldSpec::StructField(value)
}
}
Expand Down Expand Up @@ -136,7 +135,8 @@ impl From<Connection> for ConnectorTable {
.iter()
.map(|f| {
let struct_field: StructField = f.clone().into();
struct_field.into()
let field: Field = struct_field.into();
field.into()
})
.collect(),
type_name: schema_type(&value.name, &value.schema),
Expand Down Expand Up @@ -165,14 +165,11 @@ impl ConnectorTable {
fields = fields
.into_iter()
.map(|field_spec| match &field_spec {
FieldSpec::StructField(struct_field) => match struct_field.data_type {
TypeDef::DataType(DataType::Timestamp(_, None), nullable) => {
let mut coerced = struct_field.clone();
coerced.data_type = TypeDef::DataType(
FieldSpec::StructField(struct_field) => match struct_field.data_type() {
DataType::Timestamp(_, None) => {
FieldSpec::StructField(struct_field.clone().with_data_type(
DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
nullable,
);
FieldSpec::StructField(coerced)
))
}
_ => field_spec,
},
Expand All @@ -197,8 +194,8 @@ impl ConnectorTable {
struct_field.clone().try_into().map_err(|_| {
anyhow!(
"field '{}' has a type '{:?}' that cannot be used in a connection table",
struct_field.name,
struct_field.data_type
struct_field.name(),
struct_field.data_type()
)
})
})
Expand Down Expand Up @@ -277,11 +274,8 @@ impl ConnectorTable {
.fields
.iter()
.find(|f| {
f.struct_field().name == *field_name
&& matches!(
f.struct_field().data_type,
TypeDef::DataType(DataType::Timestamp(..), _)
)
f.struct_field().name() == field_name
&& matches!(f.struct_field().data_type(), DataType::Timestamp(..))
})
.ok_or_else(|| {
anyhow!(
Expand All @@ -290,8 +284,7 @@ impl ConnectorTable {
)
})?;

Ok(Some(Expr::Column(Column::new(
field.struct_field().alias.as_ref().cloned(),
Ok(Some(Expr::Column(Column::from_name(
field.struct_field().name(),
))))
} else {
Expand All @@ -306,11 +299,8 @@ impl ConnectorTable {
.fields
.iter()
.find(|f| {
f.struct_field().name == *field_name
&& matches!(
f.struct_field().data_type,
TypeDef::DataType(DataType::Timestamp(..), _)
)
f.struct_field().name() == field_name
&& matches!(f.struct_field().data_type(), DataType::Timestamp(..))
})
.ok_or_else(|| {
anyhow!(
Expand All @@ -319,8 +309,7 @@ impl ConnectorTable {
)
})?;

Ok(Some(Expr::Column(Column::new(
field.struct_field().alias.as_ref().cloned(),
Ok(Some(Expr::Column(Column::from_name(
field.struct_field().name(),
))))
} else {
Expand Down Expand Up @@ -362,18 +351,14 @@ impl ConnectorTable {

let source = SqlSource {
id: self.id,
struct_def: StructDef::new(
self.type_name.clone(),
self.type_name.is_none(),
self.fields
.iter()
.filter_map(|field| match field {
FieldSpec::StructField(struct_field) => Some(struct_field.clone()),
FieldSpec::VirtualField { .. } => None,
})
.collect(),
self.format.clone(),
),
struct_def: self
.fields
.iter()
.filter_map(|field| match field {
FieldSpec::StructField(struct_field) => Some(Arc::new(struct_field.clone())),
FieldSpec::VirtualField { .. } => None,
})
.collect(),
config: self.connector_op(),
processing_mode: self.processing_mode(),
idle_time: self.idle_time,
Expand Down Expand Up @@ -469,7 +454,7 @@ pub enum Table {
ConnectorTable(ConnectorTable),
MemoryTable {
name: String,
fields: Vec<StructField>,
fields: Vec<FieldRef>,
},
TableFromQuery {
name: String,
Expand Down Expand Up @@ -499,14 +484,16 @@ impl Table {
.iter()
.map(|column| {
let name = column.name.value.to_string();
let data_type = convert_data_type(&column.data_type)?;
let (data_type, extension) = convert_data_type(&column.data_type)?;
let nullable = !column
.options
.iter()
.any(|option| matches!(option.option, ColumnOption::NotNull));

let struct_field =
StructField::new(name, None, TypeDef::DataType(data_type, nullable));
let struct_field = ArroyoExtensionType::add_metadata(
extension,
Field::new(name, data_type, nullable),
);

let generating_expression = column.options.iter().find_map(|option| {
if let ColumnOption::Generated {
Expand All @@ -522,27 +509,25 @@ impl Table {
})
.collect::<Result<Vec<_>>>()?;

let physical_struct: StructDef = StructDef::for_fields(
struct_field_pairs
.iter()
.filter_map(
|(field, generating_expression)| match generating_expression {
Some(_) => None,
None => Some(field.clone()),
},
)
.collect(),
);
let physical_fields: Vec<_> = struct_field_pairs
.iter()
.filter_map(
|(field, generating_expression)| match generating_expression {
Some(_) => None,
None => Some(field.clone()),
},
)
.collect();

let _physical_schema = DFSchema::new_with_metadata(
physical_struct
.fields
physical_fields
.iter()
.map(|f| {
let TypeDef::DataType(data_type, nullable) = f.data_type.clone() else {
bail!("expect data type for generated column")
};
Ok(DFField::new_unqualified(&f.name, data_type, nullable))
Ok(DFField::new_unqualified(
&f.name(),
f.data_type().clone(),
f.is_nullable(),
))
})
.collect::<Result<Vec<_>>>()?,
HashMap::new(),
Expand Down Expand Up @@ -615,7 +600,7 @@ impl Table {
name,
fields: fields
.into_iter()
.map(|f| f.struct_field().clone())
.map(|f| Arc::new(f.struct_field().clone()))
.collect(),
}))
}
Expand Down Expand Up @@ -701,18 +686,16 @@ impl Table {
Ok(())
}

pub fn get_fields(&self) -> Vec<Field> {
pub fn get_fields(&self) -> Vec<FieldRef> {
match self {
Table::MemoryTable { fields, .. } => {
fields.iter().map(|field| field.clone().into()).collect()
}
Table::MemoryTable { fields, .. } => fields.clone(),
Table::ConnectorTable(ConnectorTable {
fields,
inferred_fields,
..
}) => inferred_fields
.as_ref()
.map(|fs| fs.iter().map(qualified_field).collect())
.map(|fs| fs.iter().map(qualified_field).map(Arc::new).collect())
.unwrap_or_else(|| {
fields
.iter()
Expand All @@ -724,6 +707,7 @@ impl Table {
.fields()
.iter()
.map(qualified_field)
.map(Arc::new)
.collect(),
}
}
Expand Down
Loading

0 comments on commit 0ead44e

Please sign in to comment.