Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36954: [Python] Add more FlightInfo / FlightEndpoint attributes #43537

Merged
merged 32 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
47a3902
Add ordered and app_metadata attributes to PyArrow FlightInfo
EnricoMi Jul 19, 2024
bfca244
Fix code style
EnricoMi Jul 19, 2024
bd53f7f
Use same formatting in __repr__ as in cpp ToString
EnricoMi Jul 22, 2024
5555bad
Allow for Python str app_metadata
EnricoMi Jul 26, 2024
1e87b19
Make ordered and app_metadata optional
EnricoMi Jul 26, 2024
911e991
Make total_records and total_bytes optional, support None
EnricoMi Jul 26, 2024
9ab0624
Add expiration_time and app_metadata attributes to PyArrow FlightEndp…
EnricoMi Jul 24, 2024
e54872b
Fix code style
EnricoMi Jul 24, 2024
eb98843
Use microseconds precision to work with macOS
EnricoMi Jul 25, 2024
2382b0e
Fix code style
EnricoMi Jul 25, 2024
7501924
Add license to chrono.pxd
EnricoMi Jul 25, 2024
4355408
Allow for Python str app_metadata
EnricoMi Jul 26, 2024
4c09c13
Make expiration_time and app_metadata optional
EnricoMi Jul 26, 2024
60ec782
Make CTimestampType timezone constructor arg const
EnricoMi Jul 29, 2024
336b3e4
Remove .hex, use bool __repr__, use -1 not None default value
EnricoMi Aug 3, 2024
176347a
Support nanoseconds, cast expiration_time in FlightEndpoint() to time…
EnricoMi Aug 5, 2024
29ad932
Test FlightEndpoint with nanoseconds
EnricoMi Aug 6, 2024
d30bc6c
Add docstrings
EnricoMi Aug 6, 2024
428162d
Use None as default for total_records and total_bytes
EnricoMi Aug 30, 2024
86e3fad
Simplify code with TimePoint_from_ns and TimePoint_to_ns
EnricoMi Aug 30, 2024
08b1a58
Remove chrono.pxd, use existing CTimePoint instead
EnricoMi Aug 30, 2024
9dc0e75
Fix Flight Timestamp definition to OS-agnostic nanoseconds
EnricoMi Sep 10, 2024
989f7a2
Revert "Fix Flight Timestamp definition to OS-agnostic nanoseconds"
EnricoMi Sep 12, 2024
08d6b48
Fix conversion to and from system time
adamreeve Sep 10, 2024
985b115
Fix lint error
EnricoMi Sep 12, 2024
d264ce1
Fix docstring typo
EnricoMi Sep 18, 2024
bf62c5d
Emphasize `app_metadata` non-nullability in docstring
EnricoMi Sep 18, 2024
818660f
Apply suggestions from code review
EnricoMi Sep 23, 2024
c88fdd5
Apply review suggestion around CTimestampType with timezone
EnricoMi Sep 23, 2024
828070e
Cleanup after committing suggestions
EnricoMi Sep 24, 2024
d2d7cab
Check argument types in FlightEndpoint()
EnricoMi Oct 2, 2024
94a40b2
Add comments on timestamp cast needs
EnricoMi Oct 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/examples/flight/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def list_flights(args, client, connection_args={}):
else:
print("Unknown")

print(f"Data are {'ordered' if flight.ordered else 'not ordered'}")
print("App metadata:", flight.app_metadata)

print("Number of endpoints:", len(flight.endpoints))
print("Schema:")
print(flight.schema)
Expand Down
108 changes: 95 additions & 13 deletions python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ from libcpp cimport bool as c_bool
from pyarrow.lib cimport *
from pyarrow.lib import (ArrowCancelled, ArrowException, ArrowInvalid,
SignalStopHandler)
from pyarrow.lib import as_buffer, frombytes, tobytes
from pyarrow.lib import as_buffer, frombytes, timestamp, tobytes
from pyarrow.includes.libarrow_flight cimport *
from pyarrow.ipc import _get_legacy_format_default, _ReadPandasMixin
import pyarrow.lib as lib
Expand Down Expand Up @@ -704,7 +704,7 @@ cdef class FlightEndpoint(_Weakrefable):
cdef:
CFlightEndpoint endpoint

