Skip to content

Commit

Permalink
feat(python, rust): add "raise_if_empty" flag to read_excel, `read_csv`, `scan_csv`, and `read_csv_batched` (pola-rs#10409)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-beedie authored Aug 14, 2023
1 parent 4c677d8 commit 67e3148
Show file tree
Hide file tree
Showing 26 changed files with 306 additions and 118 deletions.
11 changes: 11 additions & 0 deletions crates/polars-io/src/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ where
skip_rows_after_header: usize,
try_parse_dates: bool,
row_count: Option<RowCount>,
raise_if_empty: bool,
}

impl<'a, R> CsvReader<'a, R>
Expand Down Expand Up @@ -295,6 +296,12 @@ where
self
}

/// Raise an error if the CSV is empty; when disabled, an empty frame
/// is returned instead of an error.
// `#[must_use]` matches the equivalent setter on `LazyCsvReader`: this is a
// builder method that consumes `self`, so dropping the result is a bug.
#[must_use]
pub fn raise_if_empty(mut self, toggle: bool) -> Self {
    self.raise_if_empty = toggle;
    self
}

/// Reduce memory consumption at the expense of performance
pub fn low_memory(mut self, toggle: bool) -> Self {
self.low_memory = toggle;
Expand Down Expand Up @@ -366,6 +373,7 @@ impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
self.skip_rows_after_header,
std::mem::take(&mut self.row_count),
self.try_parse_dates,
self.raise_if_empty,
)
}

Expand Down Expand Up @@ -476,6 +484,7 @@ impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
self.eol_char,
self.null_values.as_ref(),
self.try_parse_dates,
self.raise_if_empty,
)?;
let schema = Arc::new(inferred_schema);
Ok(to_batched_owned_mmap(self, schema))
Expand Down Expand Up @@ -504,6 +513,7 @@ impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
self.eol_char,
self.null_values.as_ref(),
self.try_parse_dates,
self.raise_if_empty,
)?;
let schema = Arc::new(inferred_schema);
Ok(to_batched_owned_read(self, schema))
Expand Down Expand Up @@ -547,6 +557,7 @@ where
skip_rows_after_header: 0,
try_parse_dates: false,
row_count: None,
raise_if_empty: true,
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl<'a> CoreReader<'a> {
skip_rows_after_header: usize,
row_count: Option<RowCount>,
try_parse_dates: bool,
raise_if_empty: bool,
) -> PolarsResult<CoreReader<'a>> {
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
let mut reader_bytes = reader_bytes;
Expand Down Expand Up @@ -235,6 +236,7 @@ impl<'a> CoreReader<'a> {
eol_char,
null_values.as_ref(),
try_parse_dates,
raise_if_empty,
)?;
Arc::new(inferred_schema)
}
Expand Down
11 changes: 10 additions & 1 deletion crates/polars-io/src/csv/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub fn infer_file_schema_inner(
null_values: Option<&NullValues>,
try_parse_dates: bool,
recursion_count: u8,
raise_if_empty: bool,
) -> PolarsResult<(Schema, usize, usize)> {
// keep track so that we can determine the amount of bytes read
let start_ptr = reader_bytes.as_ptr() as usize;
Expand All @@ -201,7 +202,9 @@ pub fn infer_file_schema_inner(
let encoding = CsvEncoding::LossyUtf8;

let bytes = skip_line_ending(skip_bom(reader_bytes), eol_char);
polars_ensure!(!bytes.is_empty(), NoData: "empty CSV");
if raise_if_empty {
polars_ensure!(!bytes.is_empty(), NoData: "empty CSV");
};
let mut lines = SplitLines::new(bytes, quote_char.unwrap_or(b'"'), eol_char).skip(*skip_rows);
// it can be that we have a single line without eol char
let has_eol = bytes.contains(&eol_char);
Expand Down Expand Up @@ -301,7 +304,10 @@ pub fn infer_file_schema_inner(
null_values,
try_parse_dates,
recursion_count + 1,
raise_if_empty,
);
} else if !raise_if_empty {
return Ok((Schema::new(), 0, 0));
} else {
polars_bail!(NoData: "empty CSV");
};
Expand Down Expand Up @@ -484,6 +490,7 @@ pub fn infer_file_schema_inner(
null_values,
try_parse_dates,
recursion_count + 1,
raise_if_empty,
);
}

Expand Down Expand Up @@ -515,6 +522,7 @@ pub fn infer_file_schema(
eol_char: u8,
null_values: Option<&NullValues>,
try_parse_dates: bool,
raise_if_empty: bool,
) -> PolarsResult<(Schema, usize, usize)> {
infer_file_schema_inner(
reader_bytes,
Expand All @@ -530,6 +538,7 @@ pub fn infer_file_schema(
null_values,
try_parse_dates,
0,
raise_if_empty,
)
}

Expand Down
14 changes: 13 additions & 1 deletion crates/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct LazyCsvReader<'a> {
encoding: CsvEncoding,
row_count: Option<RowCount>,
try_parse_dates: bool,
raise_if_empty: bool,
}

