From 0a76f12df82ee0623a16c2452a3b0dfc14558e59 Mon Sep 17 00:00:00 2001 From: Weves Date: Tue, 15 Aug 2023 17:17:55 -0700 Subject: [PATCH] Fix slack pagination --- backend/danswer/connectors/slack/connector.py | 8 ++++---- backend/danswer/connectors/slack/utils.py | 20 +++++++++++-------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/backend/danswer/connectors/slack/connector.py b/backend/danswer/connectors/slack/connector.py index 5f1e4919d5d..89a7a7fce66 100644 --- a/backend/danswer/connectors/slack/connector.py +++ b/backend/danswer/connectors/slack/connector.py @@ -36,7 +36,7 @@ def _make_paginated_slack_api_call( call: Callable[..., SlackResponse], **kwargs: Any -) -> list[dict[str, Any]]: +) -> Generator[dict[str, Any], None, None]: return make_slack_api_call_paginated( make_slack_api_rate_limited(make_slack_api_call_logged(call)) )(**kwargs) @@ -50,9 +50,9 @@ def _make_slack_api_call( def get_channel_info(client: WebClient, channel_id: str) -> ChannelType: """Get information about a channel. Needed to convert channel ID to channel name""" - return _make_paginated_slack_api_call( - client.conversations_info, channel=channel_id - )[0]["channel"] + return _make_slack_api_call(client.conversations_info, channel=channel_id)[0][ + "channel" + ] def get_channels( diff --git a/backend/danswer/connectors/slack/utils.py b/backend/danswer/connectors/slack/utils.py index 2e8c750d06f..dbbdac5090b 100644 --- a/backend/danswer/connectors/slack/utils.py +++ b/backend/danswer/connectors/slack/utils.py @@ -1,6 +1,7 @@ import re import time from collections.abc import Callable +from collections.abc import Generator from functools import wraps from typing import Any from typing import cast @@ -45,20 +46,20 @@ def logged_call(**kwargs: Any) -> SlackResponse: def make_slack_api_call_paginated( call: Callable[..., SlackResponse], -) -> Callable[..., list[dict[str, Any]]]: +) -> Callable[..., Generator[dict[str, Any], None, None]]: """Wraps calls to slack API so that they automatically handle pagination""" @wraps(call) - def paginated_call(**kwargs: Any) -> list[dict[str, Any]]: - results: list[dict[str, Any]] = [] + def paginated_call(**kwargs: Any) -> Generator[dict[str, Any], None, None]: cursor: str | None = None has_more = True while has_more: - for result in call(cursor=cursor, limit=_SLACK_LIMIT, **kwargs): - has_more = result.get("has_more", False) - cursor = result.get("response_metadata", {}).get("next_cursor", "") - results.append(cast(dict[str, Any], result)) - return results + response = call(cursor=cursor, limit=_SLACK_LIMIT, **kwargs) + yield cast(dict[str, Any], response.validate()) + cursor = cast(dict[str, Any], response.get("response_metadata", {})).get( + "next_cursor", "" + ) + has_more = bool(cursor) return paginated_call @@ -84,6 +85,9 @@ def rate_limited_call(**kwargs: Any) -> SlackResponse: if e.response["error"] == "ratelimited": # Handle rate limiting: get the 'Retry-After' header value and sleep for that duration retry_after = int(e.response.headers.get("Retry-After", 1)) + logger.info( + f"Slack call rate limited, retrying after {retry_after} seconds. Exception: {e}" + ) time.sleep(retry_after) else: # Raise the error for non-transient errors