feat: Add add_async/delete_async methods in InputTable #6061
5ae174c to c8de273
"""Asynchronously writes rows from the provided table to this input table. If this is a keyed input table,
added rows with keys that match existing rows will replace those rows. This method returns immediately without
waiting for the operation to complete. If the operation succeeds, the optional on_success callback if provided
will be called. If the operation fails, the optional on_error callback if provided will be called. If on_error
is not provided, a default callback function will be called that simply prints out the received exception.
Are there order processing guarantees?
Yes, for requests made on the same thread. It is probably safe to assume that adding to an InputTable from multiple threads is not a typical usage pattern; in that case, it falls on the user to synchronize the threads if some kind of ordering is needed.
Document the guarantees if they exist.
Jianfeng's statement around ordering is accurate. That said, it's currently implementation-defined, rather than something the Java interface guarantees. The implementations are thread-safe, and that is to be expected; concurrent usage from multiple threads, however, gives no guarantees about ordering (nor should it).
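To make the ordering point concrete, here is a toy model only. `FakeAsyncTable` is a hypothetical stand-in with a single FIFO worker, not the Deephaven implementation, and `submit_batch` and `submit_lock` are names invented for this sketch. It shows one way a caller could serialize submissions from several threads when the relative order of batches matters.

```python
import queue
import threading


class FakeAsyncTable:
    """Hypothetical stand-in: add_async enqueues, one worker applies rows in FIFO order."""

    def __init__(self) -> None:
        self._pending = queue.Queue()
        self.applied = []
        threading.Thread(target=self._run, daemon=True).start()

    def add_async(self, row) -> None:
        # Returns immediately; the worker thread applies the row later.
        self._pending.put(row)

    def _run(self) -> None:
        while True:
            row = self._pending.get()
            self.applied.append(row)
            self._pending.task_done()

    def flush(self) -> None:
        # Blocks until every enqueued row has been applied.
        self._pending.join()


table = FakeAsyncTable()
submit_lock = threading.Lock()


def submit_batch(rows) -> None:
    # Holding the lock for the whole batch keeps its rows contiguous,
    # even when another thread is submitting at the same time.
    with submit_lock:
        for row in rows:
            table.add_async(row)


threads = [
    threading.Thread(target=submit_batch, args=([f"{name}{i}" for i in range(3)],))
    for name in ("a", "b")
]
for t in threads:
    t.start()
for t in threads:
    t.join()
table.flush()
```

In this model, rows within a batch always arrive in submission order, while which batch goes first is still up to the thread scheduler. That matches the comment above: per-thread order holds, cross-thread order is the caller's responsibility.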
append_only_input_table.add_async(t, on_success=on_success)
append_only_input_table.add_async(t, on_success=on_success)
while success_count < 2:
    sleep(0.1)
@rcaudy is going to cry
Unfortunately this is the best I can do, but I think it is safe/deterministic. Initially I used await_update, but because that is what add_async uses behind the scenes to wait for UGP to finish processing, it creates a race condition where one of the calls will wait forever.
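For reference, one alternative to polling a counter with sleep is to block on a small countdown latch that the on_success callback releases. This is a sketch under assumptions: `CountDownLatch` and `fake_add_async` are hypothetical helpers written for this example, and `fake_add_async` only mimics the "callback runs later on another thread" behavior. Whether this pattern is safe with the real callback threads in this PR has not been verified here.

```python
import threading


class CountDownLatch:
    """Blocks waiters until count_down() has been called `count` times."""

    def __init__(self, count: int) -> None:
        self._count = count
        self._cond = threading.Condition()

    def count_down(self) -> None:
        with self._cond:
            self._count -= 1
            if self._count <= 0:
                self._cond.notify_all()

    def wait(self, timeout: float) -> bool:
        # Returns True if the count reached zero before the timeout expired.
        with self._cond:
            return self._cond.wait_for(lambda: self._count <= 0, timeout=timeout)


def fake_add_async(on_success) -> None:
    """Hypothetical stand-in for add_async: invokes the callback on another thread."""
    threading.Thread(target=on_success).start()


latch = CountDownLatch(2)
fake_add_async(latch.count_down)
fake_add_async(latch.count_down)

# The test thread blocks here instead of looping on sleep(0.1); the timeout
# turns a lost callback into a test failure rather than a hang.
completed = latch.wait(timeout=5.0)
```

The timeout is the main design point: a callback that never fires shows up as `completed == False` instead of an endless loop.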
@@ -272,6 +285,70 @@ def delete(self, table: Table) -> None:
        except Exception as e:
            raise DHError(e, "delete data in the InputTable failed.") from e

    def add_async(self, table: Table, on_success: Callable[[], None] = None,
support should also be added to the client
I am not sure that we need to.
- async add/delete is what the gRPC API uses already on the server.
- the Python client talks to the server only synchronously. Supporting Python asyncio would be a paradigm shift, which is not to say it isn't doable.
My concern is that users frequently want to write code once and use it on both the client and the server. If that might happen with InputTable, we should have the method to keep the API consistent, even if the method just calls add. If there are no reasonable cases where that might happen, then maybe it isn't a concern. This is what I'm worried about.
I don't have a problem with introducing async support, but I do think Jianfeng is right that it may significantly expand scope and require some research on his end. I'd rather we do that in a separate PR, since this one is self-contained and makes things better.
I'm ok with a separate PR.
@@ -478,6 +475,48 @@ def test_j_input_wrapping(self):
        self.assertFalse(isinstance(t, InputTable))
        self.assertTrue(isinstance(t, Table))

    def test_input_table_async(self):
I would like to see a test or two where on_error gets called, e.g. wrong schema.
Unfortunately, the error that causes on_error to be called isn't something we can easily produce. Added a test case to demo that.
...ns/src/main/java/io/deephaven/integrations/python/PythonInputTableStatusListenerAdapter.java
Outdated
Labels indicate documentation is required. Issues for documentation have been opened: Community: deephaven/deephaven-docs-community#313
Fixes #3887