Skip to content

Commit

Permalink
Add a RayMaterializedViewRunner for refreshing materialized views (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhou Fang authored Dec 29, 2023
1 parent 7b904e6 commit 36abe74
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 16 deletions.
7 changes: 4 additions & 3 deletions python/src/space/core/ops/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _write_arrow(self, data: pa.Table) -> Optional[runtime.Patch]:
data_files = self._storage.data_files(pk_filter)

mode = self._options.mode
patches: List[runtime.Patch] = []
patches: List[Optional[runtime.Patch]] = []
if data_files.index_files:
if mode == InsertOptions.Mode.INSERT:
read_op = FileSetReadOp(
Expand All @@ -105,14 +105,15 @@ def _write_arrow(self, data: pa.Table) -> Optional[runtime.Patch]:
return utils.merge_patches(patches)


def _try_delete_data(op: BaseDeleteOp, patches: List[runtime.Patch]) -> None:
def _try_delete_data(op: BaseDeleteOp,
patches: List[Optional[runtime.Patch]]) -> None:
patch = op.delete()
if patch is not None:
patches.append(patch)


def _try_append_data(op: BaseAppendOp, data: pa.Table,
patches: List[runtime.Patch]) -> None:
patches: List[Optional[runtime.Patch]]) -> None:
op.write(data)
patch = op.finish()
if patch is not None:
Expand Down
6 changes: 5 additions & 1 deletion python/src/space/core/ops/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,17 @@ def primary_key_filter(primary_keys: List[str],
return filter_


def merge_patches(patches: List[runtime.Patch]) -> Optional[runtime.Patch]:
def merge_patches(
patches: List[Optional[runtime.Patch]]) -> Optional[runtime.Patch]:
"""Merge multiple patches into one."""
patch = runtime.Patch()
stats_update = meta.StorageStatistics()

empty = True
for p in patches:
if p is None:
continue

if empty:
empty = False

Expand Down
23 changes: 15 additions & 8 deletions python/src/space/core/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,25 @@ def diff(self, start_version: Union[int],
"""


class BaseReadWriteRunner(BaseReadOnlyRunner):
"""Abstract base runner class."""
class StorageCommitMixin:
"""Provide storage commit utilities."""

def __init__(self, storage: Storage):
self._storage = storage

def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult:
if patch is not None:
self._storage.commit(patch)

return _job_result(patch)


class BaseReadWriteRunner(StorageCommitMixin, BaseReadOnlyRunner):
"""Abstract base runner class."""

def __init__(self, storage: Storage):
StorageCommitMixin.__init__(self, storage)

@abstractmethod
def append(self, data: InputData) -> runtime.JobResult:
"""Append data into the dataset."""
Expand Down Expand Up @@ -124,12 +137,6 @@ def _insert(self, data: InputData,
def delete(self, filter_: pc.Expression) -> runtime.JobResult:
"""Delete data matching the filter from the dataset."""

def _try_commit(self, patch: Optional[runtime.Patch]) -> runtime.JobResult:
if patch is not None:
self._storage.commit(patch)

return _job_result(patch)


class LocalRunner(BaseReadWriteRunner):
"""A runner that runs operations locally."""
Expand Down
14 changes: 13 additions & 1 deletion python/src/space/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
from space.core.utils.lazy_imports_utils import ray # pylint: disable=unused-import
from space.core.utils.paths import UDF_DIR, metadata_dir
from space.core.utils.plans import LogicalPlanBuilder, UserDefinedFn
from space.ray.runners import RayReadOnlyRunner
from space.core.runners import LocalRunner
from space.ray.runners import RayMaterializedViewRunner, RayReadOnlyRunner

if TYPE_CHECKING:
from space.core.datasets import Dataset
Expand Down Expand Up @@ -159,6 +160,17 @@ def view(self) -> View:
"""Return view of the materialized view."""
return self._view

def ray(self) -> RayMaterializedViewRunner:
"""Return a Ray runner for the materialized view."""
return RayMaterializedViewRunner(self)

def local(self) -> LocalRunner:
"""Get a runner that runs operations locally.
TODO: should use a read-only local runner.
"""
return LocalRunner(self._storage)

@classmethod
def create(cls, location: str, view: View, logical_plan: meta.LogicalPlan,
udfs: Dict[str, UserDefinedFn]) -> MaterializedView:
Expand Down
46 changes: 44 additions & 2 deletions python/src/space/ray/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import pyarrow as pa
import pyarrow.compute as pc

from space.core.runners import BaseReadOnlyRunner
from space.core.runners import BaseReadOnlyRunner, StorageCommitMixin
from space.core.ops import utils
from space.core.ops.append import LocalAppendOp
from space.core.ops.change_data import ChangeType, read_change_data
from space.core.ops.delete import FileSetDeleteOp
import space.core.proto.runtime_pb2 as runtime
from space.core.utils.lazy_imports_utils import ray
from space.core.versions.utils import version_to_snapshot_id

if TYPE_CHECKING:
from space.core.views import View
from space.core.views import MaterializedView, View


class RayReadOnlyRunner(BaseReadOnlyRunner):
Expand Down Expand Up @@ -58,3 +62,41 @@ def diff(self, start_version: Union[int],
processed_ray_data = self._view.process_source(data)
processed_data = ray.get(processed_ray_data.to_arrow_refs())
yield change_type, pa.concat_tables(processed_data)


class RayMaterializedViewRunner(RayReadOnlyRunner, StorageCommitMixin):
"""Ray runner for materialized views."""

def __init__(self, mv: MaterializedView):
RayReadOnlyRunner.__init__(self, mv.view)
StorageCommitMixin.__init__(self, mv.storage)

def refresh(self, target_version: Union[int]) -> runtime.JobResult:
"""Refresh the materialized view by synchronizing from source dataset."""
start_snapshot_id = self._storage.metadata.current_snapshot_id
end_snapshot_id = version_to_snapshot_id(target_version)

patches: List[Optional[runtime.Patch]] = []
for change_type, data in self.diff(start_snapshot_id, end_snapshot_id):
if change_type == ChangeType.DELETE:
patches.append(self._process_delete(data))
elif change_type == ChangeType.ADD:
patches.append(self._process_append(data))
else:
raise NotImplementedError(f"Change type {change_type} not supported")

return self._try_commit(utils.merge_patches(patches))

def _process_delete(self, data: pa.Table) -> Optional[runtime.Patch]:
filter_ = utils.primary_key_filter(self._storage.primary_keys, data)
if filter_ is None:
return None

op = FileSetDeleteOp(self._storage.location, self._storage.metadata,
self._storage.data_files(filter_), filter_)
return op.delete()

def _process_append(self, data: pa.Table) -> Optional[runtime.Patch]:
op = LocalAppendOp(self._storage.location, self._storage.metadata)
op.write(data)
return op.finish()
16 changes: 15 additions & 1 deletion python/tests/ray/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def sample_dataset(tmp_path, sample_schema):

class TestRayReadOnlyRunner:

def test_diff_map_batches(self, sample_dataset):
def test_diff_map_batches(self, tmp_path, sample_dataset):
# A sample UDF for testing.
def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
batch["float64"] = batch["float64"] + 1
Expand All @@ -62,6 +62,7 @@ def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
input_fields=["int64", "binary"],
output_schema=view_schema,
output_record_fields=["binary"])
mv = view.materialize(str(tmp_path / "mv"))

ds_runner = sample_dataset.local()
view_runner = view.ray()
Expand Down Expand Up @@ -92,6 +93,19 @@ def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
# Test several changes.
assert list(view_runner.diff(0, 2)) == [expected_change0, expected_change1]

# Test materialized views.
ray_runner = mv.ray()
local = mv.local()

ray_runner.refresh(1)
assert local.read_all() == expected_change0[1]

ray_runner.refresh(2)
assert local.read_all() == pa.Table.from_pydict({
"int64": [1, 3],
"float64": [1.1, 1.3],
})

def test_diff_filter(self, sample_dataset):
# A sample UDF for testing.
def _sample_filter_udf(row: Dict[str, Any]) -> Dict[str, Any]:
Expand Down

0 comments on commit 36abe74

Please sign in to comment.