Implement Ray runners for processing data distributedly (#21)
* Implement Ray runners for processing data distributedly

* Add unit tests for Ray runner
Zhou Fang authored Dec 30, 2023
1 parent 36abe74 commit b9d04dc
Showing 14 changed files with 646 additions and 82 deletions.
22 changes: 21 additions & 1 deletion python/src/space/core/datasets.py
@@ -15,9 +15,10 @@
"""Space dataset is the interface to interact with underlying storage."""

from __future__ import annotations
from typing import Dict, List
from typing import Dict, List, Optional

import pyarrow as pa
import pyarrow.compute as pc
from substrait.algebra_pb2 import ReadRel, Rel

from space.core.runners import LocalRunner
@@ -26,6 +27,8 @@
from space.core.utils.lazy_imports_utils import ray
from space.core.utils.plans import LogicalPlanBuilder
from space.core.views import View
from space.ray.data_sources import SpaceDataSource
from space.ray.runners import RayReadWriterRunner


class Dataset(View):
@@ -101,3 +104,20 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel:
def process_source(self, data: pa.Table) -> ray.Dataset:
    # The dataset is the source; there is no transform, so simply return the data.
return ray.data.from_arrow(data)

def ray_dataset(self,
filter_: Optional[pc.Expression] = None,
fields: Optional[List[str]] = None,
snapshot_id: Optional[int] = None,
reference_read: bool = False) -> ray.Dataset:
"""Return a Ray dataset for a Space dataset."""
return ray.data.read_datasource(SpaceDataSource(),
storage=self._storage,
filter_=filter_,
fields=fields,
snapshot_id=snapshot_id,
reference_read=reference_read)

def ray(self) -> RayReadWriterRunner:
"""Get a Ray runner."""
return RayReadWriterRunner(self)
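
For orientation, a minimal usage sketch of the two entry points added above: ray_dataset() hands a filtered, projected read to Ray Data via SpaceDataSource, and ray() returns a read-write Ray runner. The load call, location, and column names below are assumptions for illustration, not part of this change.

    import pyarrow as pa
    import pyarrow.compute as pc
    from space.core.datasets import Dataset

    ds = Dataset.load("/tmp/space/events")  # hypothetical existing dataset

    # Distributed read: filter and project before materializing in Ray.
    ray_ds = ds.ray_dataset(filter_=pc.field("id") > 100, fields=["id", "name"])
    print(ray_ds.count())

    # Read-write runner backed by Ray.
    runner = ds.ray()
    runner.append(pa.table({"id": [1], "name": ["a"]}))
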
79 changes: 44 additions & 35 deletions python/src/space/core/ops/insert.py
@@ -20,12 +20,14 @@
from typing import List, Optional

import pyarrow as pa
import pyarrow.compute as pc

from space.core.ops.append import BaseAppendOp, LocalAppendOp
from space.core.ops.delete import BaseDeleteOp, FileSetDeleteOp
from space.core.ops.append import LocalAppendOp
from space.core.ops.delete import FileSetDeleteOp
from space.core.ops.read import FileSetReadOp, ReadOptions
from space.core.ops import utils
from space.core.ops.base import BaseOp, InputData
import space.core.proto.metadata_pb2 as meta
import space.core.proto.runtime_pb2 as runtime
from space.core.storage import Storage
from space.core.utils.paths import StoragePathsMixin
@@ -54,7 +56,7 @@ def write(self, data: InputData) -> Optional[runtime.Patch]:


class LocalInsertOp(BaseInsertOp, StoragePathsMixin):
'''Append data to a dataset.'''
"""Insert data to a dataset."""

def __init__(self, storage: Storage, options: InsertOptions):
StoragePathsMixin.__init__(self, storage.location)
@@ -74,47 +76,54 @@ def _write_arrow(self, data: pa.Table) -> Optional[runtime.Patch]:
if data.num_rows == 0:
return None

pk_filter = utils.primary_key_filter(
list(self._metadata.schema.primary_keys), data)
assert pk_filter is not None
filter_ = utils.primary_key_filter(self._storage.primary_keys, data)
assert filter_ is not None

data_files = self._storage.data_files(pk_filter)
data_files = self._storage.data_files(filter_)

mode = self._options.mode
patches: List[Optional[runtime.Patch]] = []
if data_files.index_files:
if mode == InsertOptions.Mode.INSERT:
read_op = FileSetReadOp(
self._location, self._metadata, data_files,
ReadOptions(filter_=pk_filter, fields=self._storage.primary_keys))

for batch in iter(read_op):
if batch.num_rows > 0:
          # TODO: customize the error and convert it to a JobResult failed
          # status.
          raise RuntimeError('Primary key to insert already exists')
self._check_duplication(data_files, filter_)
elif mode == InsertOptions.Mode.UPSERT:
_try_delete_data(
FileSetDeleteOp(self._location, self._metadata, data_files,
pk_filter), patches)
self._delete(filter_, data_files, patches)
else:
raise RuntimeError(f"Insert mode {mode} not supported")

_try_append_data(LocalAppendOp(self._location, self._metadata), data,
patches)
self._append(data, patches)
return utils.merge_patches(patches)


def _try_delete_data(op: BaseDeleteOp,
patches: List[Optional[runtime.Patch]]) -> None:
patch = op.delete()
if patch is not None:
patches.append(patch)


def _try_append_data(op: BaseAppendOp, data: pa.Table,
patches: List[Optional[runtime.Patch]]) -> None:
op.write(data)
patch = op.finish()
if patch is not None:
patches.append(patch)
def _check_duplication(self, data_files: runtime.FileSet,
filter_: pc.Expression):
if filter_matched(self._location, self._metadata, data_files, filter_,
self._storage.primary_keys):
      raise RuntimeError("Primary key to insert already exists")

def _delete(self, filter_: pc.Expression, data_files: runtime.FileSet,
patches: List[Optional[runtime.Patch]]) -> None:
delete_op = FileSetDeleteOp(self._location, self._metadata, data_files,
filter_)
patches.append(delete_op.delete())

def _append(self, data: pa.Table,
patches: List[Optional[runtime.Patch]]) -> None:
append_op = LocalAppendOp(self._location, self._metadata)
append_op.write(data)
patches.append(append_op.finish())


def filter_matched(location: str, metadata: meta.StorageMetadata,
data_files: runtime.FileSet, filter_: pc.Expression,
primary_keys: List[str]) -> bool:
"""Return True if there are data matching the provided filter."""
op = FileSetReadOp(location, metadata, data_files,
ReadOptions(filter_=filter_, fields=primary_keys))

for data in iter(op):
if data.num_rows > 0:
      # TODO: customize the error and convert it to a JobResult failed
      # status.
return True

return False
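
To make the refactored insert path concrete, the following standalone sketch mirrors the duplicate check performed by utils.primary_key_filter plus filter_matched, using in-memory Arrow tables instead of Space file sets (the tables and column names are made up).

    import pyarrow as pa
    import pyarrow.compute as pc

    existing = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})
    incoming = pa.table({"id": [3, 4], "value": ["x", "y"]})

    # utils.primary_key_filter builds an equivalent pc.Expression over the primary
    # key columns; a plain boolean mask keeps this sketch simple.
    mask = pc.is_in(existing.column("id"), value_set=incoming.column("id"))

    # filter_matched answers the same question over Space data files: does any
    # stored row match the incoming primary keys?
    has_duplicates = existing.filter(mask).num_rows > 0
    print(has_duplicates)  # True: id=3 already exists, so INSERT mode would fail
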
41 changes: 23 additions & 18 deletions python/src/space/core/runners.py
@@ -79,7 +79,7 @@ def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult:


class BaseReadWriteRunner(StorageCommitMixin, BaseReadOnlyRunner):
"""Abstract base runner class."""
"""Abstract base read and write runner class."""

def __init__(self, storage: Storage):
StorageCommitMixin.__init__(self, storage)
@@ -89,7 +89,9 @@ def append(self, data: InputData) -> runtime.JobResult:
"""Append data into the dataset."""

@abstractmethod
def append_from(self, source: Iterator[InputData]) -> runtime.JobResult:
def append_from(
self, sources: Union[Iterator[InputData], List[Iterator[InputData]]]
) -> runtime.JobResult:
"""Append data into the dataset from an iterator source."""

@abstractmethod
@@ -154,17 +156,27 @@ def read(self,
fields=fields,
reference_read=reference_read)))

def append(self, data: InputData) -> runtime.JobResult:

def make_iter():
yield data
def diff(self, start_version: Union[int],
end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]:
return read_change_data(self._storage,
version_to_snapshot_id(start_version),
version_to_snapshot_id(end_version))

return self.append_from(make_iter())
def append(self, data: InputData) -> runtime.JobResult:
op = LocalAppendOp(self._storage.location, self._storage.metadata)
op.write(data)
return self._try_commit(op.finish())

def append_from(self, source: Iterator[InputData]) -> runtime.JobResult:
def append_from(
self, sources: Union[Iterator[InputData], List[Iterator[InputData]]]
) -> runtime.JobResult:
op = LocalAppendOp(self._storage.location, self._storage.metadata)
for data in source:
op.write(data)
if not isinstance(sources, list):
sources = [sources]

for source in sources:
for data in source:
op.write(data)

return self._try_commit(op.finish())

@@ -185,17 +197,10 @@ def _insert(self, data: InputData,
return self._try_commit(op.write(data))

def delete(self, filter_: pc.Expression) -> runtime.JobResult:
ds = self._storage
op = FileSetDeleteOp(self._storage.location, self._storage.metadata,
ds.data_files(filter_), filter_)
self._storage.data_files(filter_), filter_)
return self._try_commit(op.delete())

def diff(self, start_version: Union[int],
end_version: Union[int]) -> Iterator[Tuple[ChangeType, pa.Table]]:
return read_change_data(self._storage,
version_to_snapshot_id(start_version),
version_to_snapshot_id(end_version))


def _job_result(patch: Optional[runtime.Patch]) -> runtime.JobResult:
if patch is None:
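
The append_from change above widens the accepted input from a single iterator to either one iterator or a list of iterators; all sources are written and committed as a single patch. A hedged sketch (the local() accessor and the schema are assumptions):

    import pyarrow as pa

    def batches(start: int):
      for i in range(start, start + 3):
        yield pa.table({"id": [i], "name": [f"row{i}"]})

    runner = ds.local()                              # LocalRunner; accessor name assumed
    runner.append_from(batches(0))                   # single iterator, as before
    runner.append_from([batches(10), batches(20)])   # new: list of iterators, one commit
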
28 changes: 22 additions & 6 deletions python/src/space/core/transform.py
@@ -15,11 +15,13 @@
"""Classes for transforming datasets and """

from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from os import path
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, Optional

import pyarrow as pa
import pyarrow.compute as pc
from substrait.algebra_pb2 import Expression
from substrait.algebra_pb2 import FilterRel
from substrait.algebra_pb2 import FunctionArgument
@@ -98,6 +100,21 @@ def _arguments(self) -> List[FunctionArgument]:
field_id_dict = arrow.field_name_to_id_dict(self.input_.schema)
return [_fn_arg(field_id_dict[name]) for name in input_fields]

def process_source(self, data: pa.Table) -> ray.Dataset:
return self._transform(self.input_.process_source(data))

def ray_dataset(self,
filter_: Optional[pc.Expression] = None,
fields: Optional[List[str]] = None,
snapshot_id: Optional[int] = None,
reference_read: bool = False) -> ray.Dataset:
return self._transform(
self.input_.ray_dataset(filter_, fields, snapshot_id, reference_read))

@abstractmethod
def _transform(self, ds: ray.Dataset) -> ray.Dataset:
"""Transform a Ray dataset using the UDF."""


class MapTransform(BaseUdfTransform):
"""Map a view by a user defined function."""
@@ -128,10 +145,9 @@ def from_relation(cls, location: str, metadata: meta.StorageMetadata,
return MapTransform(*_load_udf(location, metadata, rel.project.
expressions[0], rel.project.input, plan))

def process_source(self, data: pa.Table) -> ray.Dataset:
def _transform(self, ds: ray.Dataset) -> ray.Dataset:
batch_size = self.udf.batch_size if self.udf.batch_size >= 0 else "default"
return self.input_.process_source(data).map_batches(self.udf.fn,
batch_size=batch_size)
return ds.map_batches(self.udf.fn, batch_size=batch_size)


@dataclass
@@ -163,8 +179,8 @@ def from_relation(cls, location: str, metadata: meta.StorageMetadata,
return FilterTransform(*_load_udf(location, metadata, rel.filter.condition,
rel.filter.input, plan))

def process_source(self, data: pa.Table) -> ray.Dataset:
return self.input_.process_source(data).filter(self.udf.fn)
def _transform(self, ds: ray.Dataset) -> ray.Dataset:
return ds.filter(self.udf.fn)


@dataclass
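
After this refactor, MapTransform and FilterTransform differ only in _transform, which maps one Ray dataset to another. A toy Ray Data sketch of the two shapes (the data and functions are made up):

    import ray

    def add_one(batch):
      # Batches are column-oriented; elementwise add works for numpy- or
      # pandas-backed batches alike.
      batch["id"] = batch["id"] + 1
      return batch

    source = ray.data.from_items([{"id": i} for i in range(5)])
    mapped = source.map_batches(add_one, batch_size="default")   # MapTransform._transform
    filtered = mapped.filter(lambda row: row["id"] % 2 == 0)     # FilterTransform._transform
    print(filtered.take(5))
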
9 changes: 9 additions & 0 deletions python/src/space/core/views.py
@@ -20,6 +20,7 @@
from typing import Callable, Dict, List, Optional, TYPE_CHECKING

import pyarrow as pa
import pyarrow.compute as pc
from substrait.algebra_pb2 import Rel

from space.core.fs.factory import create_fs
@@ -78,6 +79,14 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel:
def process_source(self, data: pa.Table) -> ray.Dataset:
"""Process input data using the transform defined by the view."""

@abstractmethod
def ray_dataset(self,
filter_: Optional[pc.Expression] = None,
fields: Optional[List[str]] = None,
snapshot_id: Optional[int] = None,
reference_read: bool = False) -> ray.Dataset:
"""Return a Ray dataset for a Space view."""

def ray(self) -> RayReadOnlyRunner:
"""Return a Ray runner for the view."""
return RayReadOnlyRunner(self)
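
Because the same ray_dataset signature is declared abstractly on View, a derived view exposes the identical Ray read path as a Dataset; only the runner type differs. A sketch of the contrast (the view construction itself is outside this diff and assumed):

    view = my_view                              # any View subclass over ds, assumed
    ro_runner = view.ray()                      # RayReadOnlyRunner: reads only, no commits
    view_rds = view.ray_dataset(fields=["id"])  # same signature as Dataset.ray_dataset
    rw_runner = ds.ray()                        # RayReadWriterRunner on the Dataset itself
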