Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: unpin "numpy<2.0" #99

Closed
wants to merge 17 commits into from
Closed
16 changes: 8 additions & 8 deletions .github/workflows/build-wheel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ jobs:
- os: ubuntu-latest
arch: aarch64
requires-python: ">=3.11,<3.12"
- os: macos-12
arch: x86_64
requires-python: ">=3.8,<3.10"
- os: macos-12
arch: x86_64
requires-python: ">=3.10,<3.12"
- os: macos-12
- os: macos-13
arch: universal2
requires-python: ">=3.8,<3.10"
- os: macos-12
- os: macos-13
arch: universal2
requires-python: ">=3.10,<3.12"
- os: macos-13
arch: arm64
requires-python: ">=3.8,<3.10"
- os: macos-13
arch: arm64
requires-python: ">=3.10,<3.12"

steps:
- uses: actions/checkout@v3
Expand Down
29 changes: 24 additions & 5 deletions .github/workflows/python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ jobs:
needs: lint
env:
CONDA_ENV: xoscar-test
SELF_HOST_PYTHON: /root/miniconda3/envs/xoscar-test/bin/python
SELF_HOST_CONDA: /root/miniconda3/condabin/conda
defaults:
run:
shell: bash -l {0}
Expand All @@ -85,7 +87,8 @@ jobs:
- { os: windows-latest, python-version: 3.9}
- { os: windows-latest, python-version: 3.10}
include:
- { os: self-hosted, module: gpu, python-version: 3.9}
- { os: self-hosted, module: gpu, python-version: 3.11}
- { os: ubuntu-20.04, module: doc-build, python-version: 3.9}

steps:
- name: Check out code
Expand Down Expand Up @@ -116,12 +119,15 @@ jobs:
run: |
pip install numpy scipy cython coverage flaky
pip install -e ".[dev,extra]"
# ucx_info -v
working-directory: ./python

- name: Install ucx dependencies
if: ${{ (matrix.module != 'gpu') && (matrix.os == 'ubuntu-latest') && (matrix.python-version != '3.11') }}
if: ${{ (matrix.module != 'gpu') && (matrix.os == 'ubuntu-latest') && (matrix.python-version >= '3.9')}}
run: |
conda install -c conda-forge -c rapidsai ucx-proc=*=cpu ucx ucx-py
# ucx-py has moved to ucxx, and ucxx-cu12 can also run on CPU
# conda install -c conda-forge -c rapidsai ucx-proc=*=cpu ucx ucx-py
pip install ucxx-cu12

- name: Install fury
if: ${{ (matrix.module != 'gpu') && (matrix.os == 'ubuntu-latest') && (matrix.python-version == '3.9') }}
Expand All @@ -131,10 +137,22 @@ jobs:
- name: Install on GPU
if: ${{ matrix.module == 'gpu' }}
run: |
python setup.py build_ext -i
source activate ${{ env.CONDA_ENV }}
conda install -y conda-forge::nccl=2.22.3
pip install --extra-index-url=https://pypi.nvidia.com cudf-cu12
pip install ucx-py-cu12 cloudpickle psutil tblib uvloop packaging "numpy<2.0.0" scipy cython coverage flaky
python setup.py clean --all
pip install -e ./
working-directory: ./python

- name: Build doc
if: ${{ matrix.module == 'doc-build' }}
run: |
make html
working-directory: ./doc

- name: Test with pytest
if: ${{ matrix.module != 'doc-build'}}
env:
MODULE: ${{ matrix.module }}
run: |
Expand All @@ -143,12 +161,13 @@ jobs:
-W ignore::PendingDeprecationWarning \
--cov-config=setup.cfg --cov-report=xml --cov=xoscar xoscar --capture=no
else
source activate ${{ env.CONDA_ENV }}
pytest -m cuda --cov-config=setup.cfg --cov-report=xml --cov=xoscar --capture=no
fi
working-directory: ./python

- name: Report coverage data
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
working-directory: ./python
flags: unittests
2 changes: 1 addition & 1 deletion CI/conda-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: xoscar-test
channels:
- defaults
dependencies:
- numpy<2.0.0
- numpy
- cloudpickle
- coverage
- cython
Expand Down
2 changes: 1 addition & 1 deletion CI/requirements-wheel.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
oldest-supported-numpy

