Skip to content

Commit

Permalink
update to datafusion 39, test on MSRV 1.70.0
Browse files Browse the repository at this point in the history
  • Loading branch information
davidhewitt committed Jun 11, 2024
1 parent 8fc08ee commit d9b87ed
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 45 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,32 @@ jobs:
PRE_COMMIT_COLOR: always
SKIP: test

resolve:
runs-on: ubuntu-latest
outputs:
MSRV: ${{ steps.resolve-msrv.outputs.MSRV }}
steps:
- uses: actions/checkout@v4

- name: set up python
uses: actions/setup-python@v5
with:
python-version: '3.12'

- name: resolve MSRV
id: resolve-msrv
run:
echo MSRV=`python -c 'import tomllib; print(tomllib.load(open("Cargo.toml", "rb"))["workspace"]["package"]["rust-version"])'` >> $GITHUB_OUTPUT

test:
needs: [resolve]
name: test rust-${{ matrix.rust-version }}
strategy:
fail-fast: false
matrix:
rust-version: [stable, nightly]
include:
- rust-version: ${{ needs.resolve.outputs.MSRV }}

runs-on: ubuntu-latest

Expand Down
21 changes: 11 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ license = "Apache-2.0"
keywords = ["datafusion", "JSON", "SQL"]
categories = ["database-implementations", "parsing"]
repository = "https://github.com/datafusion-contrib/datafusion-functions-json/"
rust-version = "1.70.0"

[dependencies]
arrow = ">=51"
arrow-schema = ">=51"
datafusion-common = ">=38"
datafusion-expr = ">=38"
jiter = ">=0.3"
paste = ">=1.0.14"
log = ">=0.4.21"
datafusion-execution = ">=38"
arrow = "52"
arrow-schema = "52"
datafusion-common = "39"
datafusion-expr = "39"
jiter = "=0.4"
paste = "1"
log = "0.4"
datafusion-execution = "39"

[dev-dependencies]
datafusion = "38.0.0"
tokio = { version = "1.37.0", features = ["full"] }
datafusion = "39"
tokio = { version = "1.37", features = ["full"] }

[lints.clippy]
dbg_macro = "deny"
Expand Down
2 changes: 1 addition & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ fn scalar_apply_iter<'a, 'j, C: FromIterator<Option<I>> + 'static, I>(

