diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 00cacfbd3..92c9c22b8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,8 +24,10 @@ jobs: - "-p stac -F reqwest" - "-p stac-api" - "-p stac -p stac-api -F geo" + - "-p stac-arrow" - "-p stac-async" - "-p stac-cli --no-default-features" + - "-p stac-geoparquet" - "-p stac-server --no-default-features" - "-p stac-server --no-default-features -F axum" - "-p stac-server --no-default-features -F memory-item-search" @@ -144,8 +146,9 @@ jobs: with: python-version: "3.10" cache: "pip" + cache-dependency-path: scripts/requirements-stac-server.txt - name: Install stac-api-validator - run: pip install -r scripts/requirements.txt + run: pip install -r scripts/requirements-stac-server.txt - name: Validate run: scripts/validate-stac-server validate-stac-server-pgstac: @@ -169,7 +172,22 @@ jobs: with: python-version: "3.10" cache: "pip" + cache-dependency-path: scripts/requirements-stac-server.txt - name: Install stac-api-validator - run: pip install -r scripts/requirements.txt + run: pip install -r scripts/requirements-stac-server.txt - name: Validate run: scripts/validate-stac-server --pgstac + validate-stac-geoparquet: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: Swatinem/rust-cache@v2 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: "pip" + cache-dependency-path: scripts/requirements-stac-geoparquet.txt + - name: Install requirements + run: pip install -r scripts/requirements-stac-geoparquet.txt + - name: Validate + run: scripts/validate-stac-geoparquet \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index ec37e9c39..ff2481228 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,16 +4,20 @@ members = [ "stac", "pgstac", "stac-api", + "stac-arrow", "stac-async", "stac-cli", + "stac-geoparquet", "stac-server", "stac-validate", ] default-members = [ "stac", "stac-api", + "stac-arrow", "stac-async", "stac-cli", + 
"stac-geoparquet", "stac-server", "stac-validate", ] diff --git a/scripts/requirements-stac-geoparquet.in b/scripts/requirements-stac-geoparquet.in new file mode 100644 index 000000000..aa0957dff --- /dev/null +++ b/scripts/requirements-stac-geoparquet.in @@ -0,0 +1,3 @@ +deepdiff +pyarrow +stac-geoparquet \ No newline at end of file diff --git a/scripts/requirements-stac-geoparquet.txt b/scripts/requirements-stac-geoparquet.txt new file mode 100644 index 000000000..73765b60b --- /dev/null +++ b/scripts/requirements-stac-geoparquet.txt @@ -0,0 +1,65 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile scripts/requirements-stac-geoparquet.in +certifi==2024.7.4 + # via + # pyogrio + # pyproj +ciso8601==2.3.1 + # via stac-geoparquet +deepdiff==7.0.1 + # via -r scripts/requirements-stac-geoparquet.in +deltalake==0.18.2 + # via stac-geoparquet +geopandas==1.0.1 + # via stac-geoparquet +numpy==2.0.1 + # via + # geopandas + # pandas + # pyarrow + # pyogrio + # shapely +ordered-set==4.1.0 + # via deepdiff +orjson==3.10.6 + # via stac-geoparquet +packaging==24.1 + # via + # geopandas + # pyogrio + # stac-geoparquet +pandas==2.2.2 + # via + # geopandas + # stac-geoparquet +pyarrow==17.0.0 + # via + # -r scripts/requirements-stac-geoparquet.in + # deltalake + # stac-geoparquet +pyarrow-hotfix==0.6 + # via deltalake +pyogrio==0.9.0 + # via geopandas +pyproj==3.6.1 + # via + # geopandas + # stac-geoparquet +pystac==1.10.1 + # via stac-geoparquet +python-dateutil==2.9.0.post0 + # via + # pandas + # pystac +pytz==2024.1 + # via pandas +shapely==2.0.5 + # via + # geopandas + # stac-geoparquet +six==1.16.0 + # via python-dateutil +stac-geoparquet==0.6.0 + # via -r scripts/requirements-stac-geoparquet.in +tzdata==2024.1 + # via pandas diff --git a/scripts/requirements.in b/scripts/requirements-stac-server.in similarity index 100% rename from scripts/requirements.in rename to scripts/requirements-stac-server.in diff --git a/scripts/requirements.txt 
b/scripts/requirements-stac-server.txt similarity index 86% rename from scripts/requirements.txt rename to scripts/requirements-stac-server.txt index ff4734894..fc833423c 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements-stac-server.txt @@ -1,6 +1,6 @@ # This file was autogenerated by uv via the following command: -# uv pip compile scripts/requirements.in -attrs==23.2.0 +# uv pip compile scripts/requirements-stac-server.in +attrs==24.2.0 # via # jsonschema # referencing @@ -19,7 +19,7 @@ deepdiff==6.7.1 # via stac-api-validator idna==3.7 # via requests -jsonschema==4.22.0 +jsonschema==4.23.0 # via # pystac # stac-api-validator @@ -29,13 +29,13 @@ jsonschema-specifications==2023.12.1 # via jsonschema more-itertools==8.14.0 # via stac-api-validator -numpy==2.0.0 +numpy==2.0.1 # via shapely ordered-set==4.1.0 # via deepdiff -orjson==3.10.5 +orjson==3.10.6 # via pystac -pystac[orjson,validation]==1.10.1 +pystac==1.10.1 # via # pystac-client # stac-api-validator @@ -47,7 +47,7 @@ python-dateutil==2.9.0.post0 # pystac-client python-dotenv==1.0.1 # via stac-check -pyyaml==6.0.1 +pyyaml==6.0.2 # via # stac-api-validator # stac-check @@ -61,16 +61,16 @@ requests==2.32.3 # stac-api-validator # stac-check # stac-validator -rpds-py==0.18.1 +rpds-py==0.20.0 # via # jsonschema # referencing -shapely==2.0.4 +shapely==2.0.5 # via stac-api-validator six==1.16.0 # via python-dateutil stac-api-validator==0.6.2 - # via -r requirements.in + # via -r scripts/requirements-stac-server.in stac-check==1.3.3 # via stac-api-validator stac-validator==3.3.2 diff --git a/scripts/validate-stac-geoparquet b/scripts/validate-stac-geoparquet new file mode 100755 index 000000000..027fd5914 --- /dev/null +++ b/scripts/validate-stac-geoparquet @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +import json +import sys +import shutil +import subprocess +import tempfile +from typing import Any +from deepdiff import DeepDiff +from pathlib import Path +import pyarrow.parquet +import 
def clean_report(report: dict[str, Any]) -> dict[str, Any]:
    """Remove the one difference we expect from a round trip.

    The item's ``datetime`` comes back with millisecond instead of microsecond
    precision. If exactly that change is present under ``values_changed`` it
    is deleted, and ``values_changed`` itself is deleted once it is empty.

    The report is modified in place and also returned.
    """
    datetime_key = "root['properties']['datetime']"
    expected_change = {
        "new_value": "2020-12-14T18:02:31.437Z",
        "old_value": "2020-12-14T18:02:31.437000Z",
    }

    changes = report.get("values_changed")
    if not changes:
        # Nothing recorded (or the key is missing): leave the report untouched.
        return report

    if changes.get(datetime_key) == expected_change:
        del changes[datetime_key]
    if not changes:
        del report["values_changed"]
    return report
{ git = "https://github.com/geoarrow/geoarrow-rs", rev = "476562b3da7dde9cd324fc5bf5ceb5451f76c451" } +geojson = "0.24" +geo-types = "0.7" +serde_json = "1" +stac = { version = "0.7", path = "../stac" } +thiserror = "1" + +[dev-dependencies] +geoarrow = { git = "https://github.com/geoarrow/geoarrow-rs", rev = "476562b3da7dde9cd324fc5bf5ceb5451f76c451", features = [ + "parquet", +] } +stac-validate = { version = "0.1", path = "../stac-validate" } diff --git a/stac-arrow/data b/stac-arrow/data new file mode 120000 index 000000000..188ec7632 --- /dev/null +++ b/stac-arrow/data @@ -0,0 +1 @@ +../spec-examples/v1.0.0 \ No newline at end of file diff --git a/stac-arrow/examples/extended-item.parquet b/stac-arrow/examples/extended-item.parquet new file mode 100644 index 000000000..d7bb47e6f Binary files /dev/null and b/stac-arrow/examples/extended-item.parquet differ diff --git a/stac-arrow/src/json.rs b/stac-arrow/src/json.rs new file mode 100644 index 000000000..3379527a1 --- /dev/null +++ b/stac-arrow/src/json.rs @@ -0,0 +1,460 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Taken from v51.0.0 of +//! [arrow-json](https://docs.rs/arrow-json/51.0.0/arrow_json/index.html), we've +//! 
lifted this code to convert record batches to vectors of +//! [serde_json::Value]. We've been able to go _mostly_ as-is, but there's some +//! modifications and cutouts. + +use arrow_array::cast::*; +use arrow_array::types::*; +use arrow_array::*; +use arrow_cast::display::{ArrayFormatter, FormatOptions}; +use arrow_json::JsonSerializable; +use arrow_schema::*; +use serde_json::json; +use serde_json::map::Map as JsonMap; +use serde_json::Value; +use std::iter; + +fn primitive_array_to_json(array: &dyn Array) -> Result, ArrowError> +where + T: ArrowPrimitiveType, + T::Native: JsonSerializable, +{ + Ok(array + .as_primitive::() + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => v.into_json_value().unwrap_or(Value::Null), + None => Value::Null, + }) + .collect()) +} + +fn struct_array_to_jsonmap_array( + array: &StructArray, + explicit_nulls: bool, +) -> Result>>, ArrowError> { + let inner_col_names = array.column_names(); + + let mut inner_objs = (0..array.len()) + // Ensure we write nulls for struct arrays as nulls in JSON + // Instead of writing a struct with nulls + .map(|index| array.is_valid(index).then(JsonMap::new)) + .collect::>>>(); + + for (j, struct_col) in array.columns().iter().enumerate() { + set_column_for_json_rows( + &mut inner_objs, + struct_col, + inner_col_names[j], + explicit_nulls, + )? 
+ } + Ok(inner_objs) +} + +fn array_to_json_array_internal( + array: &dyn Array, + explicit_nulls: bool, +) -> Result, ArrowError> { + match array.data_type() { + DataType::Null => Ok(iter::repeat(Value::Null).take(array.len()).collect()), + DataType::Boolean => Ok(array + .as_boolean() + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => v.into(), + None => Value::Null, + }) + .collect()), + + DataType::Utf8 => Ok(array + .as_string::() + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => v.into(), + None => Value::Null, + }) + .collect()), + DataType::LargeUtf8 => Ok(array + .as_string::() + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => v.into(), + None => Value::Null, + }) + .collect()), + DataType::Int8 => primitive_array_to_json::(array), + DataType::Int16 => primitive_array_to_json::(array), + DataType::Int32 => primitive_array_to_json::(array), + DataType::Int64 => primitive_array_to_json::(array), + DataType::UInt8 => primitive_array_to_json::(array), + DataType::UInt16 => primitive_array_to_json::(array), + DataType::UInt32 => primitive_array_to_json::(array), + DataType::UInt64 => primitive_array_to_json::(array), + DataType::Float16 => primitive_array_to_json::(array), + DataType::Float32 => primitive_array_to_json::(array), + DataType::Float64 => primitive_array_to_json::(array), + DataType::List(_) => as_list_array(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => Ok(Value::Array(array_to_json_array_internal( + &v, + explicit_nulls, + )?)), + None => Ok(Value::Null), + }) + .collect(), + DataType::LargeList(_) => as_large_list_array(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => Ok(Value::Array(array_to_json_array_internal( + &v, + explicit_nulls, + )?)), + None => Ok(Value::Null), + }) + .collect(), + DataType::FixedSizeList(_, _) => as_fixed_size_list_array(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => 
Ok(Value::Array(array_to_json_array_internal( + &v, + explicit_nulls, + )?)), + None => Ok(Value::Null), + }) + .collect(), + DataType::Struct(_) => { + let jsonmaps = struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls)?; + let json_values = jsonmaps + .into_iter() + .map(|maybe_map| maybe_map.map(Value::Object).unwrap_or(Value::Null)) + .collect(); + Ok(json_values) + } + DataType::Map(_, _) => as_map_array(array) + .iter() + .map(|maybe_value| match maybe_value { + Some(v) => Ok(Value::Array(array_to_json_array_internal( + &v, + explicit_nulls, + )?)), + None => Ok(Value::Null), + }) + .collect(), + t => Err(ArrowError::JsonError(format!( + "data type {t:?} not supported" + ))), + } +} + +macro_rules! set_column_by_array_type { + ($cast_fn:ident, $col_name:ident, $rows:ident, $array:ident, $explicit_nulls:ident) => { + let arr = $cast_fn($array); + $rows + .iter_mut() + .zip(arr.iter()) + .filter_map(|(maybe_row, maybe_value)| maybe_row.as_mut().map(|row| (row, maybe_value))) + .for_each(|(row, maybe_value)| { + if let Some(j) = maybe_value.map(Into::into) { + row.insert($col_name.to_string(), j); + } else if $explicit_nulls { + row.insert($col_name.to_string(), Value::Null); + } + }); + }; +} + +fn set_column_by_primitive_type( + rows: &mut [Option>], + array: &ArrayRef, + col_name: &str, + explicit_nulls: bool, +) where + T: ArrowPrimitiveType, + T::Native: JsonSerializable, +{ + let primitive_arr = array.as_primitive::(); + + rows.iter_mut() + .zip(primitive_arr.iter()) + .filter_map(|(maybe_row, maybe_value)| maybe_row.as_mut().map(|row| (row, maybe_value))) + .for_each(|(row, maybe_value)| { + if let Some(j) = maybe_value.and_then(|v| v.into_json_value()) { + row.insert(col_name.to_string(), j); + } else if explicit_nulls { + row.insert(col_name.to_string(), Value::Null); + } + }); +} + +fn set_column_for_json_rows( + rows: &mut [Option>], + array: &ArrayRef, + col_name: &str, + explicit_nulls: bool, +) -> Result<(), ArrowError> { + match 
array.data_type() { + DataType::Int8 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::Int16 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::Int32 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::Int64 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::UInt8 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::UInt16 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::UInt32 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::UInt64 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::Float16 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::Float32 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::Float64 => { + set_column_by_primitive_type::(rows, array, col_name, explicit_nulls); + } + DataType::Null => { + if explicit_nulls { + rows.iter_mut() + .filter_map(|maybe_row| maybe_row.as_mut()) + .for_each(|row| { + row.insert(col_name.to_string(), Value::Null); + }); + } + } + DataType::Boolean => { + set_column_by_array_type!(as_boolean_array, col_name, rows, array, explicit_nulls); + } + DataType::Utf8 => { + set_column_by_array_type!(as_string_array, col_name, rows, array, explicit_nulls); + } + DataType::LargeUtf8 => { + set_column_by_array_type!(as_largestring_array, col_name, rows, array, explicit_nulls); + } + DataType::Date32 + | DataType::Date64 + | DataType::Timestamp(_, _) + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Duration(_) => { + let options = FormatOptions::default(); + let formatter = ArrayFormatter::try_new(array.as_ref(), &options)?; + let nulls = array.nulls(); + rows.iter_mut() + .enumerate() + 
.filter_map(|(idx, maybe_row)| maybe_row.as_mut().map(|row| (idx, row))) + .for_each(|(idx, row)| { + let maybe_value = nulls + .map(|x| x.is_valid(idx)) + .unwrap_or(true) + .then(|| formatter.value(idx).to_string().into()); + if let Some(j) = maybe_value { + row.insert(col_name.to_string(), j); + } else if explicit_nulls { + row.insert(col_name.to_string(), Value::Null); + } + }); + } + DataType::Struct(_) => { + let inner_objs = struct_array_to_jsonmap_array(array.as_struct(), explicit_nulls)?; + rows.iter_mut() + .zip(inner_objs) + .filter_map(|(maybe_row, maybe_obj)| maybe_row.as_mut().map(|row| (row, maybe_obj))) + .for_each(|(row, maybe_obj)| { + let json = if let Some(obj) = maybe_obj { + if col_name == "bbox" { + convert_bbox(obj) + } else { + Value::Object(obj) + } + } else { + Value::Null + }; + row.insert(col_name.to_string(), json); + }); + } + DataType::List(_) => { + let listarr = as_list_array(array); + rows.iter_mut() + .zip(listarr.iter()) + .filter_map(|(maybe_row, maybe_value)| { + maybe_row.as_mut().map(|row| (row, maybe_value)) + }) + .try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> { + let maybe_value = maybe_value + .map(|v| array_to_json_array_internal(&v, explicit_nulls).map(Value::Array)) + .transpose()?; + if let Some(j) = maybe_value { + row.insert(col_name.to_string(), j); + } else if explicit_nulls { + row.insert(col_name.to_string(), Value::Null); + } + Ok(()) + })?; + } + DataType::LargeList(_) => { + let listarr = as_large_list_array(array); + rows.iter_mut() + .zip(listarr.iter()) + .filter_map(|(maybe_row, maybe_value)| { + maybe_row.as_mut().map(|row| (row, maybe_value)) + }) + .try_for_each(|(row, maybe_value)| -> Result<(), ArrowError> { + let maybe_value = maybe_value + .map(|v| array_to_json_array_internal(&v, explicit_nulls).map(Value::Array)) + .transpose()?; + if let Some(j) = maybe_value { + row.insert(col_name.to_string(), j); + } else if explicit_nulls { + row.insert(col_name.to_string(), Value::Null); + } 
+ Ok(()) + })?; + } + DataType::Dictionary(_, value_type) => { + let hydrated = arrow_cast::cast::cast(&array, value_type) + .expect("cannot cast dictionary to underlying values"); + set_column_for_json_rows(rows, &hydrated, col_name, explicit_nulls)?; + } + DataType::Map(_, _) => { + let maparr = as_map_array(array); + + let keys = maparr.keys(); + let values = maparr.values(); + + // Keys have to be strings to convert to json. + if !matches!(keys.data_type(), DataType::Utf8) { + return Err(ArrowError::JsonError(format!( + "data type {:?} not supported in nested map for json writer", + keys.data_type() + ))); + } + + let keys = keys.as_string::(); + let values = array_to_json_array_internal(values, explicit_nulls)?; + + let mut kv = keys.iter().zip(values); + + for (i, row) in rows + .iter_mut() + .enumerate() + .filter_map(|(i, maybe_row)| maybe_row.as_mut().map(|row| (i, row))) + { + if maparr.is_null(i) { + row.insert(col_name.to_string(), serde_json::Value::Null); + continue; + } + + let len = maparr.value_length(i) as usize; + let mut obj = serde_json::Map::new(); + + for (_, (k, v)) in (0..len).zip(&mut kv) { + obj.insert(k.expect("keys in a map should be non-null").to_string(), v); + } + + row.insert(col_name.to_string(), serde_json::Value::Object(obj)); + } + } + _ => { + return Err(ArrowError::JsonError(format!( + "data type {:?} not supported in nested map for json writer", + array.data_type() + ))) + } + } + Ok(()) +} + +pub(crate) fn record_batches_to_json_rows( + batches: &[RecordBatch], + geometry_index: usize, +) -> Result>, ArrowError> { + // For backwards compatibility, default to skip nulls + // Skip converting the geometry index, we'll do that later. 
+ record_batches_to_json_rows_internal(batches, false, geometry_index) +} + +fn record_batches_to_json_rows_internal( + batches: &[RecordBatch], + explicit_nulls: bool, + geometry_index: usize, +) -> Result>, ArrowError> { + let mut rows: Vec>> = iter::repeat(Some(JsonMap::new())) + .take(batches.iter().map(|b| b.num_rows()).sum()) + .collect(); + + if !rows.is_empty() { + let schema = batches[0].schema(); + let mut base = 0; + for batch in batches { + let row_count = batch.num_rows(); + let row_slice = &mut rows[base..base + batch.num_rows()]; + for (j, col) in batch.columns().iter().enumerate() { + if j == geometry_index { + continue; + } + let col_name = schema.field(j).name(); + set_column_for_json_rows(row_slice, col, col_name, explicit_nulls)? + } + base += row_count; + } + } + + let rows = rows.into_iter().map(|a| a.unwrap()).collect::>(); + Ok(rows) +} + +fn convert_bbox(obj: serde_json::Map) -> Value { + if let Some((((xmin, ymin), xmax), ymax)) = obj + .get("xmin") + .and_then(|v| v.as_f64()) + .zip(obj.get("ymin").and_then(|v| v.as_f64())) + .zip(obj.get("xmax").and_then(|v| v.as_f64())) + .zip(obj.get("ymax").and_then(|v| v.as_f64())) + { + if let Some((zmin, zmax)) = obj + .get("zmin") + .and_then(|v| v.as_f64()) + .zip(obj.get("zmax").and_then(|v| v.as_f64())) + { + json!([xmin, ymin, zmin, xmax, ymax, zmax]) + } else { + json!([xmin, ymin, xmax, ymax]) + } + } else { + Value::Object(obj) + } +} diff --git a/stac-arrow/src/lib.rs b/stac-arrow/src/lib.rs new file mode 100644 index 000000000..6dc843ca1 --- /dev/null +++ b/stac-arrow/src/lib.rs @@ -0,0 +1,230 @@ +mod json; + +use arrow_json::ReaderBuilder; +use arrow_schema::{DataType, Field, SchemaBuilder, TimeUnit}; +use geo_types::Geometry; +use geoarrow::{ + array::{AsGeometryArray, MixedGeometryBuilder}, + datatypes::GeoDataType, + table::Table, + trait_::GeometryArrayAccessor, +}; +use geojson::Value; +use serde_json::json; +use stac::{FlatItem, Item, ItemCollection}; +use std::sync::Arc; +use 
thiserror::Error; + +const DATETIME_COLUMNS: [&'static str; 8] = [ + "datetime", + "start_datetime", + "end_datetime", + "created", + "updated", + "expires", + "published", + "unpublished", +]; + +#[derive(Debug, Error)] +pub enum Error { + /// [arrow_schema::ArrowError] + #[error(transparent)] + Arrow(#[from] arrow_schema::ArrowError), + + /// [geoarrow::error::GeoArrowError] + #[error(transparent)] + GeoArrow(#[from] geoarrow::error::GeoArrowError), + + /// Invalid bbox length. + #[error("invalid bbox length (should be four or six): {0}")] + InvalidBBoxLength(usize), + + /// No geometry column. + #[error("no geometry column")] + NoGeometryColumn, + + /// No items to serialize. + #[error("no items")] + NoItems, + + /// [serde_json::Error] + #[error(transparent)] + SerdeJson(#[from] serde_json::Error), + + /// [stac::Error] + #[error(transparent)] + Stac(#[from] stac::Error), +} + +pub type Result = std::result::Result; + +/// Converts an [ItemCollection] to a [Table]. +/// +/// Any invalid attributes in the items (e.g. top-level attributes that conflict +/// with STAC spec attributes) will be dropped with a warning. 
+/// +/// # Examples +/// +/// ``` +/// use stac::ItemCollection; +/// +/// let item = stac::read("data/simple-item.json").unwrap(); +/// let item_collection = ItemCollection::from(vec![item]); +/// let table = stac_arrow::to_table(item_collection).unwrap(); +/// ``` +pub fn to_table(item_collection: ItemCollection) -> Result { + let mut values = Vec::with_capacity(item_collection.items.len()); + let mut builder = MixedGeometryBuilder::::new(); + for mut item in item_collection.items { + builder.push_geometry( + item.geometry + .take() + .and_then(|geometry| Geometry::try_from(geometry).ok()) + .as_ref(), + )?; + let flat_item = item.into_flat_item(true)?; + let mut value = serde_json::to_value(flat_item)?; + { + let value = value + .as_object_mut() + .expect("a flat item should serialize to an object"); + value.remove("geometry"); + if let Some(bbox) = value.remove("bbox") { + let bbox = bbox + .as_array() + .expect("STAC items should always have a list as their bbox"); + if bbox.len() == 4 { + value.insert("bbox".into(), json!({ + "xmin": bbox[0].as_number().expect("all bbox values should be a number"), + "ymin": bbox[1].as_number().expect("all bbox values should be a number"), + "xmax": bbox[2].as_number().expect("all bbox values should be a number"), + "ymax": bbox[3].as_number().expect("all bbox values should be a number"), + })); + } else if bbox.len() == 6 { + value.insert("bbox".into(), json!({ + "xmin": bbox[0].as_number().expect("all bbox values should be a number"), + "ymin": bbox[1].as_number().expect("all bbox values should be a number"), + "zmin": bbox[2].as_number().expect("all bbox values should be a number"), + "xmax": bbox[3].as_number().expect("all bbox values should be a number"), + "ymax": bbox[4].as_number().expect("all bbox values should be a number"), + "zmax": bbox[5].as_number().expect("all bbox values should be a number"), + })); + } else { + return Err(Error::InvalidBBoxLength(bbox.len())); + } + } + } + values.push(value); + } + let 
schema = arrow_json::reader::infer_json_schema_from_iterator(values.iter().map(Ok))?; + let mut schema_builder = SchemaBuilder::new(); + for field in schema.fields().iter() { + if DATETIME_COLUMNS.contains(&field.name().as_str()) { + schema_builder.push(Field::new( + field.name(), + DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())), + field.is_nullable(), + )); + } else { + schema_builder.push(field.clone()); + } + } + let metadata = schema.metadata; + let schema = Arc::new(schema_builder.finish().with_metadata(metadata)); + let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder()?; + decoder.serialize(&values)?; + let batch = decoder.flush()?.ok_or(Error::NoItems)?; + Table::from_arrow_and_geometry( + vec![batch], + schema, + geoarrow::chunked_array::from_geoarrow_chunks(&[&builder.finish()])?, + ) + .map_err(Error::from) +} + +/// Converts a [Table] to an [ItemCollection]. +/// +/// # Examples +/// +/// ``` +/// use std::fs::File; +/// +/// let file = File::open("examples/extended-item.parquet").unwrap(); +/// let table = geoarrow::io::parquet::read_geoparquet(file, Default::default()).unwrap(); +/// let item_collection = stac_arrow::from_table(table).unwrap(); +/// ``` +pub fn from_table(table: Table) -> Result { + use GeoDataType::*; + + let (index, _) = table + .schema() + .column_with_name("geometry") + .ok_or(Error::NoGeometryColumn)?; + let mut json_rows = crate::json::record_batches_to_json_rows(table.batches(), index)?; + let mut base = 0; + let mut items = Vec::new(); + for chunk in table.geometry_column(Some(index))?.geometry_chunks() { + for i in 0..chunk.len() { + let value = match chunk.data_type() { + Point(_) => Value::from(&chunk.as_point().value_as_geo(i)), + LineString(_) => Value::from(&chunk.as_line_string().value_as_geo(i)), + LargeLineString(_) => Value::from(&chunk.as_large_line_string().value_as_geo(i)), + Polygon(_) => Value::from(&chunk.as_polygon().value_as_geo(i)), + LargePolygon(_) => 
Value::from(&chunk.as_large_polygon().value_as_geo(i)), + MultiPoint(_) => Value::from(&chunk.as_multi_point().value_as_geo(i)), + LargeMultiPoint(_) => Value::from(&chunk.as_large_multi_point().value_as_geo(i)), + MultiLineString(_) => Value::from(&chunk.as_multi_line_string().value_as_geo(i)), + LargeMultiLineString(_) => { + Value::from(&chunk.as_large_multi_line_string().value_as_geo(i)) + } + MultiPolygon(_) => Value::from(&chunk.as_multi_polygon().value_as_geo(i)), + LargeMultiPolygon(_) => { + Value::from(&chunk.as_large_multi_polygon().value_as_geo(i)) + } + Mixed(_) => Value::from(&chunk.as_mixed().value_as_geo(i)), + LargeMixed(_) => Value::from(&chunk.as_large_mixed().value_as_geo(i)), + GeometryCollection(_) => { + Value::from(&chunk.as_geometry_collection().value_as_geo(i)) + } + LargeGeometryCollection(_) => { + Value::from(&chunk.as_large_geometry_collection().value_as_geo(i)) + } + WKB => Value::from(&chunk.as_wkb().value_as_geo(i)), + LargeWKB => Value::from(&chunk.as_large_wkb().value_as_geo(i)), + Rect => Value::from(&chunk.as_rect().value_as_geo(i)), + }; + json_rows[base + i].insert( + "geometry".into(), + serde_json::to_value(geojson::Geometry::new(value))?, + ); + let flat_item: FlatItem = serde_json::from_value(serde_json::Value::Object( + std::mem::take(&mut json_rows[base + i]), + ))?; + items.push(Item::try_from(flat_item)?); + } + base += chunk.len(); + } + Ok(items.into()) +} + +#[cfg(test)] +mod tests { + use stac_validate::Validate; + use std::fs::File; + + #[test] + fn to_table() { + let item = stac::read("data/simple-item.json").unwrap(); + super::to_table(vec![item].into()).unwrap(); + } + + #[test] + fn from_table() { + let file = File::open("examples/extended-item.parquet").unwrap(); + let table = geoarrow::io::parquet::read_geoparquet(file, Default::default()).unwrap(); + let item_collection = super::from_table(table).unwrap(); + assert_eq!(item_collection.items.len(), 1); + item_collection.items[0].validate().unwrap(); + } +} 
diff --git a/stac-cli/Cargo.toml b/stac-cli/Cargo.toml index 04d4cd3b9..7652a6898 100644 --- a/stac-cli/Cargo.toml +++ b/stac-cli/Cargo.toml @@ -17,12 +17,15 @@ pgstac = ["stac-server/pgstac"] [dependencies] axum = "0.7" +bytes = "1" clap = { version = "4", features = ["derive"] } +reqwest = "0.12" serde = "1" serde_json = "1" stac = { version = "0.7", path = "../stac" } stac-api = { version = "0.4", path = "../stac-api" } stac-async = { version = "0.5", path = "../stac-async" } +stac-geoparquet = { version = "0.1", path = "../stac-geoparquet" } stac-server = { version = "0.1", path = "../stac-server", features = [ "memory-item-search", ] } diff --git a/stac-cli/src/args.rs b/stac-cli/src/args.rs index c8ef05343..a0e332f4d 100644 --- a/stac-cli/src/args.rs +++ b/stac-cli/src/args.rs @@ -1,5 +1,6 @@ -use crate::{Format, Subcommand}; +use crate::{Format, Result, Subcommand}; use clap::Parser; +use std::{fs::File, io::Write}; /// CLI arguments. #[derive(Parser, Debug)] @@ -9,9 +10,13 @@ pub struct Args { #[arg(short, long)] pub compact: bool, + /// The input format. + #[arg(short, long)] + pub input_format: Option, + /// The output format. - #[arg(short, long, default_value = "json")] - pub format: Format, + #[arg(short, long)] + pub output_format: Option, /// The subcommand to run. #[command(subcommand)] @@ -57,6 +62,11 @@ pub struct ItemArgs { /// newly created to it into a new item collection. #[arg(short, long)] pub collect: bool, + + /// The file to write the item to. + /// + /// If not provided, the item will be written to standard output. + pub outfile: Option, } /// Arguments for searching a STAC API. @@ -83,7 +93,8 @@ pub struct SearchArgs { #[arg(short, long)] pub datetime: Option, - /// Searches items by performing intersection between their geometry and provided GeoJSON geometry. + /// Searches items by performing intersection between their geometry and + /// provided GeoJSON geometry. /// /// All GeoJSON geometry types must be supported. 
#[arg(long)] @@ -93,7 +104,8 @@ pub struct SearchArgs { #[arg(short, long)] pub ids: Option, - /// Comma-delimited list of one or more Collection IDs that each matching Item must be in. + /// Comma-delimited list of one or more Collection IDs that each matching + /// Item must be in. #[arg(short, long)] pub collections: Option, @@ -119,9 +131,14 @@ pub struct SearchArgs { #[arg(short, long)] pub filter: Option, - /// Stream the items to standard output as ndjson. + /// Stream the items to output as ndjson. #[arg(long)] pub stream: bool, + + /// The file to write the output to. + /// + /// If not provided, the output will be written to standard output. + pub outfile: Option, } /// Arguments for serving a STAC API. @@ -138,10 +155,15 @@ pub struct ServeArgs { /// Arguments for sorting a STAC value. #[derive(clap::Args, Debug)] pub struct SortArgs { - /// The href of the STAC object. + /// The href of the STAC to sort. /// - /// If this is not provided, will read from standard input. - pub href: Option, + /// If this is not provided, or is `-`, will read from standard input. + pub infile: Option, + + /// The output filename. + /// + /// If this is not provided, output will be printed to standard output. + pub outfile: Option, } /// Arguments for validating a STAC value. @@ -155,6 +177,48 @@ pub struct ValidateArgs { /// endpoint from a STAC API, all collections will be validated. /// Additional behavior TBD. /// - /// If this is not provided, will read from standard input. + /// If this is not provided, or is `-`, will read from standard input. pub href: Option, } + +/// Arguments for translating STAC values. +#[derive(clap::Args, Debug)] +pub struct TranslateArgs { + /// The input STAC value. + /// + /// If this is not provided, or is `-`, input will be read from standard + /// input. + pub infile: Option, + + /// The output STAC value. + /// + /// If not provided, output will be printed to standard output. 
+ pub outfile: Option, +} + +impl Args { + pub(crate) fn writer(&self) -> Result> { + if let Some(outfile) = self.subcommand.outfile() { + let file = File::create(outfile)?; + Ok(Box::new(file)) + } else { + Ok(Box::new(std::io::stdout())) + } + } + + pub(crate) fn input_format(&self) -> Format { + self.input_format + .or_else(|| self.subcommand.infile().and_then(Format::maybe_from_href)) + .unwrap_or_default() + } + + pub(crate) fn output_format(&self) -> Format { + self.output_format + .or_else(|| self.subcommand.outfile().and_then(Format::maybe_from_href)) + .unwrap_or_default() + } + + pub(crate) fn outfile(&self) -> Option<&str> { + self.subcommand.outfile() + } +} diff --git a/stac-cli/src/error.rs b/stac-cli/src/error.rs index 39b45a558..97dac2ce2 100644 --- a/stac-cli/src/error.rs +++ b/stac-cli/src/error.rs @@ -1,6 +1,5 @@ -use thiserror::Error; - use crate::Output; +use thiserror::Error; /// Crate specific error type. #[derive(Error, Debug)] @@ -14,6 +13,10 @@ pub enum Error { #[error(transparent)] Io(#[from] std::io::Error), + /// [reqwest::Error] + #[error(transparent)] + Reqwest(#[from] reqwest::Error), + /// [serde_json::Error] #[error(transparent)] SerdeJson(#[from] serde_json::Error), @@ -30,6 +33,10 @@ pub enum Error { #[error(transparent)] StacAsync(#[from] stac_async::Error), + /// [stac_geoparquet::Error] + #[error(transparent)] + StacGeoparquet(#[from] stac_geoparquet::Error), + /// [stac_server::Error] #[error(transparent)] StacServer(#[from] stac_server::Error), diff --git a/stac-cli/src/format.rs b/stac-cli/src/format.rs index aaf3dee16..4e5e8457d 100644 --- a/stac-cli/src/format.rs +++ b/stac-cli/src/format.rs @@ -1,9 +1,14 @@ use crate::{Error, Result}; -use std::str::FromStr; +use bytes::Bytes; +use serde::de::DeserializeOwned; +use std::{fs::File, io::Read, str::FromStr}; /// The STAC output format. #[derive(Clone, Copy, Debug, Default)] pub enum Format { + /// stac-geoparquet + Geoparquet, + /// JSON (the default).
#[default] Json, @@ -14,7 +19,44 @@ impl FromStr for Format { fn from_str(s: &str) -> Result { match s.to_ascii_lowercase().as_str() { "json" | "geojson" => Ok(Format::Json), + "parquet" | "geoparquet" => Ok(Format::Geoparquet), _ => Err(Error::UnsupportedFormat(s.to_string())), } } } + +impl Format { + pub(crate) fn maybe_from_href(href: &str) -> Option { + href.rsplit_once('.') + .and_then(|(_, ext)| Format::from_str(ext).ok()) + } + + pub(crate) async fn read_href(&self, href: Option<&str>) -> Result { + if let Some(href) = href.and_then(|href| if href == "-" { None } else { Some(href) }) { + match *self { + Format::Geoparquet => { + let item_collection = if let Some(url) = stac::href_to_url(href) { + stac_geoparquet::from_reader(reqwest::blocking::get(url)?.bytes()?)? + } else { + let file = File::open(href)?; + stac_geoparquet::from_reader(file)? + }; + serde_json::from_value(serde_json::to_value(item_collection)?) + .map_err(Error::from) + } + Format::Json => stac_async::read_json(href).await.map_err(Error::from), + } + } else { + match *self { + Format::Geoparquet => { + let mut buf = Vec::new(); + let _ = std::io::stdin().read_to_end(&mut buf)?; + let item_collection = stac_geoparquet::from_reader(Bytes::from(buf))?; + serde_json::from_value(serde_json::to_value(item_collection)?) + .map_err(Error::from) + } + Format::Json => serde_json::from_reader(std::io::stdin()).map_err(Error::from), + } + } + } +} diff --git a/stac-cli/src/io.rs b/stac-cli/src/io.rs deleted file mode 100644 index 7dc961927..000000000 --- a/stac-cli/src/io.rs +++ /dev/null @@ -1,14 +0,0 @@ -//! Input/output utilities. - -use crate::{Error, Result}; -use serde::de::DeserializeOwned; - -/// Reads something from an href or from standard input. 
-pub async fn read_href(href: Option<&str>) -> Result { - // TODO support `-` for stdin - if let Some(href) = href { - stac_async::read_json(href).await.map_err(Error::from) - } else { - serde_json::from_reader(std::io::stdin()).map_err(Error::from) - } -} diff --git a/stac-cli/src/lib.rs b/stac-cli/src/lib.rs index a8e4d1baf..c82dcbc3c 100644 --- a/stac-cli/src/lib.rs +++ b/stac-cli/src/lib.rs @@ -34,13 +34,12 @@ mod args; mod error; mod format; -pub mod io; mod output; mod runner; mod subcommand; pub use { - args::{Args, ItemArgs, SearchArgs, ServeArgs, SortArgs, ValidateArgs}, + args::{Args, ItemArgs, SearchArgs, ServeArgs, SortArgs, TranslateArgs, ValidateArgs}, error::Error, format::Format, output::Output, @@ -56,14 +55,16 @@ pub type Result = std::result::Result; /// # Examples /// /// ``` -/// use stac_cli::{Args, Subcommand, Format, SortArgs}; +/// use stac_cli::{Args, Subcommand, SortArgs}; /// /// let sort_args = SortArgs { -/// href: Some("data/simple-item.json".to_string()) +/// infile: Some("data/simple-item.json".to_string()), +/// outfile: None, /// }; /// let args = Args { /// compact: false, -/// format: Format::Json, +/// input_format: None, +/// output_format: None, /// subcommand: Subcommand::Sort(sort_args), /// }; /// # tokio_test::block_on(async { @@ -71,13 +72,26 @@ pub type Result = std::result::Result; /// # }) /// ``` pub async fn run(args: Args) -> Result<()> { + let writer = args.writer()?; + let outfile = args.outfile().map(String::from); + let input_format = args.input_format(); + let output_format = args.output_format(); let mut runner = Runner { compact: args.compact, - format: args.format, - writer: std::io::stdout(), + input_format, + output_format, + writer, buffer: 100, }; - runner.run(args.subcommand).await + let result = runner.run(args.subcommand).await; + if result.is_err() { + if let Some(outfile) = outfile { + if let Err(err) = std::fs::remove_file(outfile) { + eprintln!("error when unlinking outfile: {}", err); + } + } + 
} + result } #[cfg(test)] diff --git a/stac-cli/src/main.rs b/stac-cli/src/main.rs index fcc304a78..0e0e2f100 100644 --- a/stac-cli/src/main.rs +++ b/stac-cli/src/main.rs @@ -6,6 +6,9 @@ async fn main() { let args = Args::parse(); std::process::exit(match stac_cli::run(args).await { Ok(()) => 0, - Err(err) => err.code() + Err(err) => { + eprintln!("ERROR: {}", err); + err.code() + } }) } diff --git a/stac-cli/src/output.rs b/stac-cli/src/output.rs index 82c9b08f9..6f9ea496e 100644 --- a/stac-cli/src/output.rs +++ b/stac-cli/src/output.rs @@ -24,6 +24,17 @@ impl Output { Output::String(_) => None, } } + + /// Converts this output to [stac::Value]. + /// + /// Strings are not converted. + pub fn to_stac(&self) -> Option { + match self { + Output::Stac(value) => Some(value.clone()), + Output::Json(value) => serde_json::from_value(value.clone()).ok(), + Output::String(_) => None, + } + } } impl From for Output { diff --git a/stac-cli/src/runner.rs b/stac-cli/src/runner.rs index 8c903ec4c..eb0b642be 100644 --- a/stac-cli/src/runner.rs +++ b/stac-cli/src/runner.rs @@ -3,12 +3,18 @@ use std::io::Write; /// Struct for running commands. #[derive(Debug)] -pub struct Runner { +pub struct Runner +where + W: Send, +{ /// Should the output be printed in compact form, if supported? pub compact: bool, + /// The input format. + pub input_format: Format, + /// The output format. - pub format: Format, + pub output_format: Format, /// The output writeable stream. 
pub writer: W, @@ -17,12 +23,16 @@ pub struct Runner { pub buffer: usize, } -impl Runner { +impl Runner +where + W: Send, +{ pub(crate) async fn run(&mut self, subcommand: Subcommand) -> Result<()> { let (sender, mut receiver) = tokio::sync::mpsc::channel(self.buffer); - let handle = tokio::spawn(async move { subcommand.run(sender).await }); + let input_format = self.input_format; + let handle = tokio::spawn(async move { subcommand.run(input_format, sender).await }); while let Some(value) = receiver.recv().await { - match self.format { + match self.output_format { Format::Json => { if let Some(value) = value.to_json() { if self.compact { @@ -34,6 +44,13 @@ impl Runner { writeln!(self.writer, "{}", value)?; } } + Format::Geoparquet => { + if let Some(value) = value.to_stac() { + stac_geoparquet::to_writer(&mut self.writer, value)?; + } else { + writeln!(self.writer, "{}", value)?; + } + } } } handle.await? diff --git a/stac-cli/src/subcommand/mod.rs b/stac-cli/src/subcommand/mod.rs index f4622da6b..9bcafc845 100644 --- a/stac-cli/src/subcommand/mod.rs +++ b/stac-cli/src/subcommand/mod.rs @@ -2,9 +2,13 @@ mod item; mod search; mod serve; mod sort; +mod translate; mod validate; -use crate::{Error, ItemArgs, Output, Result, SearchArgs, ServeArgs, SortArgs, ValidateArgs}; +use crate::{ + Error, Format, ItemArgs, Output, Result, SearchArgs, ServeArgs, SortArgs, TranslateArgs, + ValidateArgs, +}; use tokio::sync::mpsc::Sender; /// A CLI subcommand. @@ -27,12 +31,39 @@ pub enum Subcommand { /// Sorts the fields of STAC object. Sort(SortArgs), + /// Translates STAC values between formats. + Translate(TranslateArgs), + /// Validates a STAC object or API endpoint using json-schema validation. 
Validate(ValidateArgs), } impl Subcommand { - pub(crate) async fn run(self, sender: Sender) -> Result<()> { + pub(crate) fn infile(&self) -> Option<&str> { + use Subcommand::*; + + match self { + Item(args) => Some(args.id_or_href.as_str()), + Sort(args) => args.infile.as_deref(), + Translate(args) => args.infile.as_deref(), + Validate(args) => args.href.as_deref(), + _ => None, + } + } + + pub(crate) fn outfile(&self) -> Option<&str> { + use Subcommand::*; + + match self { + Item(args) => args.outfile.as_deref(), + Search(args) => args.outfile.as_deref(), + Sort(args) => args.outfile.as_deref(), + Translate(args) => args.outfile.as_deref(), + _ => None, + } + } + + pub(crate) async fn run(self, input_format: Format, sender: Sender) -> Result<()> { use Subcommand::*; match self { @@ -43,11 +74,15 @@ impl Subcommand { Search(args) => Subcommand::search(args, sender).await?, Serve(args) => Subcommand::serve(args, sender).await?, Sort(args) => { - let value = Subcommand::sort(args).await?; + let value = Subcommand::sort(args, input_format).await?; + sender.send(value.into()).await?; + } + Translate(args) => { + let value = Subcommand::translate(args, input_format).await?; sender.send(value.into()).await?; } Validate(args) => { - if let Err(err) = Subcommand::validate(args).await { + if let Err(err) = Subcommand::validate(args, input_format).await { match err { Error::Validation(errors) => { for error in errors { diff --git a/stac-cli/src/subcommand/sort.rs b/stac-cli/src/subcommand/sort.rs index 12bb7e7fa..5b9eaafa2 100644 --- a/stac-cli/src/subcommand/sort.rs +++ b/stac-cli/src/subcommand/sort.rs @@ -1,9 +1,9 @@ -use crate::{Result, SortArgs, Subcommand}; +use crate::{Format, Result, SortArgs, Subcommand}; use stac::Value; impl Subcommand { /// Sorts a STAC value. 
- pub(crate) async fn sort(args: SortArgs) -> Result { - crate::io::read_href(args.href.as_deref()).await + pub(crate) async fn sort(args: SortArgs, input_format: Format) -> Result { + input_format.read_href(args.infile.as_deref()).await } } diff --git a/stac-cli/src/subcommand/translate.rs b/stac-cli/src/subcommand/translate.rs new file mode 100644 index 000000000..be7096c90 --- /dev/null +++ b/stac-cli/src/subcommand/translate.rs @@ -0,0 +1,8 @@ +use crate::{Format, Result, Subcommand, TranslateArgs}; +use stac::Value; + +impl Subcommand { + pub(crate) async fn translate(args: TranslateArgs, input_format: Format) -> Result { + input_format.read_href(args.infile.as_deref()).await + } +} diff --git a/stac-cli/src/subcommand/validate.rs b/stac-cli/src/subcommand/validate.rs index d65df1d7b..4516ae37d 100644 --- a/stac-cli/src/subcommand/validate.rs +++ b/stac-cli/src/subcommand/validate.rs @@ -1,11 +1,11 @@ -use crate::{Error, Result, Subcommand, ValidateArgs}; +use crate::{Error, Format, Result, Subcommand, ValidateArgs}; use serde_json::json; use stac_validate::Validate; impl Subcommand { /// Validates a STAC value. 
- pub async fn validate(args: ValidateArgs) -> Result<()> { - let value: serde_json::Value = crate::io::read_href(args.href.as_deref()).await?; + pub async fn validate(args: ValidateArgs, input_format: Format) -> Result<()> { + let value: serde_json::Value = input_format.read_href(args.href.as_deref()).await?; let mut errors: Vec = Vec::new(); let mut update_errors = |result: std::result::Result<(), stac_validate::Error>| match result { diff --git a/stac-geoparquet/Cargo.toml b/stac-geoparquet/Cargo.toml new file mode 100644 index 000000000..e063fc37d --- /dev/null +++ b/stac-geoparquet/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "stac-geoparquet" +version = "0.1.0" +edition = "2021" + +[features] +default = ["compression"] +compression = [ + "geoarrow/parquet_compression", + "parquet/snap", + "parquet/brotli", + "parquet/flate2", + "parquet/lz4", + "parquet/zstd", +] + +[dependencies] +geoarrow = { git = "https://github.com/geoarrow/geoarrow-rs", rev = "476562b3da7dde9cd324fc5bf5ceb5451f76c451", features = [ + "parquet", +] } +parquet = { version = "52", default-features = false } +stac = { version = "0.7", path = "../stac" } +stac-arrow = { version = "0.1", path = "../stac-arrow" } +thiserror = "1" diff --git a/stac-geoparquet/data b/stac-geoparquet/data new file mode 120000 index 000000000..188ec7632 --- /dev/null +++ b/stac-geoparquet/data @@ -0,0 +1 @@ +../spec-examples/v1.0.0 \ No newline at end of file diff --git a/stac-geoparquet/examples/extended-item.parquet b/stac-geoparquet/examples/extended-item.parquet new file mode 100644 index 000000000..d7bb47e6f Binary files /dev/null and b/stac-geoparquet/examples/extended-item.parquet differ diff --git a/stac-geoparquet/src/lib.rs b/stac-geoparquet/src/lib.rs new file mode 100644 index 000000000..5371b034f --- /dev/null +++ b/stac-geoparquet/src/lib.rs @@ -0,0 +1,113 @@ +use parquet::file::reader::ChunkReader; +use stac::{ItemCollection, Value}; +use std::io::Write; +use thiserror::Error; + +#[derive(Debug, 
Error)] +pub enum Error { + /// [geoarrow::error::GeoArrowError] + #[error(transparent)] + GeoArrow(#[from] geoarrow::error::GeoArrowError), + + /// [stac_arrow::Error] + #[error(transparent)] + StacArrow(#[from] stac_arrow::Error), + + /// This STAC type is not supported by stac-geoparquet. + #[error("unsupported type: {0}")] + UnsupportedType(&'static str), +} + +pub type Result = std::result::Result; + +/// Writes a [stac::Value] to a [std::io::Write] as +/// [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet). +/// +/// Currently, will return an error if the value is not an item or an item +/// collection. +/// +/// # Examples +/// +/// ``` +/// use std::io::Cursor; +/// use stac::Item; +/// +/// let item: Item = stac::read("data/simple-item.json").unwrap(); +/// let mut cursor = Cursor::new(Vec::new()); +/// stac_geoparquet::to_writer(&mut cursor, item.into()).unwrap(); +/// ``` +pub fn to_writer(writer: W, value: Value) -> Result<()> +where + W: Write + Send, +{ + match value { + Value::ItemCollection(item_collection) => { + let mut table = stac_arrow::to_table(item_collection)?; + geoarrow::io::parquet::write_geoparquet(&mut table, writer, &Default::default()) + .map_err(Error::from) + } + Value::Item(item) => to_writer(writer, ItemCollection::from(vec![item.clone()]).into()), + _ => Err(Error::UnsupportedType(value.type_name())), + } +} + +/// Reads a [stac::ItemCollection] from a [ChunkReader] as +/// [stac-geoparquet](https://github.com/stac-utils/stac-geoparquet).
+/// +/// # Examples +/// +/// ``` +/// use std::fs::File; +/// +/// let file = File::open("examples/extended-item.parquet").unwrap(); +/// let item_collection = stac_geoparquet::from_reader(file).unwrap(); +/// ``` +pub fn from_reader(reader: R) -> Result +where + R: ChunkReader + 'static, +{ + let table = geoarrow::io::parquet::read_geoparquet(reader, Default::default())?; + stac_arrow::from_table(table).map_err(Error::from) +} + +#[cfg(test)] +mod tests { + use stac::ItemCollection; + use std::{fs::File, io::Cursor}; + + #[test] + fn to_writer_catalog() { + let mut cursor = Cursor::new(Vec::new()); + let catalog = stac::read("data/catalog.json").unwrap(); + super::to_writer(&mut cursor, catalog).unwrap_err(); + } + + #[test] + fn to_writer_collection() { + let mut cursor = Cursor::new(Vec::new()); + let collection = stac::read("data/collection.json").unwrap(); + super::to_writer(&mut cursor, collection).unwrap_err(); + } + + #[test] + fn to_writer_item_collection() { + let mut cursor = Cursor::new(Vec::new()); + let item = stac::read("data/simple-item.json").unwrap(); + let item_collection = ItemCollection::from(vec![item]); + super::to_writer(&mut cursor, item_collection.into()).unwrap(); + } + + #[test] + fn to_writer_item() { + let mut cursor = Cursor::new(Vec::new()); + let item = stac::read("data/simple-item.json").unwrap(); + super::to_writer(&mut cursor, item).unwrap(); + } + + #[test] + fn from_reader() { + let file = File::open("examples/extended-item.parquet").unwrap(); + let item_collection = super::from_reader(file).unwrap(); + assert_eq!(item_collection.items.len(), 1); + } +}