Skip to content

Commit

Permalink
Feat/replace transport with httpx (#201)
Browse files Browse the repository at this point in the history
Co-authored-by: Mateusz Wiktor <39187473+techwritermat@users.noreply.github.com>

Co-authored-by: Mateusz Wiktor <39187473+techwritermat@users.noreply.github.com>

* PubNub SDK 10.0.0 release.

---------

Co-authored-by: Mateusz Wiktor <39187473+techwritermat@users.noreply.github.com>
Co-authored-by: PubNub Release Bot <120067856+pubnub-release-bot@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 13, 2025
1 parent ba33368 commit 7494aaa
Show file tree
Hide file tree
Showing 94 changed files with 3,782 additions and 4,726 deletions.
39 changes: 19 additions & 20 deletions .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: python
version: 9.1.0
version: 10.0.0
schema: 1
scm: github.com/pubnub/python
sdks:
Expand All @@ -18,7 +18,7 @@ sdks:
distributions:
- distribution-type: library
distribution-repository: package
package-name: pubnub-9.1.0
package-name: pubnub-10.0.0
location: https://pypi.org/project/pubnub/
supported-platforms:
supported-operating-systems:
Expand Down Expand Up @@ -61,12 +61,6 @@ sdks:
- x86
- x86-64
requires:
- name: requests
min-version: "2.4"
location: https://pypi.org/project/requests/
license: Apache Software License (Apache 2.0)
license-url: https://github.com/psf/requests/blob/master/LICENSE
is-required: Required
- name: pycryptodomex
min-version: "3.3"
location: https://pypi.org/project/pycryptodomex/
Expand All @@ -79,11 +73,11 @@ sdks:
license: MIT License (MIT)
license-url: https://github.com/agronholm/cbor2/blob/master/LICENSE.txt
is-required: Required
- name: aiohttp
min-version: "2.3.10"
location: https://pypi.org/project/aiohttp/
license: Apache Software License (Apache 2)
license-url: https://github.com/aio-libs/aiohttp/blob/master/LICENSE.txt
- name: httpx
min-version: "0.28.0"
location: https://pypi.org/project/httpx/
license: BSD License (BSD-3-Clause)
license-url: https://github.com/encode/httpx/blob/master/LICENSE.md
is-required: Required
-
language: python
Expand All @@ -97,8 +91,8 @@ sdks:
-
distribution-type: library
distribution-repository: git release
package-name: pubnub-9.1.0
location: https://github.com/pubnub/python/releases/download/v9.1.0/pubnub-9.1.0.tar.gz
package-name: pubnub-10.0.0
location: https://github.com/pubnub/python/releases/download/10.0.0/pubnub-10.0.0.tar.gz
supported-platforms:
supported-operating-systems:
Linux:
Expand Down Expand Up @@ -162,13 +156,18 @@ sdks:
license-url: https://github.com/agronholm/cbor2/blob/master/LICENSE.txt
is-required: Required
-
name: aiohttp
min-version: "2.3.10"
location: https://pypi.org/project/aiohttp/
license: Apache Software License (Apache 2)
license-url: https://github.com/aio-libs/aiohttp/blob/master/LICENSE.txt
name: httpx
min-version: "0.28.0"
location: https://pypi.org/project/httpx/
license: BSD License (BSD-3-Clause)
license-url: https://github.com/encode/httpx/blob/master/LICENSE.md
is-required: Required
changelog:
- date: 2025-01-13
version: 10.0.0
changes:
- type: feature
text: "Introduced configurable request handler with HTTP/2 support."
- date: 2024-11-19
version: v9.1.0
changes:
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 10.0.0
January 13 2025

#### Added
- Introduced configurable request handler with HTTP/2 support.

## v9.1.0
November 19 2024

Expand Down
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ You will need the publish and subscribe keys to authenticate your app. Get your
## Configure PubNub

1. Integrate the Python SDK into your project using `pip`:

```bash
pip install pubnub
```
Expand Down Expand Up @@ -83,9 +83,8 @@ pubnub.subscribe().channels('my_channel').execute()
## Documentation
* [Build your first realtime Python app with PubNub](https://www.pubnub.com/docs/platform/quickstarts/python)
* [API reference for Python](https://www.pubnub.com/docs/python/pubnub-python-sdk)
* [API reference for Python (asyncio)](https://www.pubnub.com/docs/python-aiohttp/pubnub-python-sdk)
* [Build your first realtime Python app with PubNub](https://www.pubnub.com/docs/general/basics/set-up-your-account)
* [API reference for Python](https://www.pubnub.com/docs/sdks/python)
## Support
Expand Down
2 changes: 2 additions & 0 deletions pubnub/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ def decrypt(self, key, file, use_random_iv=True):
cipher = AES.new(bytes(secret[0:32], "utf-8"), self.mode, initialization_vector)
result = unpad(cipher.decrypt(extracted_file), 16)
except ValueError:
if not self.fallback_mode: # No fallback mode so we return the original content
return file
cipher = AES.new(bytes(secret[0:32], "utf-8"), self.fallback_mode, initialization_vector)
result = unpad(cipher.decrypt(extracted_file), 16)

Expand Down
7 changes: 4 additions & 3 deletions pubnub/endpoints/file_operations/download_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from pubnub.enums import HttpMethod, PNOperationType
from pubnub.crypto import PubNubFileCrypto
from pubnub.models.consumer.file import PNDownloadFileResult
from pubnub.request_handlers.requests_handler import RequestsRequestHandler
from pubnub.endpoints.file_operations.get_file_url import GetFileDownloadUrl
from warnings import warn
from urllib.parse import urlparse, parse_qs


class DownloadFileNative(FileOperationEndpoint):
Expand Down Expand Up @@ -69,7 +69,8 @@ def use_base_path(self):
return False

def build_params_callback(self):
return lambda a: {}
params = parse_qs(urlparse(self._download_data.result.file_url).query)
return lambda a: {key: str(params[key][0]) for key in params.keys()}

def name(self):
return "Downloading file"
Expand All @@ -84,4 +85,4 @@ def sync(self):
return super(DownloadFileNative, self).sync()

def pn_async(self, callback):
return RequestsRequestHandler(self._pubnub).async_file_based_operation(self.sync, callback, "File Download")
self._pubnub.get_request_handler().async_file_based_operation(self.sync, callback, "File Download")
3 changes: 1 addition & 2 deletions pubnub/endpoints/file_operations/send_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from pubnub.models.consumer.file import PNSendFileResult
from pubnub.endpoints.file_operations.publish_file_message import PublishFileMessage
from pubnub.endpoints.file_operations.fetch_upload_details import FetchFileUploadS3Data
from pubnub.request_handlers.requests_handler import RequestsRequestHandler
from pubnub.endpoints.mixins import TimeTokenOverrideMixin
from warnings import warn

Expand Down Expand Up @@ -152,4 +151,4 @@ def sync(self):
return response_envelope

def pn_async(self, callback):
return RequestsRequestHandler(self._pubnub).async_file_based_operation(self.sync, callback, "File Download")
self._pubnub.get_request_handler().async_file_based_operation(self.sync, callback, "File Download")
43 changes: 15 additions & 28 deletions pubnub/endpoints/file_operations/send_file_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,28 @@
import aiohttp

from pubnub.endpoints.file_operations.send_file import SendFileNative
from pubnub.endpoints.file_operations.publish_file_message import PublishFileMessage
from pubnub.endpoints.file_operations.fetch_upload_details import FetchFileUploadS3Data


class AsyncioSendFile(SendFileNative):
def build_file_upload_request(self):
file = self.encrypt_payload()
form_data = aiohttp.FormData()
for form_field in self._file_upload_envelope.result.data["form_fields"]:
form_data.add_field(form_field["key"], form_field["value"], content_type="multipart/form-data")
form_data.add_field("file", file, filename=self._file_name, content_type="application/octet-stream")

return form_data

def options(self):
request_options = super(SendFileNative, self).options()
request_options.data = request_options.files
return request_options

async def future(self):
self._file_upload_envelope = await FetchFileUploadS3Data(self._pubnub).\
channel(self._channel).\
file_name(self._file_name).future()
self._file_upload_envelope = await FetchFileUploadS3Data(self._pubnub) \
.channel(self._channel) \
.file_name(self._file_name).future()

response_envelope = await super(SendFileNative, self).future()

publish_file_response = await PublishFileMessage(self._pubnub).\
channel(self._channel).\
meta(self._meta).\
message(self._message).\
file_id(response_envelope.result.file_id).\
file_name(response_envelope.result.name).\
should_store(self._should_store).\
ttl(self._ttl).\
cipher_key(self._cipher_key).future()
publish_file_response = await PublishFileMessage(self._pubnub) \
.channel(self._channel) \
.meta(self._meta) \
.message(self._message) \
.file_id(response_envelope.result.file_id) \
.file_name(response_envelope.result.name) \
.should_store(self._should_store) \
.ttl(self._ttl) \
.replicate(self._replicate) \
.ptto(self._ptto) \
.custom_message_type(self._custom_message_type) \
.cipher_key(self._cipher_key).future()

response_envelope.result.timestamp = publish_file_response.result.timestamp
return response_envelope
Expand Down
5 changes: 4 additions & 1 deletion pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,15 @@ async def delayed_reconnect_async(self, delay, attempt):
elif response.status.error:
self.logger.warning(f'Reconnect failed: {response.status.error_data.__dict__}')
self.failure(response.status.error_data, attempt, self.get_timetoken())
else:
elif 't' in response.result:
cursor = response.result['t']
timetoken = int(self.invocation.timetoken) if self.invocation.timetoken else cursor['t']
region = cursor['r']
messages = response.result['m']
self.success(timetoken=timetoken, region=region, messages=messages)
else:
self.logger.warning(f'Reconnect failed: Invalid response {str(response)}')
self.failure(str(response), attempt, self.get_timetoken())

def stop(self):
self.logger.debug(f'stop called on {self.__class__.__name__}')
Expand Down
16 changes: 16 additions & 0 deletions pubnub/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,19 @@ def __init__(self, errormsg="", status_code=0, pn_error=None, status=None):
def _status(self):
raise DeprecationWarning
return self.status


class PubNubAsyncioException(Exception):
def __init__(self, result, status):
self.result = result
self.status = status

def __str__(self):
return str(self.status.error_data.exception)

@staticmethod
def is_error():
return True

def value(self):
return self.status.error_data.exception
8 changes: 8 additions & 0 deletions pubnub/models/envelopes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class AsyncioEnvelope:
def __init__(self, result, status):
self.result = result
self.status = status

@staticmethod
def is_error():
return False
72 changes: 54 additions & 18 deletions pubnub/pubnub.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,58 @@
import copy
import importlib
import logging
import threading
import os

from typing import Type
from threading import Event
from queue import Queue, Empty
from . import utils
from .request_handlers.base import BaseRequestHandler
from .request_handlers.requests_handler import RequestsRequestHandler
from .callbacks import SubscribeCallback, ReconnectionCallback
from .endpoints.presence.heartbeat import Heartbeat
from .endpoints.presence.leave import Leave
from .endpoints.pubsub.subscribe import Subscribe
from .enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
from .managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager, TelemetryManager
from .models.consumer.common import PNStatus
from .pnconfiguration import PNConfiguration
from .pubnub_core import PubNubCore
from .structures import PlatformOptions
from .workers import SubscribeMessageWorker
from pubnub import utils
from pubnub.request_handlers.base import BaseRequestHandler
from pubnub.request_handlers.httpx import HttpxRequestHandler
from pubnub.callbacks import SubscribeCallback, ReconnectionCallback
from pubnub.endpoints.presence.heartbeat import Heartbeat
from pubnub.endpoints.presence.leave import Leave
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager, TelemetryManager
from pubnub.models.consumer.common import PNStatus
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_core import PubNubCore
from pubnub.structures import PlatformOptions
from pubnub.workers import SubscribeMessageWorker

logger = logging.getLogger("pubnub")


class PubNub(PubNubCore):
"""PubNub Python API"""

def __init__(self, config):
def __init__(self, config: PNConfiguration, *, custom_request_handler: Type[BaseRequestHandler] = None):
""" PubNub instance constructor
Parameters:
config (PNConfiguration): PNConfiguration instance (required)
custom_request_handler (BaseRequestHandler): Custom request handler class (optional)
"""
assert isinstance(config, PNConfiguration)
PubNubCore.__init__(self, config)
self._request_handler = RequestsRequestHandler(self)

if (not custom_request_handler) and (handler := os.getenv('PUBNUB_REQUEST_HANDLER')):
module_name, class_name = handler.rsplit('.', 1)
module = importlib.import_module(module_name)
custom_request_handler = getattr(module, class_name)
if not issubclass(custom_request_handler, BaseRequestHandler):
raise Exception("Custom request handler must be subclass of BaseRequestHandler")
self._request_handler = custom_request_handler(self)

if custom_request_handler:
if not issubclass(custom_request_handler, BaseRequestHandler):
raise Exception("Custom request handler must be subclass of BaseRequestHandler")
self._request_handler = custom_request_handler(self)
else:
self._request_handler = HttpxRequestHandler(self)

if self.config.enable_subscribe:
self._subscription_manager = NativeSubscriptionManager(self)
Expand All @@ -40,10 +64,22 @@ def __init__(self, config):
def sdk_platform(self):
return ""

def set_request_handler(self, handler):
def set_request_handler(self, handler: BaseRequestHandler):
"""Set custom request handler
Parametrers:
handler (BaseRequestHandler): Instance of custom request handler
"""
assert isinstance(handler, BaseRequestHandler)
self._request_handler = handler

def get_request_handler(self) -> BaseRequestHandler:
"""Get instance of request handler
Return: handler(BaseRequestHandler): Instance of request handler
"""
return self._request_handler

def request_sync(self, endpoint_call_options):
platform_options = PlatformOptions(self.headers, self.config)

Expand All @@ -63,7 +99,7 @@ def request_async(self, endpoint_name, endpoint_call_options, callback, cancella
tt = endpoint_call_options.params["tt"] if "tt" in endpoint_call_options.params else 0
print(f'\033[48;5;236m{endpoint_name=}, {endpoint_call_options.path}, TT={tt}\033[0m\n')

return self._request_handler.async_request(
return self._request_handler.threaded_request(
endpoint_name,
platform_options,
endpoint_call_options,
Expand Down
Loading

0 comments on commit 7494aaa

Please sign in to comment.