diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index 9ad346a56af00..91c433aa9aac0 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -31,7 +31,8 @@ from libcpp cimport bool as c_bool from pyarrow.lib cimport * from pyarrow.lib import (ArrowCancelled, ArrowException, ArrowInvalid, SignalStopHandler) -from pyarrow.lib import as_buffer, frombytes, tobytes +from pyarrow.lib import as_buffer, frombytes, timestamp, tobytes +from pyarrow.includes.chrono cimport duration_cast, microseconds from pyarrow.includes.libarrow_flight cimport * from pyarrow.ipc import _get_legacy_format_default, _ReadPandasMixin import pyarrow.lib as lib @@ -704,7 +705,7 @@ cdef class FlightEndpoint(_Weakrefable): cdef: CFlightEndpoint endpoint - def __init__(self, ticket, locations): + def __init__(self, ticket, locations, expiration_time=None, app_metadata=""): """Create a FlightEndpoint from a ticket and list of locations. Parameters @@ -713,6 +714,12 @@ cdef class FlightEndpoint(_Weakrefable): the ticket needed to access this flight locations : list of string URIs locations where this flight is available + expiration_time : TimestampScalar optional, default None + Expiration time of this stream. If present, clients may assume + they can retry DoGet requests. Otherwise, clients should avoid + retrying DoGet requests. + app_metadata : bytes or str optional, default "" + Application-defined opaque metadata. Raises ------ @@ -736,6 +743,12 @@ cdef class FlightEndpoint(_Weakrefable): CLocation.Parse(tobytes(location)).Value(&c_location)) self.endpoint.locations.push_back(c_location) + if expiration_time is not None: + self.endpoint.expiration_time = time_point( + microseconds(expiration_time.cast(timestamp("us")).value)) + + self.endpoint.app_metadata = tobytes(app_metadata) + @property def ticket(self): """Get the ticket in this endpoint.""" @@ -746,6 +759,24 @@ cdef class FlightEndpoint(_Weakrefable): return [Location.wrap(location) for location in self.endpoint.locations] + @property + def expiration_time(self): + cdef: + int64_t time_since_epoch + const char* UTC = "UTC" + shared_ptr[CTimestampType] time_type = make_shared[CTimestampType](TimeUnit.TimeUnit_MICRO, UTC) + shared_ptr[CTimestampScalar] shared + if self.endpoint.expiration_time.has_value(): + time_since_epoch = duration_cast[microseconds]( + self.endpoint.expiration_time.value().time_since_epoch()).count() + shared = make_shared[CTimestampScalar](time_since_epoch, time_type) + return Scalar.wrap( shared) + return None + + @property + def app_metadata(self): + return self.endpoint.app_metadata + def serialize(self): """Get the wire-format representation of this type. @@ -770,7 +801,9 @@ cdef class FlightEndpoint(_Weakrefable): def __repr__(self): return (f"") + f"locations={self.locations!r} " + f"expiration_time={self.expiration_time} " + f"app_metadata='{self.app_metadata.hex()}'>") def __eq__(self, FlightEndpoint other): return self.endpoint == other.endpoint diff --git a/python/pyarrow/includes/chrono.pxd b/python/pyarrow/includes/chrono.pxd new file mode 100644 index 0000000000000..432b9b609b85d --- /dev/null +++ b/python/pyarrow/includes/chrono.pxd @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from libc.stdint cimport * + + +cdef extern from "" namespace "std::chrono": + cdef cppclass duration: + duration(int64_t count) + const int64_t count() + + cdef cppclass microseconds(duration): + microseconds(int64_t count) + + T duration_cast[T](duration d) + + +cdef extern from "" namespace "std::chrono::system_clock": + cdef cppclass time_point: + time_point(const duration& d) + const duration time_since_epoch() diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 0d871f411b11b..29584af9da7f1 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -275,6 +275,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: cdef cppclass CTimestampType" arrow::TimestampType"(CFixedWidthType): CTimestampType(TimeUnit unit) + CTimestampType(TimeUnit unit, const c_string& timezone) TimeUnit unit() const c_string& timezone() diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd index b7e54c546256b..d2bc3c9d0da23 100644 --- a/python/pyarrow/includes/libarrow_flight.pxd +++ b/python/pyarrow/includes/libarrow_flight.pxd @@ -19,6 +19,7 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * +from pyarrow.includes.chrono cimport time_point cdef extern from "arrow/flight/api.h" namespace "arrow" nogil: @@ -134,6 +135,8 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil: CTicket ticket vector[CLocation] locations + optional[time_point] expiration_time + c_string app_metadata bint operator==(CFlightEndpoint) CResult[c_string] SerializeToString() diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py index dc55cab16cdc9..1e62ee1a4bfb3 100644 --- a/python/pyarrow/tests/test_flight.py +++ b/python/pyarrow/tests/test_flight.py @@ -251,12 +251,14 @@ def get_flight_info(self, context, descriptor): flight.FlightEndpoint( b'', [flight.Location.for_grpc_tcp('localhost', 5005)], + pa.scalar("2023-04-05T12:34:56.789").cast(pa.timestamp("ms")), + "endpoint app metadata" ), ], 1, 42, True, - "test app metadata" + "info app metadata" ) def get_schema(self, context, descriptor): @@ -877,7 +879,9 @@ def test_repr(): descriptor_repr = "" endpoint_repr = (" " - "locations=[]>") + "locations=[] " + "expiration_time=2023-04-05 12:34:56+00:00 " + "app_metadata='656e64706f696e7420617070206d65746164617461'>") info_repr = ( "