From 5ce228d73ae85ebc3374f74ac718822ee5a6aa85 Mon Sep 17 00:00:00 2001
From: Aki Ariga
Date: Thu, 5 Sep 2024 20:33:15 -0700
Subject: [PATCH] Use msgpack.Unpacker.feed() to allow fetching large data
 from the stream (#118)

This prevents timeout errors when the response is large. It works with
3+ GiB of msgpack data.
---
 tdclient/job_api.py          | 8 +++++---
 tdclient/test/test_helper.py | 4 ++++
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/tdclient/job_api.py b/tdclient/job_api.py
index 60b163b..80a95a9 100644
--- a/tdclient/job_api.py
+++ b/tdclient/job_api.py
@@ -248,9 +248,11 @@ def job_result_format_each(self, job_id, format, header=False):
             if code != 200:
                 self.raise_error("Get job result failed", res, "")
             if format == "msgpack":
-                unpacker = msgpack.Unpacker(res, raw=False)
-                for row in unpacker:
-                    yield row
+                unpacker = msgpack.Unpacker(raw=False, max_buffer_size=1000 * 1024 ** 2)
+                for chunk in res.stream(1024 ** 2):
+                    unpacker.feed(chunk)
+                    for row in unpacker:
+                        yield row
             elif format == "json":
                 for row in codecs.getreader("utf-8")(res):
                     yield json.loads(row)
diff --git a/tdclient/test/test_helper.py b/tdclient/test/test_helper.py
index 180155e..d14d1b2 100644
--- a/tdclient/test/test_helper.py
+++ b/tdclient/test/test_helper.py
@@ -46,7 +46,11 @@ def read(size=None):
         else:
             return b""
 
+    def stream(size=None):
+        yield read(size)
+
     response.read.side_effect = read
+    response.stream.side_effect = stream
     return response
 
 
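
Note: for readers unfamiliar with the feed-and-drain pattern this patch adopts,
here is a minimal standalone sketch. The iter_msgpack_rows helper and the
BytesIO round-trip below are illustrative inventions, not tdclient API; only
msgpack.Unpacker, feed(), raw=False, and max_buffer_size come from the patch
itself.

    import io

    import msgpack


    def iter_msgpack_rows(stream, chunk_size=1024 ** 2):
        """Yield deserialized rows from a byte stream, chunk by chunk."""
        # Same buffer cap as the patch: the Unpacker refuses to hold more
        # than ~1 GB of unparsed bytes instead of growing without bound.
        unpacker = msgpack.Unpacker(raw=False, max_buffer_size=1000 * 1024 ** 2)
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:
                break
            unpacker.feed(chunk)      # push raw bytes into the unpacker
            for row in unpacker:      # drain every complete object parsed so far
                yield row


    # Round-trip check: pack a few rows, then stream them back.
    buf = io.BytesIO()
    for i in range(3):
        buf.write(msgpack.packb({"id": i, "name": "row-%d" % i}))
    buf.seek(0)
    for row in iter_msgpack_rows(buf):
        print(row)

Draining the unpacker after every feed() means the first rows are yielded as
soon as a complete object arrives, rather than after the whole body has been
read, which is presumably why the patched code can iterate over 3+ GiB results
without hitting the timeout described in the commit message.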