From 475c2af412ab7c9ede3548a19bc90f1dcaf8540e Mon Sep 17 00:00:00 2001 From: Markus Klein Date: Sat, 29 Jun 2024 21:10:45 +0200 Subject: [PATCH] refactor: seperate connection and query in reader --- python/arrow_odbc/reader.py | 45 +++++++++++++++++++++---------------- src/reader.rs | 24 +++++++++++++++----- 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/python/arrow_odbc/reader.py b/python/arrow_odbc/reader.py index ef6dcd0..966d360 100644 --- a/python/arrow_odbc/reader.py +++ b/python/arrow_odbc/reader.py @@ -35,7 +35,7 @@ class _BatchReaderRaii: def __init__(self): reader_out = ffi.new("ArrowOdbcReader **") - lib.arrow_odbc_reader_make_empty(reader_out) + lib.arrow_odbc_reader_make(reader_out) # We take ownership of the corresponding reader written in Rust and keep it alive until # `self` is deleted. self.handle = reader_out[0] @@ -68,15 +68,31 @@ def into_concurrent(self): error = lib.arrow_odbc_reader_into_concurrent(self.handle) raise_on_error(error) - def query( + def connect( self, - query: str, connection_string: str, user: Optional[str], password: Optional[str], - parameters: Optional[List[Optional[str]]], login_timeout_sec: Optional[int], packet_size: Optional[int], + ): + connection = connect_to_database( + connection_string, user, password, login_timeout_sec, packet_size + ) + + # Connecting to the database has been successful. Note that connection does not truly take + # ownership of the connection. If it runs out of scope (e.g. due to a raised exception) the + # connection would not be closed and its associated resources would not be freed. + # However, this is fine since everything from here on out until we call + # arrow_odbc_reader_set_connection is infallible. arrow_odbc_reader_set_connection will truly + # take ownership of the connection. 
+ + lib.arrow_odbc_reader_set_connection(self.handle, connection) + + def query( + self, + query: str, + parameters: Optional[List[Optional[str]]], ): query_bytes = query.encode("utf-8") @@ -99,24 +115,12 @@ def query( # payload is just referenced. encoded_parameters = [to_bytes_and_len(p) for p in parameters] - connection = connect_to_database( - connection_string, user, password, login_timeout_sec, packet_size - ) - - # Connecting to the database has been successful. Note that connection does not truly take - # ownership of the connection. If it runs out of scope (e.g. due to a raised exception) the - # connection would not be closed and its associated resources would not be freed. - # However, this is fine since everything from here on out until we call - # arrow_odbc_reader_make is infalliable. arrow_odbc_reader_query will truly take ownership - # of the connection. Even if it should fail, it will be closed correctly. - for p_index in range(0, parameters_len): (p_bytes, p_len) = encoded_parameters[p_index] parameters_array[p_index] = lib.arrow_odbc_parameter_string_make(p_bytes, p_len) error = lib.arrow_odbc_reader_query( self.handle, - connection, query_bytes, len(query_bytes), parameters_array, @@ -492,16 +496,19 @@ def read_arrow_batches_from_odbc( """ reader = _BatchReaderRaii() - reader.query( - query=query, + reader.connect( connection_string=connection_string, user=user, password=password, - parameters=parameters, login_timeout_sec=login_timeout_sec, packet_size=packet_size, ) + reader.query( + query=query, + parameters=parameters, + ) + if max_text_size is None: max_text_size = 0 if max_binary_size is None: diff --git a/src/reader.rs b/src/reader.rs index 901989b..f256ad6 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -17,6 +17,24 @@ use crate::{parameter::ArrowOdbcParameter, try_, ArrowOdbcError, OdbcConnection} pub use self::arrow_odbc_reader::ArrowOdbcReader; +/// Set a reader into connection state +/// +/// # Safety +/// +/// * `reader` must 
point to a valid reader in empty state. +/// * `connection` must point to a valid OdbcConnection. This function takes ownership of the +/// connection, even in case of an error. So the connection must not be freed explicitly +/// afterwards. + +#[no_mangle] +pub unsafe extern "C" fn arrow_odbc_reader_set_connection( + mut reader: NonNull<ArrowOdbcReader>, + connection: NonNull<OdbcConnection>, +) { + let connection = *Box::from_raw(connection.as_ptr()); + reader.as_mut().set_connection(connection.0); +} + /// Creates an Arrow ODBC reader instance. /// /// Executes the SQL Query and moves the reader into cursor state. @@ -48,7 +66,6 @@ pub use self::arrow_odbc_reader::ArrowOdbcReader; #[no_mangle] pub unsafe extern "C" fn arrow_odbc_reader_query( mut reader: NonNull<ArrowOdbcReader>, - connection: NonNull<OdbcConnection>, query_buf: *const u8, query_len: usize, parameters: *const *mut ArrowOdbcParameter, @@ -58,8 +75,6 @@ pub unsafe extern "C" fn arrow_odbc_reader_query( let query = slice::from_raw_parts(query_buf, query_len); let query = str::from_utf8(query).unwrap(); - let connection = *Box::from_raw(connection.as_ptr()); - let parameters = if parameters.is_null() { Vec::new() } else { @@ -69,7 +84,6 @@ pub unsafe extern "C" fn arrow_odbc_reader_query( .collect() }; - reader.as_mut().set_connection(connection.0); try_!(reader.as_mut().promote_to_cursor(query, &parameters[..])); null_mut() // Ok(()) @@ -83,7 +97,7 @@ pub unsafe extern "C" fn arrow_odbc_reader_query( /// * `reader_out` will point to an instance of `ArrowOdbcReader`. Ownership is transferred to the /// caller. #[no_mangle] -pub unsafe extern "C" fn arrow_odbc_reader_make_empty(reader_out: *mut *mut ArrowOdbcReader) { +pub unsafe extern "C" fn arrow_odbc_reader_make(reader_out: *mut *mut ArrowOdbcReader) { let reader = ArrowOdbcReader::empty(); *reader_out = Box::into_raw(Box::new(reader)); }