def __init__(self, ticket, locations):
def __init__(self, ticket, locations, expiration_time=None, app_metadata=""):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def __init__(self, ticket, locations, expiration_time=None, app_metadata=""):
def __init__(self, ticket, locations, TimestampScalar expiration_time=None, app_metadata=""):

If we require such a scalar, I would either enforce that through cython in the signature, or otherwise add a check in the code below that ensures a proper values is passed. For example, right now if you pass a stdlib datetime, you get "'datetime.datetime' object has no attribute 'cast'"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added type checks in 415aca7. Note none of the other arguments where checked, so I added that for all arguments. This includes tests.

"""Create a FlightEndpoint from a ticket and list of locations.

Parameters
Expand All @@ -713,6 +713,12 @@ cdef class FlightEndpoint(_Weakrefable):
the ticket needed to access this flight
locations : list of string URIs
locations where this flight is available
expiration_time : TimestampScalar, default None
Expiration time of this stream. If present, clients may assume
they can retry DoGet requests. Otherwise, clients should avoid
retrying DoGet requests.
app_metadata : bytes or str, default ""
Application-defined opaque metadata.

Raises
------
Expand All @@ -724,28 +730,75 @@ cdef class FlightEndpoint(_Weakrefable):

if isinstance(ticket, Ticket):
self.endpoint.ticket.ticket = tobytes(ticket.ticket)
else:
elif isinstance(ticket, (str, bytes)):
self.endpoint.ticket.ticket = tobytes(ticket)
else:
raise TypeError("Argument ticket must be a Ticket instance, string or bytes, "
"not '{}'".format(type(ticket)))

for location in locations:
if isinstance(location, Location):
c_location = (<Location> location).location
else:
elif isinstance(location, (str, bytes)):
c_location = CLocation()
check_flight_status(
CLocation.Parse(tobytes(location)).Value(&c_location))
else:
raise TypeError("Argument locations must contain Location instances, strings or bytes, "
"not '{}'".format(type(location)))
self.endpoint.locations.push_back(c_location)

if expiration_time is not None:
if isinstance(expiration_time, lib.TimestampScalar):
# Convert into OS-dependent std::chrono::system_clock::time_point from
# std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds>
# See Timestamp in cpp/src/arrow/flight/types.h
self.endpoint.expiration_time = TimePoint_to_system_time(TimePoint_from_ns(
expiration_time.cast(timestamp("ns")).value))
else:
raise TypeError("Argument expiration_time must be a TimestampScalar, "
"not '{}'".format(type(expiration_time)))

if not isinstance(app_metadata, (str, bytes)):
raise TypeError("Argument app_metadata must be a string or bytes, "
"not '{}'".format(type(app_metadata)))
self.endpoint.app_metadata = tobytes(app_metadata)
jorisvandenbossche marked this conversation as resolved.
Show resolved Hide resolved

@property
def ticket(self):
"""Get the ticket in this endpoint."""
return Ticket(self.endpoint.ticket.ticket)

@property
def locations(self):
"""Get locations where this flight is available."""
return [Location.wrap(location)
for location in self.endpoint.locations]

@property
def expiration_time(self):
lidavidm marked this conversation as resolved.
Show resolved Hide resolved
"""Get the expiration time of this stream.

If present, clients may assume they can retry DoGet requests.
Otherwise, clients should avoid retrying DoGet requests.

"""
cdef:
int64_t time_since_epoch
if self.endpoint.expiration_time.has_value():
time_since_epoch = TimePoint_to_ns(
# Convert from OS-dependent std::chrono::system_clock::time_point into
# std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds>
# See Timestamp in cpp/src/arrow/flight/types.h
TimePoint_from_system_time(self.endpoint.expiration_time.value()))
return lib.scalar(time_since_epoch, timestamp("ns", "UTC"))
return None

@property
def app_metadata(self):
"""Get application-defined opaque metadata."""
return self.endpoint.app_metadata

def serialize(self):
"""Get the wire-format representation of this type.

Expand All @@ -770,7 +823,9 @@ cdef class FlightEndpoint(_Weakrefable):

def __repr__(self):
return (f"<pyarrow.flight.FlightEndpoint ticket={self.ticket!r} "
f"locations={self.locations!r}>")
f"locations={self.locations!r} "
f"expiration_time={self.expiration_time} "
f"app_metadata={self.app_metadata}>")

