Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[state]: state api refactor. #374

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.antgroup.geaflow.state.StoreType;
import com.antgroup.geaflow.store.IStoreBuilder;
import com.antgroup.geaflow.store.api.key.IKVStore;
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
import com.antgroup.geaflow.store.context.StoreContext;

public class MemoryClusterMetaKVStore<V> implements IClusterMetaKVStore<String, V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.antgroup.geaflow.state.DataModel;
import com.antgroup.geaflow.state.StoreType;
import com.antgroup.geaflow.store.IStoreBuilder;
import com.antgroup.geaflow.store.api.key.IKVStore;
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
import com.antgroup.geaflow.store.api.key.IKVStatefulStore;
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
import com.antgroup.geaflow.store.context.StoreContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,15 +29,16 @@ public class RocksdbClusterMetaKVStore<K, V> implements IClusterMetaKVStore<K, V

private static final Integer DEFAULT_VERSION = 1;

private IKVStore<K, Object> kvStore;
private IKVStatefulStore<K, Object> kvStore;
private transient long version;
private String name;

@Override
public void init(StoreContext storeContext) {
IStoreBuilder builder = StoreBuilderFactory.build(StoreType.ROCKSDB.name());
this.name = storeContext.getName();
kvStore = (IKVStore<K, Object>) builder.getStore(DataModel.KV, storeContext.getConfig());
kvStore = (IKVStatefulStore<K, Object>) builder.getStore(DataModel.KV,
storeContext.getConfig());
kvStore.init(storeContext);

// recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.antgroup.geaflow.state.StoreType;
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
import com.antgroup.geaflow.store.IStoreBuilder;
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
import com.antgroup.geaflow.store.context.StoreContext;
import com.antgroup.geaflow.store.redis.KVRedisStore;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.antgroup.geaflow.state.serializer.IKVSerializer;
import com.antgroup.geaflow.store.IStoreBuilder;
import com.antgroup.geaflow.store.api.key.IKVStore;
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
import com.antgroup.geaflow.store.context.StoreContext;
import com.antgroup.geaflow.store.rocksdb.RocksdbStoreBuilder;
import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
import com.antgroup.geaflow.store.IStoreBuilder;
import com.antgroup.geaflow.store.api.key.IKVStore;
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
import com.antgroup.geaflow.store.context.StoreContext;
import java.util.LinkedList;
import java.util.Queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.antgroup.geaflow.state.serializer.DefaultKVSerializer;
import com.antgroup.geaflow.store.IStoreBuilder;
import com.antgroup.geaflow.store.api.key.IKVStore;
import com.antgroup.geaflow.store.api.key.StoreBuilderFactory;
import com.antgroup.geaflow.store.api.StoreBuilderFactory;
import com.antgroup.geaflow.store.context.StoreContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
130 changes: 130 additions & 0 deletions geaflow/geaflow-plugins/geaflow-store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# GeaFlow Store API

GeaFlow Store API provides a unified storage interface for managing state data in graph computing.
This document introduces how to use and implement these storage interfaces.

## Core Interfaces

GeaFlow Store API includes the following core interfaces:

### IStoreBuilder

Interface for building storage instances. Main methods:

- `getStoreDesc()`: Get storage description
- `build()`: Build storage instance

### IBaseStore

The base interface for all storage implementations, defining basic storage operations:

- `init()`: Initialize storage
- `flush()`: Flush data to storage
- `close()`: Close storage connection

### IStatefulStore

State management storage interface, inheriting from IBaseStore, providing stateful management
operations:

- `archive()`: Archive current state
- `recovery()`: Recover state from specified position
- `recoveryLatest()`: Recover state from specified position
- `compact()`: Compress/merge historical states
- `drop()`: Drop all data

### IGraphStore

Graph data storage interface, inheriting from IStatefulStore, used for storing graph vertices and
edges:

- `addVertex()`: Add vertex
- `getVertex()`: Get vertex
- `addEdge()`: Add edge
- `getEdges()`: Get edges
- `getOneDegreeGraph()`: Get one degree Graph.

## Usage Examples

### Basic Graph Storage Example

```java
// 1. Create StoreBuilder
IStoreBuilder builder = new MemoryStoreBuilder();

// 2. Build storage instance
IGraphStore graphStore = (IGraphStore) builder.build();

// 3. Initialize storage
graphStore.init(context);

// 4. Use storage
// Add vertex
graphStore.addVertex(vertex);

// Get vertex
IVertex vertex = graphStore.getVertex(vertexId);

// Add edge
graphStore.addEdge(edge);

// Get edges
List<IEdge> edges = graphStore.getEdges(vertexId);

// 5. Close storage
graphStore.close();
```

### State Management Example

```java
// 1. Create stateful storage instance
IStatefulStore statefulStore = new MemoryStatefulStore();

// 2. Initialize
statefulStore.init(context);

// 3. State management operations
// Archive current state
statefulStore.archive();

// Recover from specific version
statefulStore.recovery(version);

// Compact historical states
statefulStore.compact(version);

// 4. Close storage
statefulStore.close();
```

## Implementing Custom Storage

To implement custom storage, you need to:

1. Implement `IStoreBuilder` interface to create storage builder
2. Implement corresponding storage interfaces based on requirements:
- Basic storage: implement `IBaseStore`
- State management storage: implement `IStatefulStore`
- Graph data storage: implement `IGraphStore`
3. Configure SPI service in `resources/META-INF/services`

For examples, refer to implementations in `geaflow-store-memory` module:

- `MemoryStoreBuilder`
- `StaticGraphMemoryStore`

## Configuration

Specify storage implementation and related parameters through configuration:

```java
Configuration conf = new Configuration();
// Specify storage type
conf.put(StoreConfig.STORE_TYPE, "memory");

// State management configurations
conf.put(StoreConfig.STATE_RETENTION_TIME, "24h"); // State retention time
conf.put(StoreConfig.COMPACT_INTERVAL, "6h"); // State compaction interval

// Other storage related configurations

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,12 @@ public interface IBaseStore {
void init(StoreContext storeContext);

/**
* archive current store data for persistence.
*/
void archive(long checkpointId);

/**
* recovery the store data from persistent storage.
*/
void recovery(long checkpointId);

/**
* recovery the latest store data.
*/
long recoveryLatest();

/**
* compact the store data.
*/
void compact();

/**
* flush the store data to disk or remote storage.
* flush memory data to disk.
*/
void flush();

/**
* close the store handler and ll other used resources.
* close the store handler and all other used resources.
*/
void close();

/**
* delete the disk data.
*/
void drop();
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.antgroup.geaflow.store;

/**
* IStateful store is stateful, which means it ensure data HA and can be recovered.
*/
public interface IStatefulStore extends IBaseStore {

/**
* make a snapshot and ensure data HA.
*/
void archive(long checkpointId);

/**
* recover the store data.
*/
void recovery(long checkpointId);

/**
* recover the latest store data.
*/
long recoveryLatest();

/**
* trigger manual store data compaction.
*/
void compact();

/**
* delete the store data.
*/
void drop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.store.api.key;
package com.antgroup.geaflow.store.api;

import com.antgroup.geaflow.common.errorcode.RuntimeErrors;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
Expand All @@ -31,7 +31,7 @@ public static synchronized IStoreBuilder build(String storeType) {
}

ServiceLoader<IStoreBuilder> serviceLoader = ServiceLoader.load(IStoreBuilder.class);
for (IStoreBuilder storeBuilder: serviceLoader) {
for (IStoreBuilder storeBuilder : serviceLoader) {
if (storeBuilder.getStoreDesc().name().equalsIgnoreCase(storeType)) {
CONCURRENT_TYPE_MAP.put(storeType, storeBuilder);
return storeBuilder;
Expand Down
Loading
Loading