Skip to content

Commit

Permalink
Add initial implementation of storage commit (addition only) (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhou Fang authored Dec 23, 2023
1 parent 794913f commit 01d0766
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 66 deletions.
4 changes: 2 additions & 2 deletions python/src/space/core/ops/append.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,12 @@ def finish(self) -> Optional[runtime.Patch]:
# Write manifest files.
index_manifest_full_path = self._index_manifest_writer.finish()
if index_manifest_full_path is not None:
self._patch.added_index_manifest_files.append(
self._patch.addition.index_manifest_files.append(
self.short_path(index_manifest_full_path))

record_manifest_path = self._record_manifest_writer.finish()
if record_manifest_path:
self._patch.added_record_manifest_files.append(
self._patch.addition.record_manifest_files.append(
self.short_path(record_manifest_path))

if self._patch.storage_statistics_update.num_rows == 0:
Expand Down
29 changes: 23 additions & 6 deletions python/src/space/core/proto/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ message EntryPoint {
// Metadata persisting the current status of a storage, including logical
// metadata such as schema, and physical metadata persisted as a history of
// snapshots
// NEXT_ID: 8
// NEXT_ID: 7
message StorageMetadata {
// Create time of the storage.
google.protobuf.Timestamp create_time = 1;
Expand All @@ -56,14 +56,11 @@ message StorageMetadata {

// All alive snapshots with snapshot ID as key.
map<int64, Snapshot> snapshots = 6;

// Statistics of all data in the storage.
StorageStatistics storage_statistics = 7;
}

// The storage logical schema where user provided types are persisted instead
// of their physical storage format.
// NEXT_ID: 2
// NEXT_ID: 4
message Schema {
// Fields persisted as Substrait named struct.
substrait.NamedStruct fields = 1;
Expand All @@ -77,13 +74,33 @@ message Schema {

// Storage snapshot persisting physical metadata such as manifest file paths.
// It is used for obtaining all alive data file paths for a given snapshot.
// NEXT_ID: 3
// NEXT_ID: 5
message Snapshot {
// The snapshot ID.
int64 snapshot_id = 1;

// The create time of the snapshot.
google.protobuf.Timestamp create_time = 2;

// All data in this snapshot.
oneof data_info {
// Manifest file information embedded in Snapshot. Preferred option when
// the number of manifest files are small.
ManifestFiles manifest_files = 3;
}

// Statistics of all data in the storage.
StorageStatistics storage_statistics = 4;
}

// Stores information of manifest files.
// NEXT_ID: 3
message ManifestFiles {
// Manifest for index files.
repeated string index_manifest_files = 1;

// Manifest for record files.
repeated string record_manifest_files = 2;
}

// Statistics of storage data.
Expand Down
26 changes: 14 additions & 12 deletions python/src/space/core/proto/metadata_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 46 additions & 12 deletions python/src/space/core/proto/metadata_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class StorageMetadata(google.protobuf.message.Message):
"""Metadata persisting the current status of a storage, including logical
metadata such as schema, and physical metadata persisted as a history of
snapshots
NEXT_ID: 8
NEXT_ID: 7
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor
Expand Down Expand Up @@ -107,7 +107,6 @@ class StorageMetadata(google.protobuf.message.Message):
SCHEMA_FIELD_NUMBER: builtins.int
CURRENT_SNAPSHOT_ID_FIELD_NUMBER: builtins.int
SNAPSHOTS_FIELD_NUMBER: builtins.int
STORAGE_STATISTICS_FIELD_NUMBER: builtins.int
@property
def create_time(self) -> google.protobuf.timestamp_pb2.Timestamp:
"""Create time of the storage."""
Expand All @@ -123,9 +122,6 @@ class StorageMetadata(google.protobuf.message.Message):
@property
def snapshots(self) -> google.protobuf.internal.containers.MessageMap[builtins.int, global___Snapshot]:
"""All alive snapshots with snapshot ID as key."""
@property
def storage_statistics(self) -> global___StorageStatistics:
"""Statistics of all data in the storage."""
def __init__(
self,
*,
Expand All @@ -135,18 +131,17 @@ class StorageMetadata(google.protobuf.message.Message):
schema: global___Schema | None = ...,
current_snapshot_id: builtins.int = ...,
snapshots: collections.abc.Mapping[builtins.int, global___Snapshot] | None = ...,
storage_statistics: global___StorageStatistics | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "schema", b"schema", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "schema", b"schema", "snapshots", b"snapshots", "storage_statistics", b"storage_statistics", "type", b"type"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "last_update_time", b"last_update_time", "schema", b"schema"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "current_snapshot_id", b"current_snapshot_id", "last_update_time", b"last_update_time", "schema", b"schema", "snapshots", b"snapshots", "type", b"type"]) -> None: ...

global___StorageMetadata = StorageMetadata

@typing_extensions.final
class Schema(google.protobuf.message.Message):
"""The storage logical schema where user provided types are persisted instead
of their physical storage format.
NEXT_ID: 2
NEXT_ID: 4
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor
Expand Down Expand Up @@ -179,29 +174,68 @@ global___Schema = Schema
class Snapshot(google.protobuf.message.Message):
"""Storage snapshot persisting physical metadata such as manifest file paths.
It is used for obtaining all alive data file paths for a given snapshot.
NEXT_ID: 3
NEXT_ID: 5
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

SNAPSHOT_ID_FIELD_NUMBER: builtins.int
CREATE_TIME_FIELD_NUMBER: builtins.int
MANIFEST_FILES_FIELD_NUMBER: builtins.int
STORAGE_STATISTICS_FIELD_NUMBER: builtins.int
snapshot_id: builtins.int
"""The snapshot ID."""
@property
def create_time(self) -> google.protobuf.timestamp_pb2.Timestamp:
"""The create time of the snapshot."""
@property
def manifest_files(self) -> global___ManifestFiles:
"""Manifest file information embedded in Snapshot. Preferred option when
the number of manifest files are small.
"""
@property
def storage_statistics(self) -> global___StorageStatistics:
"""Statistics of all data in the storage."""
def __init__(
self,
*,
snapshot_id: builtins.int = ...,
create_time: google.protobuf.timestamp_pb2.Timestamp | None = ...,
manifest_files: global___ManifestFiles | None = ...,
storage_statistics: global___StorageStatistics | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "snapshot_id", b"snapshot_id"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "snapshot_id", b"snapshot_id", "storage_statistics", b"storage_statistics"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["data_info", b"data_info"]) -> typing_extensions.Literal["manifest_files"] | None: ...

global___Snapshot = Snapshot

@typing_extensions.final
class ManifestFiles(google.protobuf.message.Message):
"""Stores information of manifest files.
NEXT_ID: 3
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

INDEX_MANIFEST_FILES_FIELD_NUMBER: builtins.int
RECORD_MANIFEST_FILES_FIELD_NUMBER: builtins.int
@property
def index_manifest_files(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
"""Manifest for index files."""
@property
def record_manifest_files(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
"""Manifest for record files."""
def __init__(
self,
*,
index_manifest_files: collections.abc.Iterable[builtins.str] | None = ...,
record_manifest_files: collections.abc.Iterable[builtins.str] | None = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["index_manifest_files", b"index_manifest_files", "record_manifest_files", b"record_manifest_files"]) -> None: ...

global___ManifestFiles = ManifestFiles

@typing_extensions.final
class StorageStatistics(google.protobuf.message.Message):
"""Statistics of storage data.
Expand Down
13 changes: 5 additions & 8 deletions python/src/space/core/proto/runtime.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,14 @@ package space.proto;
// A patch describing metadata changes to the storage.
// NEXT_ID: 4
message Patch {
// Index manifest file paths newly added to the storage.
repeated string added_index_manifest_files = 1;
// Manifest files to add to the storage.
ManifestFiles addition = 1;

// Index manifest file paths to be removed from the storage.
repeated string deleted_index_manifest_files = 2;

// Record manifest file paths newly added to the storage.
repeated string added_record_manifest_files = 3;
// Manifest files to remove from the storage.
ManifestFiles deletion = 2;

// The change of the storage statistics.
StorageStatistics storage_statistics_update = 4;
StorageStatistics storage_statistics_update = 3;
}

// Result of a job.
Expand Down
12 changes: 6 additions & 6 deletions python/src/space/core/proto/runtime_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 10 additions & 17 deletions python/src/space/core/proto/runtime_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import builtins
import collections.abc
import google.protobuf.descriptor
import google.protobuf.internal.containers
import google.protobuf.internal.enum_type_wrapper
import google.protobuf.message
import space.core.proto.metadata_pb2
Expand All @@ -40,32 +38,27 @@ class Patch(google.protobuf.message.Message):

DESCRIPTOR: google.protobuf.descriptor.Descriptor

ADDED_INDEX_MANIFEST_FILES_FIELD_NUMBER: builtins.int
DELETED_INDEX_MANIFEST_FILES_FIELD_NUMBER: builtins.int
ADDED_RECORD_MANIFEST_FILES_FIELD_NUMBER: builtins.int
ADDITION_FIELD_NUMBER: builtins.int
DELETION_FIELD_NUMBER: builtins.int
STORAGE_STATISTICS_UPDATE_FIELD_NUMBER: builtins.int
@property
def added_index_manifest_files(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
"""Index manifest file paths newly added to the storage."""
def addition(self) -> space.core.proto.metadata_pb2.ManifestFiles:
"""Manifest files to add to the storage."""
@property
def deleted_index_manifest_files(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
"""Index manifest file paths to be removed from the storage."""
@property
def added_record_manifest_files(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
"""Record manifest file paths newly added to the storage."""
def deletion(self) -> space.core.proto.metadata_pb2.ManifestFiles:
"""Manifest files to remove from the storage."""
@property
def storage_statistics_update(self) -> space.core.proto.metadata_pb2.StorageStatistics:
"""The change of the storage statistics."""
def __init__(
self,
*,
added_index_manifest_files: collections.abc.Iterable[builtins.str] | None = ...,
deleted_index_manifest_files: collections.abc.Iterable[builtins.str] | None = ...,
added_record_manifest_files: collections.abc.Iterable[builtins.str] | None = ...,
addition: space.core.proto.metadata_pb2.ManifestFiles | None = ...,
deletion: space.core.proto.metadata_pb2.ManifestFiles | None = ...,
storage_statistics_update: space.core.proto.metadata_pb2.StorageStatistics | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["storage_statistics_update", b"storage_statistics_update"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["added_index_manifest_files", b"added_index_manifest_files", "added_record_manifest_files", b"added_record_manifest_files", "deleted_index_manifest_files", b"deleted_index_manifest_files", "storage_statistics_update", b"storage_statistics_update"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["addition", b"addition", "deletion", b"deletion", "storage_statistics_update", b"storage_statistics_update"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["addition", b"addition", "deletion", b"deletion", "storage_statistics_update", b"storage_statistics_update"]) -> None: ...

global___Patch = Patch

Expand Down
Loading

0 comments on commit 01d0766

Please sign in to comment.