Skip to content
This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Commit

Permalink
Merge pull request #30 from facebookresearch/labgraph_2.0.0
Browse files Browse the repository at this point in the history
Labgraph 2.0.0
  • Loading branch information
jfResearchEng authored Dec 7, 2021
2 parents e4f347c + f8acf63 commit c5f63ba
Show file tree
Hide file tree
Showing 62 changed files with 1,138 additions and 566 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ WORKDIR "/opt/labgraph"
COPY . .

# Build LabGraph Wheel
RUN python3.6 setup.py install --user
RUN python3.6 setup.py sdist bdist_wheel
RUN python3.6 setup_py36.py install --user
RUN python3.6 setup_py36.py sdist bdist_wheel
RUN python3.6 -m pip install auditwheel
RUN auditwheel repair dist/*whl -w dist/

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.Centos
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ RUN chmod 2777 /usr/local/var/run/watchman
# Copy LabGraph files
WORKDIR "/opt/labgraph"
COPY . .
RUN python3.6 setup.py install --user
RUN python3.6 setup_py36.py install --user
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ LabGraph is a streaming framework built by the Facebook Reality Labs Research te
### Method 1 - using PyPI (Recommended)

**Prerequisites**:
- [Python 3.6](https://www.python.org/downloads/release/python-368/)
- Windows and Linux (CentOS 7, CentOS 8, Ubuntu 20.04)
- Python3.6+ (Python 3.8 recommended)
- Mac (Big Sur), Windows and Linux (CentOS 7, CentOS 8, Ubuntu 20.04; Python3.6 only)
- Based on [PyPa](https://github.com/pypa/manylinux), the following Linux systems are also supported: Fedora 32+, Mageia 8+, openSUSE 15.3+, Photon OS 4.0+ (3.0+ with updates), Ubuntu 20.04+

```
Expand All @@ -20,7 +20,7 @@ pip install labgraph
**Prerequisites**:

- [Buck](https://buck.build/setup/getting_started.html) ([Watchman](https://facebook.github.io/watchman/docs/install) also recommended)
- [Python 3.6](https://www.python.org/downloads/release/python-368/) (note: currently incompatible with Anaconda)
- [Python3.6-Python3.10](https://www.python.org/downloads/release/)
- **Windows only:** [Build Tools for Visual Studio 2019](https://visualstudio.microsoft.com/downloads/#build-tools-for-visual-studio-2019)

```
Expand Down
4 changes: 2 additions & 2 deletions labgraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"Connections",
"CPPNodeConfig",
"DeferredMessage",
"LabGraphError",
"LabgraphError",
"Event",
"EventGraph",
"EventPublishingHeap",
Expand Down Expand Up @@ -108,4 +108,4 @@
TimestampAligner,
run,
)
from .util import LabGraphError
from .util import LabgraphError
2 changes: 1 addition & 1 deletion labgraph/_cthulhu/bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright 2004-present Facebook. All Rights Reserved.

# This is a wrapper around cthulhubindings to randomize the name of the shared memory it
# uses. This allows us to keep shared memory for different LabGraph graphs separate
# uses. This allows us to keep shared memory for different Labgraph graphs separate
# when they are running simultaneously.

import os
Expand Down
41 changes: 21 additions & 20 deletions labgraph/_cthulhu/cthulhu.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from typing import Callable, Generic, Optional, Type, TypeVar

from ..messages.message import Message
from ..util.error import LabGraphError
from ..util.error import LabgraphError
from ..util.typing import is_generic_subclass
from .bindings import ( # type: ignore
PerformanceSummary,
StreamConsumer,
Expand All @@ -22,7 +23,7 @@
T = TypeVar("T")


class LabGraphCallbackParams(Generic[T]):
class LabgraphCallbackParams(Generic[T]):
def __init__(self, message: T, stream_id: Optional[str]) -> None:
self.message = message
self.stream_id = stream_id
Expand All @@ -31,7 +32,7 @@ def __init__(self, message: T, stream_id: Optional[str]) -> None:
stream_id: Optional[str]


LabGraphCallback = Callable[..., None]
LabgraphCallback = Callable[..., None]
CthulhuCallback = Callable[[StreamSample], None]


Expand All @@ -43,17 +44,17 @@ class Mode(Enum):
class Consumer(StreamConsumer): # type: ignore
"""
Convenience wrapper of Cthulhu's `StreamConsumer` that allows us to specify a
callback accepting LabGraph `Message`s.
callback accepting Labgraph `Message`s.
Args:
stream_interface: The stream interface to use.
sample_callback: The callback to use (uses LabGraph messages).
sample_callback: The callback to use (uses Labgraph messages).
"""

def __init__(
self,
stream_interface: StreamInterface,
sample_callback: LabGraphCallback,
sample_callback: LabgraphCallback,
mode: Mode = Mode.SYNC,
stream_id: Optional[str] = None,
) -> None:
Expand All @@ -66,9 +67,9 @@ def __init__(
)
self.stream_id = stream_id

def _to_cthulhu_callback(self, callback: LabGraphCallback) -> CthulhuCallback:
def _to_cthulhu_callback(self, callback: LabgraphCallback) -> CthulhuCallback:
"""
Given a LabGraph callback, creates a Cthulhu callback (accepting
Given a Labgraph callback, creates a Cthulhu callback (accepting
`StreamSample`s).
"""

Expand All @@ -83,23 +84,23 @@ def wrapped_callback(sample: StreamSample) -> None:
message_types = [
arg_type
for arg_type in annotated_types.values()
if issubclass(arg_type, Message)
or issubclass(arg_type, LabGraphCallbackParams)
if is_generic_subclass(arg_type, LabgraphCallbackParams)
or issubclass(arg_type, Message)
]
assert len(message_types) == 1

message_type = message_types[0]
if issubclass(message_type, Message):
message = message_type(__sample__=sample)
callback(message)
elif issubclass(message_type, LabGraphCallbackParams):
if is_generic_subclass(message_type, LabgraphCallbackParams):
(arg_type,) = message_type.__args__
message = arg_type(__sample__=sample)
params = LabGraphCallbackParams(message, self.stream_id)
params = LabgraphCallbackParams(message, self.stream_id)
callback(params)
elif issubclass(message_type, Message):
message = message_type(__sample__=sample)
callback(message)
else:
raise TypeError(
f"Expected callback taking type '{Message.__name__}' or '{LabGraphCallbackParams.__name__}', got '{message_type.__name__}'"
f"Expected callback taking type '{Message.__name__}' or '{LabgraphCallbackParams.__name__}', got '{message_type.__name__}'"
)

return wrapped_callback
Expand All @@ -118,7 +119,7 @@ def __exit__(

class Producer(StreamProducer): # type: ignore
"""
Convenience wrapper of Cthulhu's `StreamProducer` that accepts a LabGraph message.
Convenience wrapper of Cthulhu's `StreamProducer` that accepts a Labgraph message.
Args:
stream_interface: The stream interface to use.
Expand All @@ -134,7 +135,7 @@ def __init__(

def produce_message(self, message: Message) -> None:
"""
Produces a LabGraph message to the Cthulhu stream.
Produces a Labgraph message to the Cthulhu stream.
Args:
message: The message to produce.
Expand All @@ -155,7 +156,7 @@ def __exit__(

def register_stream(name: str, message_type: Type[Message]) -> StreamInterface:
"""
Registers a stream with a LabGraph message type to the Cthulhu stream registry.
Registers a stream with a Labgraph message type to the Cthulhu stream registry.
Args:
name: The name of the stream.
Expand All @@ -168,7 +169,7 @@ def register_stream(name: str, message_type: Type[Message]) -> StreamInterface:
type_id = existing_stream.description.type
existing_type = typeRegistry().findTypeID(type_id)
if existing_type.typeName != message_type.versioned_name:
raise LabGraphError(
raise LabgraphError(
f"Tried to register stream '{name}' with type "
f"'{message_type.versioned_name}', but it already exists with type "
f"'{existing_type.typeName}'"
Expand Down
12 changes: 6 additions & 6 deletions labgraph/_cthulhu/tests/test_cthulhu.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ...messages.message import Message
from ...util.random import random_string
from ...util.testing import local_test
from ..cthulhu import Consumer, LabGraphCallbackParams, Producer, register_stream
from ..cthulhu import Consumer, LabgraphCallbackParams, Producer, register_stream


RANDOM_ID_LENGTH = 128
Expand All @@ -23,7 +23,7 @@ class MyMessage(Message):
@local_test
def test_producer_and_consumer() -> None:
"""
Tests that we can use the LabGraph wrappers around the Cthulhu APIs to publish
Tests that we can use the Labgraph wrappers around the Cthulhu APIs to publish
and subscribe to messages.
"""
stream_name = random_string(length=RANDOM_ID_LENGTH)
Expand All @@ -33,7 +33,7 @@ def test_producer_and_consumer() -> None:

with Producer(stream_interface=stream_interface) as producer:

def callback(params: LabGraphCallbackParams[MyMessage]) -> None:
def callback(params: LabgraphCallbackParams[MyMessage]) -> None:
received_messages.append(params.message)

with Consumer(stream_interface=stream_interface, sample_callback=callback):
Expand All @@ -51,7 +51,7 @@ def callback(params: LabGraphCallbackParams[MyMessage]) -> None:
@local_test
def test_complex_graph() -> None:
"""
Tests that we can use the LabGraph wrappers around the Cthulhu APIs to stream
Tests that we can use the Labgraph wrappers around the Cthulhu APIs to stream
messages in a more complex graph.
"""
stream_name1 = random_string(length=RANDOM_ID_LENGTH)
Expand All @@ -65,14 +65,14 @@ def test_complex_graph() -> None:

with Producer(stream_interface=stream2) as producer2:

def transform_callback(params: LabGraphCallbackParams[MyMessage]) -> None:
def transform_callback(params: LabgraphCallbackParams[MyMessage]) -> None:
producer2.produce_message(
MyMessage(int_field=params.message.int_field * 2)
)

with Consumer(stream_interface=stream1, sample_callback=transform_callback):

def sink_callback(params: LabGraphCallbackParams[MyMessage]) -> None:
def sink_callback(params: LabgraphCallbackParams[MyMessage]) -> None:
received_messages.append(params.message)

with Consumer(stream_interface=stream2, sample_callback=sink_callback):
Expand Down
2 changes: 1 addition & 1 deletion labgraph/cpp/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace py = pybind11;
namespace labgraph {

void bindings(py::module_& m) {
m.doc() = "LabGraph C++: C++ nodes for LabGraph";
m.doc() = "Labgraph C++: C++ nodes for Labgraph";

py::class_<Node>(m, "Node")
.def("setup", &Node::setup)
Expand Down
18 changes: 9 additions & 9 deletions labgraph/cpp/include/labgraph/Node.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace labgraph {
/**
* struct NodeTopic
*
* Describes a mapping between a LabGraph topic and a Cthulhu stream.
* Describes a mapping between a Labgraph topic and a Cthulhu stream.
*/
struct NodeTopic {
std::string topicName;
Expand All @@ -20,11 +20,11 @@ struct NodeTopic {
/**
* struct NodeBootstrapInfo
*
* Contains all information needed to bootstrap a LabGraph C++ node into a state ready
* for execution in an existing LabGraph graph.
* Contains all information needed to bootstrap a Labgraph C++ node into a state ready
* for execution in an existing Labgraph graph.
*/
struct NodeBootstrapInfo {
std::vector<NodeTopic> topics; // Mapping of LabGraph topics to Cthulhu streams
std::vector<NodeTopic> topics; // Mapping of Labgraph topics to Cthulhu streams
};

typedef std::function<void()> Publisher;
Expand Down Expand Up @@ -67,26 +67,26 @@ struct TransformerInfo {
/**
* class Node
*
* Describes a C++ node in a LabGraph graph.
* Describes a C++ node in a Labgraph graph.
*/
class Node {
public:
Node();
virtual ~Node();

/*** Setup function that is run when the LabGraph graph is starting up. */
/*** Setup function that is run when the Labgraph graph is starting up. */
virtual void setup();

/**
* Entry point that is run in the LabGraph graph to start all the node's publishers.
* Entry point that is run in the Labgraph graph to start all the node's publishers.
*/
void run();

/*** Cleanup function that is run when the LabGraph graph is shutting down. */
/*** Cleanup function that is run when the Labgraph graph is shutting down. */
virtual void cleanup();

/**
* Bootstrapping function that is run by the LabGraph graph to connect this node's
* Bootstrapping function that is run by the Labgraph graph to connect this node's
* topics with their corresponding Cthulhu streams.
*/
void bootstrap(NodeBootstrapInfo& bootstrapInfo);
Expand Down
2 changes: 1 addition & 1 deletion labgraph/cpp/tests/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace py = pybind11;

PYBIND11_MODULE(MyCPPNodes, m) {
m.doc() = "LabGraph C++: MyCPPNodes unit test";
m.doc() = "Labgraph C++: MyCPPNodes unit test";

std::vector<std::string> sourceTopics = {"A"};
labgraph::bindNode<MyCPPSource>(m, "MyCPPSource", sourceTopics)
Expand Down
10 changes: 5 additions & 5 deletions labgraph/events/event_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from ..graphs.topic import Topic
from ..messages.message import Message, TimestampedMessage
from ..util.error import LabGraphError
from ..util.error import LabgraphError
from ..util.min_heap import MinHeap


Expand Down Expand Up @@ -61,7 +61,7 @@ class Event:

def __post_init__(self) -> None:
if self.duration < 0.0:
raise LabGraphError("event cannot have a negative duration.")
raise LabgraphError("event cannot have a negative duration.")

def __hash__(self) -> int: # Needed for usage as dictionary key
return hash(id(self))
Expand Down Expand Up @@ -126,20 +126,20 @@ def _add_start_event(self, event: Event) -> None:
Adds `event` to the heap as the first event of the graph.
"""
if event.delay != 0.0:
raise LabGraphError("start_event cannot have a non-zero delay.")
raise LabgraphError("start_event cannot have a non-zero delay.")
self._push_heap_entry(event, 0.0)

def _get_accumulated_time(
self, event: Event, previous_event: Event, add_duration: bool
) -> float:
accumulated_time = self._accumulated_times.get(previous_event)
if accumulated_time is None:
raise LabGraphError("previous_event has not been inserted yet.")
raise LabgraphError("previous_event has not been inserted yet.")
if add_duration:
accumulated_time += previous_event.duration
accumulated_time += event.delay
if accumulated_time < 0.0:
raise LabGraphError("event occurs before start time.")
raise LabgraphError("event occurs before start time.")
return accumulated_time

def _push_heap_entry(self, event: Event, accumulated_time: float) -> None:
Expand Down
2 changes: 1 addition & 1 deletion labgraph/events/event_generator_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright 2004-present Facebook. All Rights Reserved.

from abc import abstractmethod
from time import time # TODO: Replace with LabGraph clock
from time import time # TODO: Replace with Labgraph clock
from typing import Any, Dict, List, Tuple

from ..graphs.method import AsyncPublisher, get_method_metadata
Expand Down
Loading

0 comments on commit c5f63ba

Please sign in to comment.