pub fn jiter_json_find<'j>(opt_json: Option<&'j str>, path: &[JsonPath]) -> Option<(Jiter<'j>, Peek)> {
if let Some(json_str) = opt_json {
let mut jiter = Jiter::new(json_str.as_bytes(), false);
let mut jiter = Jiter::new(json_str.as_bytes());
if let Ok(peek) = jiter.peek() {
if let Ok(peek_found) = jiter_json_find_step(&mut jiter, peek, path) {
return Some((jiter, peek_found));
Expand Down
65 changes: 36 additions & 29 deletions src/common_union.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

use arrow::array::{Array, BooleanArray, Float64Array, Int64Array, StringArray, UnionArray};
use arrow::buffer::Buffer;
Expand Down Expand Up @@ -36,10 +36,7 @@ impl JsonUnion {
}

pub fn data_type() -> DataType {
DataType::Union(
UnionFields::new(TYPE_IDS.to_vec(), union_fields().to_vec()),
UnionMode::Sparse,
)
DataType::Union(union_fields(), UnionMode::Sparse)
}

fn push(&mut self, field: JsonUnionField) {
Expand All @@ -58,7 +55,7 @@ impl JsonUnion {
}

fn push_none(&mut self) {
self.type_ids[self.index] = TYPE_IDS[0];
self.type_ids[self.index] = TYPE_ID_NULL;
self.index += 1;
debug_assert!(self.index <= self.capacity);
}
Expand Down Expand Up @@ -86,17 +83,16 @@ impl TryFrom<JsonUnion> for UnionArray {
type Error = arrow::error::ArrowError;

fn try_from(value: JsonUnion) -> Result<Self, Self::Error> {
let [f0, f1, f2, f3, f4, f5, f6] = union_fields();
let children: Vec<(Field, Arc<dyn Array>)> = vec![
(f0, Arc::new(BooleanArray::from(value.nulls))),
(f1, Arc::new(BooleanArray::from(value.bools))),
(f2, Arc::new(Int64Array::from(value.ints))),
(f3, Arc::new(Float64Array::from(value.floats))),
(f4, Arc::new(StringArray::from(value.strings))),
(f5, Arc::new(StringArray::from(value.arrays))),
(f6, Arc::new(StringArray::from(value.objects))),
let children: Vec<Arc<dyn Array>> = vec![
Arc::new(BooleanArray::from(value.nulls)),
Arc::new(BooleanArray::from(value.bools)),
Arc::new(Int64Array::from(value.ints)),
Arc::new(Float64Array::from(value.floats)),
Arc::new(StringArray::from(value.strings)),
Arc::new(StringArray::from(value.arrays)),
Arc::new(StringArray::from(value.objects)),
];
UnionArray::try_new(TYPE_IDS, Buffer::from_slice_ref(&value.type_ids), None, children)
UnionArray::try_new(union_fields(), Buffer::from_vec(value.type_ids).into(), None, children)
}
}

Expand All @@ -111,18 +107,29 @@ pub(crate) enum JsonUnionField {
Object(String),
}

const TYPE_IDS: &[i8] = &[0, 1, 2, 3, 4, 5, 6];

fn union_fields() -> [Field; 7] {
[
Field::new("null", DataType::Boolean, true),
Field::new("bool", DataType::Boolean, false),
Field::new("int", DataType::Int64, false),
Field::new("float", DataType::Float64, false),
Field::new("str", DataType::Utf8, false),
Field::new("array", DataType::Utf8, false),
Field::new("object", DataType::Utf8, false),
]
const TYPE_ID_NULL: i8 = 0;
const TYPE_ID_BOOL: i8 = 1;
const TYPE_ID_INT: i8 = 2;
const TYPE_ID_FLOAT: i8 = 3;
const TYPE_ID_STR: i8 = 4;
const TYPE_ID_ARRAY: i8 = 5;
const TYPE_ID_OBJECT: i8 = 6;

fn union_fields() -> UnionFields {
static FIELDS: OnceLock<UnionFields> = OnceLock::new();
FIELDS
.get_or_init(|| {
UnionFields::from_iter([
(TYPE_ID_NULL, Arc::new(Field::new("null", DataType::Boolean, true))),
(TYPE_ID_BOOL, Arc::new(Field::new("bool", DataType::Boolean, false))),
(TYPE_ID_INT, Arc::new(Field::new("int", DataType::Int64, false))),
(TYPE_ID_FLOAT, Arc::new(Field::new("float", DataType::Float64, false))),
(TYPE_ID_STR, Arc::new(Field::new("str", DataType::Utf8, false))),
(TYPE_ID_ARRAY, Arc::new(Field::new("array", DataType::Utf8, false))),
(TYPE_ID_OBJECT, Arc::new(Field::new("object", DataType::Utf8, false))),
])
})
.clone()
}

impl JsonUnionField {
Expand All @@ -141,7 +148,7 @@ impl JsonUnionField {
pub fn scalar_value(f: Option<Self>) -> ScalarValue {
ScalarValue::Union(
f.map(|f| (f.type_id(), Box::new(f.into()))),
UnionFields::new(TYPE_IDS.to_vec(), union_fields().to_vec()),
union_fields(),
UnionMode::Sparse,
)
}
Expand Down
9 changes: 4 additions & 5 deletions src/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{Expr, ScalarFunctionDefinition};
use datafusion_expr::Expr;

pub(crate) struct JsonFunctionRewriter;

Expand All @@ -17,8 +17,7 @@ impl FunctionRewrite for JsonFunctionRewriter {
fn rewrite(&self, expr: Expr, _schema: &DFSchema, _config: &ConfigOptions) -> Result<Transformed<Expr>> {
if let Expr::Cast(cast) = &expr {
if let Expr::ScalarFunction(func) = &*cast.expr {
let ScalarFunctionDefinition::UDF(udf) = &func.func_def;
if udf.name() == "json_get" {
if func.func.name() == "json_get" {
if let Some(t) = switch_json_get(&cast.data_type, &func.args) {
return Ok(t);
}
Expand All @@ -30,15 +29,15 @@ impl FunctionRewrite for JsonFunctionRewriter {
}

fn switch_json_get(cast_data_type: &DataType, args: &[Expr]) -> Option<Transformed<Expr>> {
let udf = match cast_data_type {
let func = match cast_data_type {
DataType::Boolean => crate::json_get_bool::json_get_bool_udf(),
DataType::Float64 | DataType::Float32 => crate::json_get_float::json_get_float_udf(),
DataType::Int64 | DataType::Int32 => crate::json_get_int::json_get_int_udf(),
DataType::Utf8 => crate::json_get_str::json_get_str_udf(),
_ => return None,
};
let f = ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(udf),
func,
args: args.to_vec(),
};
Some(Transformed::yes(Expr::ScalarFunction(f)))
Expand Down

0 comments on commit d9b87ed

Please sign in to comment.