Skip to content

Commit

Permalink
refactor: seperate connection and query in reader
Browse files Browse the repository at this point in the history
  • Loading branch information
pacman82 committed Jun 29, 2024
1 parent 30db68c commit 475c2af
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 24 deletions.
45 changes: 26 additions & 19 deletions python/arrow_odbc/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class _BatchReaderRaii:

def __init__(self):
reader_out = ffi.new("ArrowOdbcReader **")
lib.arrow_odbc_reader_make_empty(reader_out)
lib.arrow_odbc_reader_make(reader_out)
# We take ownership of the corresponding reader written in Rust and keep it alive until
# `self` is deleted.
self.handle = reader_out[0]
Expand Down Expand Up @@ -68,15 +68,31 @@ def into_concurrent(self):
error = lib.arrow_odbc_reader_into_concurrent(self.handle)
raise_on_error(error)

def query(
def connect(
self,
query: str,
connection_string: str,
user: Optional[str],
password: Optional[str],
parameters: Optional[List[Optional[str]]],
login_timeout_sec: Optional[int],
packet_size: Optional[int],
):
connection = connect_to_database(
connection_string, user, password, login_timeout_sec, packet_size
)

# Connecting to the database has been successful. Note that connection does not truly take
# ownership of the connection. If it runs out of scope (e.g. due to a raised exception) the
# connection would not be closed and its associated resources would not be freed.
# However, this is fine since everything from here on out until we call
# arrow_odbc_reader_set_connection is infalliable. arrow_odbc_reader_connection will truly
# take ownership of the connection.

lib.arrow_odbc_reader_set_connection(self.handle, connection)

def query(
self,
query: str,
parameters: Optional[List[Optional[str]]],
):
query_bytes = query.encode("utf-8")

Expand All @@ -99,24 +115,12 @@ def query(
# payload is just referenced.
encoded_parameters = [to_bytes_and_len(p) for p in parameters]

connection = connect_to_database(
connection_string, user, password, login_timeout_sec, packet_size
)

# Connecting to the database has been successful. Note that connection does not truly take
# ownership of the connection. If it runs out of scope (e.g. due to a raised exception) the
# connection would not be closed and its associated resources would not be freed.
# However, this is fine since everything from here on out until we call
# arrow_odbc_reader_make is infalliable. arrow_odbc_reader_query will truly take ownership
# of the connection. Even if it should fail, it will be closed correctly.

for p_index in range(0, parameters_len):
(p_bytes, p_len) = encoded_parameters[p_index]
parameters_array[p_index] = lib.arrow_odbc_parameter_string_make(p_bytes, p_len)

error = lib.arrow_odbc_reader_query(
self.handle,
connection,
query_bytes,
len(query_bytes),
parameters_array,
Expand Down Expand Up @@ -492,16 +496,19 @@ def read_arrow_batches_from_odbc(
"""
reader = _BatchReaderRaii()

reader.query(
query=query,
reader.connect(
connection_string=connection_string,
user=user,
password=password,
parameters=parameters,
login_timeout_sec=login_timeout_sec,
packet_size=packet_size,
)

reader.query(
query=query,
parameters=parameters,
)

if max_text_size is None:
max_text_size = 0
if max_binary_size is None:
Expand Down
24 changes: 19 additions & 5 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@ use crate::{parameter::ArrowOdbcParameter, try_, ArrowOdbcError, OdbcConnection}
pub use self::arrow_odbc_reader::ArrowOdbcReader;


/// Set a reader into connection state
///
/// # Safety
///
/// * `reader` must point to a valid reader in empty state.
/// * `connection` must point to a valid OdbcConnection. This function takes ownership of the
/// connection, even in case of an error. So The connection must not be freed explicitly
/// afterwards.
#[no_mangle]
pub unsafe extern "C" fn arrow_odbc_reader_set_connection(
mut reader: NonNull<ArrowOdbcReader>,
connection: NonNull<OdbcConnection>,
) {
let connection = *Box::from_raw(connection.as_ptr());
reader.as_mut().set_connection(connection.0);
}

/// Creates an Arrow ODBC reader instance.
///
/// Executes the SQL Query and moves the reader into cursor state.
Expand Down Expand Up @@ -48,7 +66,6 @@ pub use self::arrow_odbc_reader::ArrowOdbcReader;
#[no_mangle]
pub unsafe extern "C" fn arrow_odbc_reader_query(
mut reader: NonNull<ArrowOdbcReader>,
connection: NonNull<OdbcConnection>,
query_buf: *const u8,
query_len: usize,
parameters: *const *mut ArrowOdbcParameter,
Expand All @@ -58,8 +75,6 @@ pub unsafe extern "C" fn arrow_odbc_reader_query(
let query = slice::from_raw_parts(query_buf, query_len);
let query = str::from_utf8(query).unwrap();

let connection = *Box::from_raw(connection.as_ptr());

let parameters = if parameters.is_null() {
Vec::new()
} else {
Expand All @@ -69,7 +84,6 @@ pub unsafe extern "C" fn arrow_odbc_reader_query(
.collect()
};

reader.as_mut().set_connection(connection.0);
try_!(reader.as_mut().promote_to_cursor(query, &parameters[..]));

null_mut() // Ok(())
Expand All @@ -83,7 +97,7 @@ pub unsafe extern "C" fn arrow_odbc_reader_query(
/// * `reader_out` will point to an instance of `ArrowOdbcReader`. Ownership is transferred to the
/// caller.
#[no_mangle]
pub unsafe extern "C" fn arrow_odbc_reader_make_empty(reader_out: *mut *mut ArrowOdbcReader) {
pub unsafe extern "C" fn arrow_odbc_reader_make(reader_out: *mut *mut ArrowOdbcReader) {
let reader = ArrowOdbcReader::empty();
*reader_out = Box::into_raw(Box::new(reader));
}
Expand Down

0 comments on commit 475c2af

Please sign in to comment.