def __eq__(self, FlightEndpoint other):
return self.endpoint == other.endpoint
Expand Down Expand Up @@ -844,7 +899,7 @@ cdef class FlightInfo(_Weakrefable):
return obj

def __init__(self, Schema schema, FlightDescriptor descriptor, endpoints,
total_records, total_bytes):
total_records=None, total_bytes=None, ordered=False, app_metadata=""):
"""Create a FlightInfo object from a schema, descriptor, and endpoints.

Parameters
Expand All @@ -855,10 +910,14 @@ cdef class FlightInfo(_Weakrefable):
the descriptor for this flight.
endpoints : list of FlightEndpoint
a list of endpoints where this flight is available.
total_records : int
the total records in this flight, or -1 if unknown
total_bytes : int
the total bytes in this flight, or -1 if unknown
total_records : int, default None
the total records in this flight, -1 or None if unknown.
total_bytes : int, default None
the total bytes in this flight, -1 or None if unknown.
ordered : boolean, default False
Whether endpoints are in the same order as the data.
app_metadata : bytes or str, default ""
Application-defined opaque metadata.
"""
cdef:
shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
Expand All @@ -874,8 +933,10 @@ cdef class FlightInfo(_Weakrefable):
check_flight_status(CreateFlightInfo(c_schema,
descriptor.descriptor,
c_endpoints,
total_records,
total_bytes, &self.info))
total_records if total_records is not None else -1,
total_bytes if total_bytes is not None else -1,
ordered,
tobytes(app_metadata), &self.info))

@property
def total_records(self):
Expand All @@ -887,6 +948,25 @@ cdef class FlightInfo(_Weakrefable):
"""The size in bytes of the data in this flight, or -1 if unknown."""
return self.info.get().total_bytes()

@property
def ordered(self):
"""Whether endpoints are in the same order as the data."""
return self.info.get().ordered()

@property
def app_metadata(self):
"""
Application-defined opaque metadata.

There is no inherent or required relationship between this and the
app_metadata fields in the FlightEndpoints or resulting FlightData
messages. Since this metadata is application-defined, a given
application could define there to be a relationship, but there is
none required by the spec.

"""
return self.info.get().app_metadata()

@property
def schema(self):
"""The schema of the data in this flight."""
Expand Down Expand Up @@ -950,7 +1030,9 @@ cdef class FlightInfo(_Weakrefable):
f"descriptor={self.descriptor} "
f"endpoints={self.endpoints} "
f"total_records={self.total_records} "
f"total_bytes={self.total_bytes}>")
f"total_bytes={self.total_bytes} "
f"ordered={self.ordered} "
f"app_metadata={self.app_metadata}>")


cdef class FlightStreamChunk(_Weakrefable):
Expand Down
23 changes: 23 additions & 0 deletions python/pyarrow/includes/chrono.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# distutils: language = c++


cdef extern from "<chrono>" namespace "std::chrono::system_clock":
cdef cppclass time_point:
pass
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow_flight.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.chrono cimport time_point


cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
Expand Down Expand Up @@ -134,6 +135,8 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:

CTicket ticket
vector[CLocation] locations
optional[time_point] expiration_time
c_string app_metadata

bint operator==(CFlightEndpoint)
CResult[c_string] SerializeToString()
Expand All @@ -146,6 +149,8 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
CFlightInfo(CFlightInfo info)
int64_t total_records()
int64_t total_bytes()
c_bool ordered()
c_string app_metadata()
CResult[shared_ptr[CSchema]] GetSchema(CDictionaryMemo* memo)
CFlightDescriptor& descriptor()
const vector[CFlightEndpoint]& endpoints()
Expand Down Expand Up @@ -608,6 +613,8 @@ cdef extern from "arrow/python/flight.h" namespace "arrow::py::flight" nogil:
vector[CFlightEndpoint] endpoints,
int64_t total_records,
int64_t total_bytes,
c_bool ordered,
const c_string& app_metadata,
unique_ptr[CFlightInfo]* out)