#[cfg(feature = "csv")]
Expand Down Expand Up @@ -59,6 +60,7 @@ impl<'a> LazyCsvReader<'a> {
encoding: CsvEncoding::Utf8,
row_count: None,
try_parse_dates: false,
raise_if_empty: true,
}
}

Expand Down Expand Up @@ -191,13 +193,21 @@ impl<'a> LazyCsvReader<'a> {
self
}

/// Automatically try to parse dates/ datetimes and time. If parsing fails, columns remain of dtype `[DataType::Utf8]`.
/// Automatically try to parse dates/datetimes and time.
/// If parsing fails, columns remain of dtype [`DataType::Utf8`].
// NOTE: the link above must be written as [`DataType::Utf8`] (backticks inside
// the brackets) for rustdoc to resolve it as an intra-doc link; the previous
// form `[DataType::Utf8]` rendered as literal text.
#[cfg(feature = "temporal")]
pub fn with_try_parse_dates(mut self, toggle: bool) -> Self {
    self.try_parse_dates = toggle;
    self
}

/// Whether scanning an empty CSV raises an error; if disabled, an empty
/// frame is produced instead.
#[must_use]
pub fn raise_if_empty(mut self, enabled: bool) -> Self {
    self.raise_if_empty = enabled;
    self
}

/// Modify a schema before we run the lazy scanning.
///
/// Important! Run this function latest in the builder!
Expand Down Expand Up @@ -231,6 +241,7 @@ impl<'a> LazyCsvReader<'a> {
self.eol_char,
None,
self.try_parse_dates,
self.raise_if_empty,
)?;
let mut schema = f(schema)?;

