Skip to content

New Workflow API

Tom Diethe edited this page Jul 26, 2017 · 2 revisions

The SPHERE-HyperStream repository contains an example written with both the old-style and the new-style workflow APIs; both versions are reproduced below for comparison.

## Old-style API

From the SPHERE-HyperStream repository (the original link was not preserved in this copy).

# Identifier under which the workflow below is registered; the same ID is
# used by the new-style version of this example.
workflow_id = "lda_localisation_model_predict"

# Short aliases for the channels on which the workflow's nodes are created.
S = hyperstream.channel_manager.sphere
D = hyperstream.channel_manager.mongo
M = hyperstream.channel_manager.memory
A = hyperstream.channel_manager.assets

# Build the "Live Predictions" workflow with the old-style API: all nodes are
# created up front, then each factor is added explicitly with
# w.create_factor() / w.create_multi_output_factor(), passing a tool instance
# obtained from hyperstream.channel_manager.get_tool().
# NOTE(review): `safe` is not defined in this snippet, and the trailing
# `return w` shows the code is excerpted from a function body; presumably
# `safe` is a parameter of that function — confirm against the source file.
with hyperstream.create_workflow(
        workflow_id=workflow_id,
        name="Live Predictions",
        owner="TD",
        description="Deploy the LDA localisation model for live predictions",
        online=True,
        safe=safe) as w:

    # Node table: (stream name, channel the stream lives on, plate ids the
    # node is defined over). "access_points_by_house" is created but is not
    # used as a source or sink by any factor below.
    nodes = (
        ("rss_raw",                                 S, ["H"]),
        ("location_prediction",                     D, ["H", "LocalisationModels"]),
        ("location_prediction_lda",                 M, ["H"]),
        ("every_2s",                                M, ["H.W"]),
        ("rss_per_uid",                             M, ["H.W"]),
        ("rss_per_uid_2s",                          M, ["H.W"]),
        ("location_prediction_models_broadcasted",  M, ["H.W"]),
        ("predicted_locations_broadcasted",         D, ["H.W"]),
        ("wearables_by_house",                      A, ["H"]),
        ("access_points_by_house",                  A, ["H"])
    )

    # Create all of the nodes
    N = dict((stream_name, w.create_node(stream_name, channel, plate_ids))
             for stream_name, channel, plate_ids in nodes)

    # Source factor: the "sphere" tool writes wearable RSS data into rss_raw
    # (it has no source stream and no splitting node).
    w.create_multi_output_factor(
        tool=hyperstream.channel_manager.get_tool(
            name="sphere",
            parameters=dict(modality="wearable", elements={"rss"})
        ),
        source=None,
        splitting_node=None,
        sink=N["rss_raw"])

    # Split rss_raw (plate H) on its "uid" element, using wearables_by_house
    # as the splitting node, into rss_per_uid (plate H.W).
    w.create_multi_output_factor(
        tool=hyperstream.channel_manager.get_tool(
            name="splitter_from_stream",
            parameters=dict(
                element="uid"
            )
        ),
        source=N["rss_raw"],
        splitting_node=N["wearables_by_house"],
        sink=N["rss_per_uid"])

    # Window-generating factor with no sources: lower=-2.0, upper=0.0,
    # increment=2.0 (presumably seconds, given the name "every_2s" — confirm
    # against the sliding_window tool).
    w.create_factor(
        tool=hyperstream.channel_manager.get_tool(
            name="sliding_window",
            parameters=dict(lower=-2.0, upper=0.0, increment=2.0)
        ),
        sources=None,
        sink=N["every_2s"])

    def component_wise_max(init_value=None, id_field='aid', value_field='wearable-rss'):
        """Build a reducer for the sliding_apply tool.

        The returned function takes an iterable of (time, value) pairs, where
        each value is indexable by id_field and value_field, and returns a dict
        mapping each value[id_field] to the largest value[value_field] seen for
        it. The result starts from a shallow copy of init_value ({} when None),
        so the caller's dict is never mutated.
        """
        if init_value is None:
            init_value = {}

        def func(data):
            # Fresh copy per call so successive windows do not share state.
            result = init_value.copy()
            # The timestamp element of each pair is not used.
            for (time, value) in data:
                if value[id_field] in result:
                    result[value[id_field]] = max(result[value[id_field]], value[value_field])
                else:
                    result[value[id_field]] = value[value_field]
            return result

        return func

    # Apply component_wise_max using every_2s and rss_per_uid as sources
    # (presumably reducing the RSS readings falling in each window — confirm
    # against the sliding_apply tool).
    w.create_factor(
        tool=hyperstream.channel_manager.get_tool(
            name="sliding_apply",
            parameters=dict(func=component_wise_max())
        ),
        sources=[N["every_2s"], N["rss_per_uid"]],
        sink=N["rss_per_uid_2s"])

    # Select from location_prediction (plates H x LocalisationModels) using
    # selector_meta_data="localisation_model" and index="lda", writing the
    # result into location_prediction_lda (plate H).
    w.create_factor(
        tool=hyperstream.channel_manager.get_tool(
            name="index_of",
            parameters=dict(selector_meta_data="localisation_model", index="lda")
        ),
        sources=[N['location_prediction']],
        sink=N["location_prediction_lda"])

    # Broadcast the LDA model stream (plate H) onto each wearable via the
    # wearables_by_house splitting node (plate H.W); func=x.last() presumably
    # keeps only the most recent model — confirm against the tool.
    w.create_multi_output_factor(
        tool=hyperstream.channel_manager.get_tool(
            name="stream_broadcaster_from_stream",
            parameters=dict(func=lambda x: x.last())
        ),
        source=N["location_prediction_lda"],
        splitting_node=N["wearables_by_house"],
        sink=N["location_prediction_models_broadcasted"])

    # Per-wearable prediction from the broadcast model and the windowed RSS.
    w.create_factor(
        tool=hyperstream.channel_manager.get_tool(
            name="localisation_model_predict",
            parameters=dict()
        ),
        sources=[N['location_prediction_models_broadcasted'], N["rss_per_uid_2s"]],
        sink=N["predicted_locations_broadcasted"])

    # Excerpted from a function body: the built workflow is returned to the caller.
    return w

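Note that in the old API each factor is created with `w.create_factor()` (or a related method such as `w.create_multi_output_factor()`), and each tool is loaded with `hyperstream.channel_manager.get_tool()`. In the new API, tools are called directly as factor constructors (`hs.factors.<tool>`, or `hs.plugins.sphere.factors.<tool>` for plugin tools). The result is assigned to a node indexed by plate values inside explicit loops over the plates. With the new API, the same workflow looks like this: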

## New-style API

From the SPHERE-HyperStream repository (the original link was not preserved in this copy).

# Identifier under which the workflow below is registered.
workflow_id = "lda_localisation_model_predict"

# SPHERE plugin namespace; supplies the plugin factors used below
# (sp.factors.sphere and sp.factors.localisation_model_predict).
sp = hs.plugins.sphere

# Channel aliases: S = sphere, D = mongo, M = memory, A = assets.
S, D, M, A = (
    hs.channel_manager.sphere,
    hs.channel_manager.mongo,
    hs.channel_manager.memory,
    hs.channel_manager.assets,
)

# Plates whose values are looped over when assigning factors to nodes.
houses, wearables, models = (
    hs.plate_manager.plates["H"],
    hs.plate_manager.plates["H.W"],
    hs.plate_manager.plates["LocalisationModels"],
)

with hs.create_workflow(
        workflow_id=workflow_id,
        name="Live Predictions",
        owner="TD",
        description="Deploy the LDA localisation model for live predictions",
        online=True,
        safe=safe) as w:

    nodes = (
        ("rss_raw",                                 S, ["H"]),
        ("location_prediction",                     D, ["H", "LocalisationModels"]),
        ("location_prediction_lda",                 M, ["H"]),
        ("every_2s",                                M, ["H.W"]),
        ("rss_per_uid",                             M, ["H.W"]),
        ("rss_per_uid_2s",                          M, ["H.W"]),
        ("location_prediction_models_broadcasted",  M, ["H.W"]),
        ("predicted_locations_broadcasted",         D, ["H.W"]),
        ("wearables_by_house",                      A, ["H"]),
        ("access_points_by_house",                  A, ["H"])
    )

    # Create all of the nodes
    N = dict((stream_name, w.create_node(stream_name, channel, plate_ids))
             for stream_name, channel, plate_ids in nodes)

    def component_wise_max(init_value=None, id_field='aid', value_field='wearable-rss'):
        if init_value is None:
            init_value = {}

        def func(data):
            result = init_value.copy()
            for (time, value) in data:
                if value[id_field] in result:
                    result[value[id_field]] = max(result[value[id_field]], value[value_field])
                else:
                    result[value[id_field]] = value[value_field]
            return result

        return func

    for house in houses:
        N["rss_raw"][house] = sp.factors.sphere(source=None, modality="wearable", elements={"rss"})

        for wearable in wearables[house]:
            N["rss_per_uid"][house][wearable] = hs.factors.splitter_from_stream(
                source=N["rss_raw"], splitting_node=N["wearables_by_house"], element="uid")

            N["every_2s"][house][wearable] = hs.factors.sliding_window(
                sources=None, lower=-2.0, upper=0.0, increment=2.0)

            N["rss_per_uid_2s"][house][wearable] = hs.factors.sliding_apply(
                sources=[N["every_2s"], N["rss_per_uid"]], func=component_wise_max())

        N["location_prediction_lda"][house] = hs.factors.index_of(
            sources=[N["location_prediction"]], selector_meta_data="localisation_model", index="lda")

        for model in models:
            N["location_prediction_models_broadcasted"][house, model] = hs.factors.stream_broadcaster_from_stream(
                source=N["location_prediction_lda"], func=lambda x: x.last())

        for wearable in wearables:
            N["predicted_locations_broadcasted"][house][wearable] = sp.factors.localisation_model_predict(
                sources=[N['location_prediction_models_broadcasted'], N["rss_per_uid_2s"]])

    return w
Clone this wiki locally