cdef CStatus CreateSchemaResult" arrow::py::flight::CreateSchemaResult"(
Expand Down
4 changes: 4 additions & 0 deletions python/pyarrow/includes/libarrow_python.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

# distutils: language = c++

from pyarrow.includes.chrono cimport time_point
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *

Expand Down Expand Up @@ -244,6 +245,9 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py::internal" nogil:
CTimePoint TimePoint_from_s(double val)
CTimePoint TimePoint_from_ns(int64_t val)

CTimePoint TimePoint_from_system_time(time_point val)
time_point TimePoint_to_system_time(CTimePoint val)

CResult[c_string] TzinfoToString(PyObject* pytzinfo)
CResult[PyObject*] StringToTzinfo(c_string)

Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/src/arrow/python/datetime.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ inline TimePoint TimePoint_from_ns(int64_t val) {
return TimePoint(TimePoint::duration(val));
}

ARROW_PYTHON_EXPORT
// Note: Needed by FlightEndpoint.expiration_time, which is an OS-dependent
// std::chrono::system_clock::time_point
inline std::chrono::system_clock::time_point TimePoint_to_system_time(TimePoint val) {
return std::chrono::time_point_cast<std::chrono::system_clock::duration>(val);
Comment on lines +150 to +151
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to understand this code: what does this exactly do? Because my reading is that TimePoint type is also already an alias for some time_point ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a comment in the tests about system_clock::duration being different depending on the platform, while TimePoint here is defined to always be nanoseconds?

So for the flight API we need to system-dependent resolution?

If that's the reason for adding those APIs, could you add a comment briefly explaining what those functions do and why we need them?

Copy link
Contributor Author

@EnricoMi EnricoMi Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying issue is FlightEndpoint.expiration_time having different types in C++ and Cython:

  • In C++, it is std::chrono::system_clock::time_point (OS-dependent precision).
  • In Cython, it is std::chrono::time_point<std::chrono::system_clock, std::chrono::nanoseconds> (OS-independent precision).

The latter is the desired type.

The right thing to do is to fix FlightEndpoint.expiration_time in types.h, and all the casting and helper methods in this PR would go away. But that is a breaking change.

See #43537 (comment).

I have added more comments to the workaround (0b49a09) and will raise a PR (EnricoMi#3) to fix the type and revert the workaround to manage that breaking change separately.

}

ARROW_PYTHON_EXPORT
// Note: Needed by FlightEndpoint.expiration_time, which is an OS-dependent
// std::chrono::system_clock::time_point
inline TimePoint TimePoint_from_system_time(std::chrono::system_clock::time_point val) {
return std::chrono::time_point_cast<TimePoint::duration>(val);
}

ARROW_PYTHON_EXPORT
inline int64_t PyDelta_to_s(PyDateTime_Delta* pytimedelta) {
return (PyDateTime_DELTA_GET_DAYS(pytimedelta) * 86400LL +
Expand Down
9 changes: 5 additions & 4 deletions python/pyarrow/src/arrow/python/flight.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,12 @@ void PyClientMiddleware::CallCompleted(const Status& call_status) {
Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
const arrow::flight::FlightDescriptor& descriptor,
const std::vector<arrow::flight::FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes,
int64_t total_records, int64_t total_bytes, bool ordered,
const std::string& app_metadata,
std::unique_ptr<arrow::flight::FlightInfo>* out) {
ARROW_ASSIGN_OR_RAISE(auto result,
arrow::flight::FlightInfo::Make(*schema, descriptor, endpoints,
total_records, total_bytes));
ARROW_ASSIGN_OR_RAISE(auto result, arrow::flight::FlightInfo::Make(
*schema, descriptor, endpoints, total_records,
total_bytes, ordered, app_metadata));
*out = std::unique_ptr<arrow::flight::FlightInfo>(
new arrow::flight::FlightInfo(std::move(result)));
return Status::OK();
Expand Down
3 changes: 2 additions & 1 deletion python/pyarrow/src/arrow/python/flight.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ ARROW_PYFLIGHT_EXPORT
Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
const arrow::flight::FlightDescriptor& descriptor,
const std::vector<arrow::flight::FlightEndpoint>& endpoints,
int64_t total_records, int64_t total_bytes,
int64_t total_records, int64_t total_bytes, bool ordered,
const std::string& app_metadata,
std::unique_ptr<arrow::flight::FlightInfo>* out);

/// \brief Create a SchemaResult from schema.
Expand Down
Loading
Loading