Expand Down Expand Up @@ -268,6 +279,7 @@ impl LazyFileListReader for LazyCsvReader<'_> {
self.encoding,
self.row_count,
self.try_parse_dates,
self.raise_if_empty,
)?
.build()
.into();
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/physical_plan/executors/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl CsvExec {
.with_rechunk(self.file_options.rechunk)
.with_row_count(std::mem::take(&mut self.file_options.row_count))
.with_try_parse_dates(self.options.try_parse_dates)
.raise_if_empty(self.options.raise_if_empty)
.finish()
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-pipe/src/executors/sources/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl CsvSource {
.with_rechunk(false)
.with_chunk_size(chunk_size)
.with_row_count(file_options.row_count)
.with_try_parse_dates(options.try_parse_dates);
.with_try_parse_dates(options.try_parse_dates)
.raise_if_empty(options.raise_if_empty);

let reader = Box::new(reader);
let reader = Box::leak(reader) as *mut CsvReader<'static, File>;
Expand Down
10 changes: 8 additions & 2 deletions crates/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl LogicalPlanBuilder {
encoding: CsvEncoding,
row_count: Option<RowCount>,
try_parse_dates: bool,
raise_if_empty: bool,
) -> PolarsResult<Self> {
let path = path.into();
let mut file = polars_utils::open_file(&path).map_err(|e| {
Expand All @@ -262,9 +263,12 @@ impl LogicalPlanBuilder {
polars_err!(ComputeError: "error open file: {}, {}", path, e)
}
})?;

let mut magic_nr = [0u8; 2];
file.read_exact(&mut magic_nr)
.map_err(|_| polars_err!(NoData: "empty csv"))?;
let res = file.read_exact(&mut magic_nr);
if raise_if_empty {
res.map_err(|_| polars_err!(NoData: "empty CSV"))?;
};
polars_ensure!(
!is_compressed(&magic_nr),
ComputeError: "cannot scan compressed csv; use `read_csv` for compressed data",
Expand All @@ -287,6 +291,7 @@ impl LogicalPlanBuilder {
eol_char,
null_values.as_ref(),
try_parse_dates,
raise_if_empty,
)?;

if let Some(rc) = &row_count {
Expand Down Expand Up @@ -340,6 +345,7 @@ impl LogicalPlanBuilder {
null_values,
encoding,
try_parse_dates,
raise_if_empty,
},
},
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct CsvParserOptions {
pub null_values: Option<NullValues>,
pub encoding: CsvEncoding,
pub try_parse_dates: bool,
pub raise_if_empty: bool,
}

#[cfg(feature = "parquet")]
Expand Down
11 changes: 9 additions & 2 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ def _read_csv(
row_count_offset: int = 0,
sample_size: int = 1024,
eol_char: str = "\n",
raise_if_empty: bool = True,
) -> DataFrame:
"""
Read a CSV file into a DataFrame.
Expand Down Expand Up @@ -744,6 +745,7 @@ def _read_csv(
row_count_name=row_count_name,
row_count_offset=row_count_offset,
eol_char=eol_char,
raise_if_empty=raise_if_empty,
)
if columns is None:
return scan.collect()
Expand Down Expand Up @@ -784,6 +786,7 @@ def _read_csv(
_prepare_row_count_args(row_count_name, row_count_offset),
sample_size=sample_size,
eol_char=eol_char,
raise_if_empty=raise_if_empty,
)
return self

Expand Down Expand Up @@ -872,7 +875,9 @@ def _read_avro(
Parameters
----------
source
Path to a file or a file-like object.
Path to a file or a file-like object (by file-like object, we refer to
objects that have a ``read()`` method, such as a file handler (e.g.
via builtin ``open`` function) or ``BytesIO``).
columns
Columns.
n_rows
Expand Down Expand Up @@ -906,7 +911,9 @@ def _read_ipc(
Parameters
----------
source
Path to a file or a file-like object.
Path to a file or a file-like object (by file-like object, we refer to
objects that have a ``read()`` method, such as a file handler (e.g.
via builtin ``open`` function) or ``BytesIO``).
columns
Columns to select. Accepts a list of column indices (starting at zero) or a
list of column names.
Expand Down
24 changes: 19 additions & 5 deletions py-polars/polars/io/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def _prepare_file_arg(
file: str | list[str] | TextIO | Path | BinaryIO | bytes,
encoding: str | None = None,
use_pyarrow: bool | None = None,
raise_if_empty: bool = True,
**kwargs: Any,
) -> ContextManager[str | BinaryIO | list[str] | list[BinaryIO]]:
"""
Expand Down Expand Up @@ -102,15 +103,19 @@ def managed_file(file: Any) -> Iterator[Any]:
return _check_empty(
BytesIO(file.decode(encoding_str).encode("utf8")),
context="bytes",
raise_if_empty=raise_if_empty,
)
if use_pyarrow:
return _check_empty(BytesIO(file), context="bytes")
return _check_empty(
BytesIO(file), context="bytes", raise_if_empty=raise_if_empty
)

if isinstance(file, StringIO):
return _check_empty(
BytesIO(file.read().encode("utf8")),
context="StringIO",
read_position=file.tell(),
raise_if_empty=raise_if_empty,
)

if isinstance(file, BytesIO):
Expand All @@ -119,12 +124,14 @@ def managed_file(file: Any) -> Iterator[Any]:
BytesIO(file.read().decode(encoding_str).encode("utf8")),
context="BytesIO",
read_position=file.tell(),
raise_if_empty=raise_if_empty,
)
return managed_file(
_check_empty(
b=file,
context="BytesIO",
read_position=file.tell(),
raise_if_empty=raise_if_empty,
)
)

Expand All @@ -133,6 +140,7 @@ def managed_file(file: Any) -> Iterator[Any]:
return _check_empty(
BytesIO(file.read_bytes().decode(encoding_str).encode("utf8")),
context=f"Path ({file!r})",
raise_if_empty=raise_if_empty,
)
return managed_file(normalise_filepath(file, check_not_dir))

Expand All @@ -153,7 +161,9 @@ def managed_file(file: Any) -> Iterator[Any]:
# decode first
with Path(file).open(encoding=encoding_str) as f:
return _check_empty(
BytesIO(f.read().encode("utf8")), context=f"{file!r}"
BytesIO(f.read().encode("utf8")),
context=f"{file!r}",
raise_if_empty=raise_if_empty,
)
# non-local file
if "*" in file:
Expand Down Expand Up @@ -184,14 +194,18 @@ def managed_file(file: Any) -> Iterator[Any]:
if not has_utf8_utf8_lossy_encoding:
with Path(file).open(encoding=encoding_str) as f:
return _check_empty(
BytesIO(f.read().encode("utf8")), context=f"{file!r}"
BytesIO(f.read().encode("utf8")),
context=f"{file!r}",
raise_if_empty=raise_if_empty,
)

return managed_file(file)


def _check_empty(b: BytesIO, context: str, read_position: int | None = None) -> BytesIO:
if not b.getbuffer().nbytes:
def _check_empty(
b: BytesIO, *, context: str, raise_if_empty: bool, read_position: int | None = None
) -> BytesIO:
if raise_if_empty and not b.getbuffer().nbytes:
hint = (
f" (buffer position = {read_position}; try seek(0) before reading?)"
if context in ("StringIO", "BytesIO") and read_position
Expand Down
4 changes: 3 additions & 1 deletion py-polars/polars/io/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ def read_avro(
Parameters
----------
source
Path to a file or a file-like object.
Path to a file or a file-like object (by file-like object, we refer to objects
that have a ``read()`` method, such as a file handler (e.g. via builtin ``open``
function) or ``BytesIO``).
columns
Columns to select. Accepts a list of column indices (starting at zero) or a list
of column names.
Expand Down
Loading

0 comments on commit 67e3148

Please sign in to comment.