numpy<2.0.0
numpy
packaging
wheel

Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ requires = [
"pandas==1.4.0; python_version>='3.10' and python_version<'3.11' and platform_machine=='arm64'",
"pandas==1.5.1; python_version>='3.11' and python_version<'3.12'",
"pandas>=2.1.1; python_version>'3.11'",
"numpy<2.0.0",
"numpy",
"cython>=0.29.33",
"requests>=2.4.0",
"cloudpickle>=2.2.1; python_version>='3.11'",
Expand Down
6 changes: 3 additions & 3 deletions python/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ zip_safe = False
include_package_data = True
packages = find:
install_requires =
numpy>=1.14.0,<2.0.0
numpy>=1.14.0
pandas>=1.0.0
scipy>=1.0.0; sys_platform!="win32" or python_version>="3.10"
scipy>=1.0.0,<=1.9.1; sys_platform=="win32" and python_version<"3.10"
Expand All @@ -50,15 +50,15 @@ dev =
pytest-forked>=1.0
pytest-asyncio>=0.14.0
ipython>=6.5.0
sphinx>=3.0.0,<5.0.0
sphinx
pydata-sphinx-theme>=0.3.0
sphinx-intl>=0.9.9
mock>=4.0.0; python_version<"3.8"
flake8>=3.8.0
black
doc =
ipython>=6.5.0
sphinx>=3.0.0,<5.0.0
sphinx
pydata-sphinx-theme>=0.3.0
sphinx-intl>=0.9.9
extra =
Expand Down
28 changes: 24 additions & 4 deletions python/xoscar/backends/communication/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from ..._utils import to_binary
from ...constants import XOSCAR_UNIX_SOCKET_DIR
from ...serialization import AioDeserializer, AioSerializer, deserialize
from ...utils import classproperty, implements
from ...utils import classproperty, implements, is_v6_ip
from .base import Channel, ChannelType, Client, Server
from .core import register_client, register_server
from .utils import read_buffers, write_buffers
Expand Down Expand Up @@ -201,17 +201,37 @@ def client_type(self) -> Type["Client"]:
def channel_type(self) -> int:
return ChannelType.remote

@classmethod
def parse_config(cls, config: dict) -> dict:
# Reduce a configuration mapping to the whitelisted keys listed below.
# Returns an empty dict when config is None or otherwise falsy (e.g. {});
# otherwise returns a new dict, leaving the caller's mapping unmodified.
if config is None or not config:
return dict()
# we only need the following config
keys = ["listen_elastic_ip"]
# Whitelisted keys that are absent from config are skipped, not defaulted.
parsed_config = {key: config[key] for key in keys if key in config}

return parsed_config

@staticmethod
@implements(Server.create)
async def create(config: Dict) -> "Server":
config = config.copy()
if "address" in config:
address = config.pop("address")
host, port = address.split(":", 1)
host, port = address.rsplit(":", 1)
port = int(port)
else:
host = config.pop("host")
port = int(config.pop("port"))
_host = host
if config.pop("listen_elastic_ip", False):
# The Actor.address is announced to clients, but it is not bound on our host,
# so we cannot actually listen on it.
# We therefore keep SocketServer.host untouched to make sure Actor.address does not change.
if is_v6_ip(host):
_host = "::"
else:
_host = "0.0.0.0"

handle_channel = config.pop("handle_channel")
if "start_serving" not in config:
config["start_serving"] = False
Expand All @@ -224,7 +244,7 @@ async def handle_connection(reader: StreamReader, writer: StreamWriter):

port = port if port != 0 else None
aio_server = await asyncio.start_server(
handle_connection, host=host, port=port, **config
handle_connection, host=_host, port=port, **config
)

# get port of the socket if not specified
Expand All @@ -250,7 +270,7 @@ class SocketClient(Client):
async def connect(
dest_address: str, local_address: str | None = None, **kwargs
) -> "Client":
host, port_str = dest_address.split(":", 1)
host, port_str = dest_address.rsplit(":", 1)
port = int(port_str)
(reader, writer) = await asyncio.open_connection(host=host, port=port, **kwargs)
channel = SocketChannel(
Expand Down
29 changes: 20 additions & 9 deletions python/xoscar/backends/communication/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
from ...nvutils import get_cuda_context, get_index_and_uuid
from ...serialization import deserialize
from ...serialization.aio import BUFFER_SIZES_NAME, AioSerializer, get_header_length
from ...utils import classproperty, implements, is_cuda_buffer, lazy_import
from ...utils import classproperty, implements, is_cuda_buffer, is_v6_ip, lazy_import
from ..message import _MessageBase
from .base import Channel, ChannelType, Client, Server
from .core import register_client, register_server
from .errors import ChannelClosed

ucp = lazy_import("ucp")
ucp = lazy_import("ucxx")
numba_cuda = lazy_import("numba.cuda")
rmm = lazy_import("rmm")

Expand Down Expand Up @@ -86,7 +86,7 @@ def _get_options(ucx_config: dict) -> Tuple[dict, dict]:
tls += ",cuda_copy"

if ucx_config.get("infiniband"): # pragma: no cover
tls = "rc," + tls
tls = "ib," + tls
if ucx_config.get("nvlink"): # pragma: no cover
tls += ",cuda_ipc"

Expand Down Expand Up @@ -177,7 +177,8 @@ def init(ucx_config: dict):
new_environ.update(envs)
os.environ = new_environ # type: ignore
try:
ucp.init(options=options, env_takes_precedence=True)
# let UCX determine the appropriate transports
ucp.init()
finally:
os.environ = original_environ

Expand Down Expand Up @@ -313,7 +314,7 @@ async def send_buffers(self, buffers: list, meta: Optional[_MessageBase] = None)
await self.ucp_endpoint.send(buf)
for buffer in buffers:
await self.ucp_endpoint.send(buffer)
except ucp.exceptions.UCXBaseException: # pragma: no cover
except ucp.exceptions.UCXError: # pragma: no cover
self.abort()
raise ChannelClosed("While writing, the connection was closed")

Expand Down Expand Up @@ -401,11 +402,21 @@ async def create(config: Dict) -> "Server":
prefix = f"{UCXServer.scheme}://"
if address.startswith(prefix):
address = address[len(prefix) :]
host, port = address.split(":", 1)
host, port = address.rsplit(":", 1)
port = int(port)
else:
host = config.pop("host")
port = int(config.pop("port"))
_host = host
if config.pop("listen_elastic_ip", False):
# The Actor.address is announced to clients, but it is not bound on our host,
# so we cannot actually listen on it.
# We therefore keep `host` untouched to make sure Actor.address does not change.
if is_v6_ip(host):
_host = "::"
else:
_host = "0.0.0.0"

handle_channel = config.pop("handle_channel")

# init
Expand All @@ -414,7 +425,7 @@ async def create(config: Dict) -> "Server":
async def serve_forever(client_ucp_endpoint: "ucp.Endpoint"): # type: ignore
try:
await server.on_connected(
client_ucp_endpoint, local_address=server.address
client_ucp_endpoint, local_address="%s:%d" % (_host, port)
)
except ChannelClosed: # pragma: no cover
logger.exception("Connection closed before handshake completed")
Expand Down Expand Up @@ -498,15 +509,15 @@ async def connect(
prefix = f"{UCXClient.scheme}://"
if dest_address.startswith(prefix):
dest_address = dest_address[len(prefix) :]
host, port_str = dest_address.split(":", 1)
host, port_str = dest_address.rsplit(":", 1)
port = int(port_str)
kwargs = kwargs.copy()
ucx_config = kwargs.pop("config", dict()).get("ucx", dict())
UCXInitializer.init(ucx_config)

try:
ucp_endpoint = await ucp.create_endpoint(host, port)
except ucp.exceptions.UCXBaseException as e: # pragma: no cover
except ucp.exceptions.UCXError as e: # pragma: no cover
raise ChannelClosed(
f"Connection closed before handshake completed, "
f"local address: {local_address}, dest address: {dest_address}"
Expand Down
5 changes: 3 additions & 2 deletions python/xoscar/backends/indigen/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def get_external_addresses(
"""Get external address for every process"""
assert n_process is not None
if ":" in address:
host, port_str = address.split(":", 1)
host, port_str = address.rsplit(":", 1)
port = int(port_str)
if ports:
if len(ports) != n_process:
Expand Down Expand Up @@ -324,6 +324,7 @@ async def append_sub_pool(
start_method: str | None = None,
kwargs: dict | None = None,
):
# external_address has port 0, so the subprocess will bind a random port.
external_address = (
external_address
or MainActorPool.get_external_addresses(self.external_address, n_process=1)[
Expand Down Expand Up @@ -393,7 +394,7 @@ def start_pool_in_process():
content=self._config,
)
await self.handle_control_command(control_message)

# The actual port is returned in process_status.
return process_status.external_addresses[0]

async def remove_sub_pool(
Expand Down
Loading
Loading