Skip to content

Commit

Permalink
Generalise aggregation & processing of samples
Browse files Browse the repository at this point in the history
  • Loading branch information
ricklupton committed Nov 3, 2017
1 parent 6985323 commit 339630e
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 67 deletions.
2 changes: 2 additions & 0 deletions sankeyview/graph_to_sankey.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def get_data(data, key):
get_value = lambda data, key: float(get_data(data, key))
elif sample == 'mean':
get_value = lambda data, key: get_data(data, key).mean()
elif callable(sample):
get_value = lambda data, key: sample(get_data(data, key))
else:
get_value = lambda data, key: get_data(data, key)[sample]

Expand Down
37 changes: 15 additions & 22 deletions sankeyview/results_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,21 @@ def group_flows(flows,
measure,
agg_measures=None):

if agg_measures is None:
agg_measures = {}
agg_all_measures = dict(agg_measures)
agg_all_measures[measure] = 'sum'
if callable(measure):
data = measure
elif isinstance(measure, str):
if agg_measures is None:
agg_measures = {}
agg_all_measures = dict(agg_measures)
agg_all_measures[measure] = 'sum'
def data(group):
agg = group.groupby(lambda x: '').agg(agg_all_measures)
return {
'value': agg[measure].iloc[0],
'measures': {k: agg[k].iloc[0] for k in agg_measures},
}
else:
raise ValueError('measure must be string or callable')

e = flows.copy()
set_partition_keys(e, partition1, 'k1', v + '^', process_side='source')
Expand All @@ -106,24 +117,6 @@ def group_flows(flows,
set_partition_keys(e, time_partition, 'k4', '')
grouped = e.groupby(['k1', 'k2', 'k3', 'k4'])

if 'sample' in flows:

def data(group):
agg = group.groupby('sample').agg(agg_all_measures)
d = {'value': agg[measure].values}
if agg_measures:
d['measures'] = {k: agg[k].values for k in agg_measures}
return d

else:

def data(group):
agg = group.groupby(lambda x: '').agg(agg_all_measures)
d = {'value': agg[measure].iloc[0]}
if agg_measures:
d['measures'] = {k: agg[k].iloc[0] for k in agg_measures}
return d

return [
(source, target, (material, time), data(group))
for (source, target, material, time), group in grouped
Expand Down
82 changes: 55 additions & 27 deletions test/test_results_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,21 @@ def test_results_graph_overall():
'title': 'n'}),
]
assert sorted(Gr.edges(keys=True, data=True)) == [
('a^*', 'via^m', ('m', '*'), {'value': 3,
('a^*', 'via^m', ('m', '*'), {'value': 3, 'measures': {},
'bundles': [0]}),
('a^*', 'via^n', ('n', '*'), {'value': 1,
('a^*', 'via^n', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [0]}),
('b^*', 'via^m', ('m', '*'), {'value': 3,
('b^*', 'via^m', ('m', '*'), {'value': 3, 'measures': {},
'bundles': [1]}),
('b^*', 'via^n', ('n', '*'), {'value': 1,
('b^*', 'via^n', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [1]}),
('via^m', 'c^c1', ('m', '*'), {'value': 4,
('via^m', 'c^c1', ('m', '*'), {'value': 4, 'measures': {},
'bundles': [0, 1]}),
('via^m', 'c^c2', ('m', '*'), {'value': 2,
('via^m', 'c^c2', ('m', '*'), {'value': 2, 'measures': {},
'bundles': [0, 1]}),
('via^n', 'c^c1', ('n', '*'), {'value': 1,
('via^n', 'c^c1', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [0, 1]}),
('via^n', 'c^c2', ('n', '*'), {'value': 1,
('via^n', 'c^c2', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [0, 1]}),
]

Expand Down Expand Up @@ -138,9 +138,9 @@ def test_results_graph_time_partition():
bundle_flows,
time_partition=time_partition)
assert sorted(Gr.edges(keys=True, data=True)) == [
('a^*', 'b^*', ('*', '1'), {'value': 6,
('a^*', 'b^*', ('*', '1'), {'value': 6, 'measures': {},
'bundles': [0]}),
('a^*', 'b^*', ('*', '2'), {'value': 4,
('a^*', 'b^*', ('*', '2'), {'value': 4, 'measures': {},
'bundles': [0]}),
]

Expand All @@ -149,13 +149,13 @@ def test_results_graph_time_partition():
Gr, groups = results_graph(view_graph, bundle_flows, material_partition,
time_partition)
assert sorted(Gr.edges(keys=True, data=True)) == [
('a^*', 'b^*', ('m', '1'), {'value': 3,
('a^*', 'b^*', ('m', '1'), {'value': 3, 'measures': {},
'bundles': [0]}),
('a^*', 'b^*', ('m', '2'), {'value': 1,
('a^*', 'b^*', ('m', '2'), {'value': 1, 'measures': {},
'bundles': [0]}),
('a^*', 'b^*', ('n', '1'), {'value': 3,
('a^*', 'b^*', ('n', '1'), {'value': 3, 'measures': {},
'bundles': [0]}),
('a^*', 'b^*', ('n', '2'), {'value': 3,
('a^*', 'b^*', ('n', '2'), {'value': 3, 'measures': {},
'bundles': [0]}),
]

Expand Down Expand Up @@ -183,17 +183,17 @@ def test_results_graph_material_key():
view_graph.edge['a']['c']['flow_partition'] = material_partition
Gr, groups = results_graph(view_graph, bundle_flows)
assert sorted(Gr.edges(keys=True, data=True)) == [
('a^*', 'c^*', ('m', '*'), {'value': 3,
('a^*', 'c^*', ('m', '*'), {'value': 3, 'measures': {},
'bundles': [0]}),
('a^*', 'c^*', ('n', '*'), {'value': 1,
('a^*', 'c^*', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [0]}),
]

# Partition based on shape
view_graph.edge['a']['c']['flow_partition'] = shape_partition
Gr, groups = results_graph(view_graph, bundle_flows)
assert sorted(Gr.edges(keys=True, data=True)) == [
('a^*', 'c^*', ('long', '*'), {'value': 4,
('a^*', 'c^*', ('long', '*'), {'value': 4, 'measures': {},
'bundles': [0]}),
]

Expand All @@ -214,7 +214,7 @@ def test_results_graph_measures():
# Results assuming measure = 'value'
Gr, groups = results_graph(view_graph, bundle_flows)
assert Gr.edges(keys=True, data=True) == [
('a^*', 'b^*', ('*', '*'), {'value': 11,
('a^*', 'b^*', ('*', '*'), {'value': 11, 'measures': {},
'bundles': [0]}),
]

Expand All @@ -223,7 +223,7 @@ def test_results_graph_measures():
bundle_flows,
measure='another_measure')
assert Gr.edges(keys=True, data=True) == [
('a^*', 'b^*', ('*', '*'), {'value': 3,
('a^*', 'b^*', ('*', '*'), {'value': 3, 'measures': {},
'bundles': [0]}),
]

Expand All @@ -239,9 +239,10 @@ def test_results_graph_measures():


def test_results_graph_samples():
view_graph = _twonode_viewgraph()
view_graph = _threenode_viewgraph()

# Mock flow data with multiple samples
# Mock flow data with multiple samples. NB missing data for sample 1 in
# bundle 1.
bundle_flows = {
0: pd.DataFrame.from_records(
[
Expand All @@ -251,12 +252,25 @@ def test_results_graph_samples():
('a', 'b2', 'm', 1, 1),
],
columns=('source', 'target', 'material', 'sample', 'value')),
1: pd.DataFrame.from_records(
[
('a', 'c1', 'm', 1, 3),
],
columns=('source', 'target', 'material', 'sample', 'value')),
}

# Aggregation function
index = pd.Index(np.arange(2))
def measure(group):
agg = group.groupby('sample').value.agg('sum')
d = {'value': agg.reindex(index).fillna(0).values}
return d

# Results
Gr, groups = results_graph(view_graph, bundle_flows)
assert len(Gr.edges()) == 1
Gr, groups = results_graph(view_graph, bundle_flows, measure=measure)
assert len(Gr.edges()) == 2
assert np.allclose(Gr['a^*']['b^*']['*', '*']['value'], [3, 4])
assert np.allclose(Gr['a^*']['c^*']['*', '*']['value'], [0, 3])


def test_results_graph_unused_nodes():
Expand Down Expand Up @@ -284,9 +298,9 @@ def test_results_graph_unused_nodes():

assert set(Gr.nodes()) == {'a^a1', 'a^a2', 'b^b1'}
assert sorted(Gr.edges(keys=True, data=True)) == [
('a^a1', 'b^b1', ('*', '*'), {'value': 3,
('a^a1', 'b^b1', ('*', '*'), {'value': 3, 'measures': {},
'bundles': [0]}),
('a^a2', 'b^b1', ('*', '*'), {'value': 1,
('a^a2', 'b^b1', ('*', '*'), {'value': 1, 'measures': {},
'bundles': [0]}),
]

Expand Down Expand Up @@ -321,9 +335,9 @@ def test_results_graph_with_extra_or_not_enough_groups():

assert set(Gr.nodes()) == {'a^a1', 'a^_', 'b^b1'}
assert sorted(Gr.edges(keys=True, data=True)) == [
('a^_', 'b^b1', ('*', '*'), {'value': 1,
('a^_', 'b^b1', ('*', '*'), {'value': 1, 'measures': {},
'bundles': [0]}),
('a^a1', 'b^b1', ('*', '*'), {'value': 3,
('a^a1', 'b^b1', ('*', '*'), {'value': 3, 'measures': {},
'bundles': [0]}),
]

Expand Down Expand Up @@ -376,3 +390,17 @@ def _twonode_viewgraph():
[['b']],
])
return view_graph


def _threenode_viewgraph():
view_graph = LayeredGraph()
view_graph.add_node('a', node=ProcessGroup())
view_graph.add_node('b', node=ProcessGroup())
view_graph.add_node('c', node=ProcessGroup())
view_graph.add_edge('a', 'b', {'bundles': [0]})
view_graph.add_edge('a', 'c', {'bundles': [1]})
view_graph.ordering = Ordering([
[['a']],
[['b', 'c']],
])
return view_graph
36 changes: 18 additions & 18 deletions test/test_sankey_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ def test_sankey_view_results():

assert set(GR.nodes()) == {'a^*', 'b^*', 'via^m', 'via^n', 'c^c1', 'c^c2'}
assert sorted(GR.edges(keys=True, data=True)) == [
('a^*', 'via^m', ('*', '*'), {'value': 3,
('a^*', 'via^m', ('*', '*'), {'value': 3, 'measures': {},
'bundles': [0]}),
('a^*', 'via^n', ('*', '*'), {'value': 1,
('a^*', 'via^n', ('*', '*'), {'value': 1, 'measures': {},
'bundles': [0]}),
('b^*', 'via^m', ('*', '*'), {'value': 3,
('b^*', 'via^m', ('*', '*'), {'value': 3, 'measures': {},
'bundles': [1]}),
('b^*', 'via^n', ('*', '*'), {'value': 1,
('b^*', 'via^n', ('*', '*'), {'value': 1, 'measures': {},
'bundles': [1]}),
('via^m', 'c^c1', ('*', '*'), {'value': 4,
('via^m', 'c^c1', ('*', '*'), {'value': 4, 'measures': {},
'bundles': [0, 1]}),
('via^m', 'c^c2', ('*', '*'), {'value': 2,
('via^m', 'c^c2', ('*', '*'), {'value': 2, 'measures': {},
'bundles': [0, 1]}),
('via^n', 'c^c1', ('*', '*'), {'value': 1,
('via^n', 'c^c1', ('*', '*'), {'value': 1, 'measures': {},
'bundles': [0, 1]}),
('via^n', 'c^c2', ('*', '*'), {'value': 1,
('via^n', 'c^c2', ('*', '*'), {'value': 1, 'measures': {},
'bundles': [0, 1]}),
]

Expand Down Expand Up @@ -90,21 +90,21 @@ def test_sankey_view_results():
flow_partition=Partition.Simple('material', ['m', 'n']))
GR, groups = sankey_view(vd2, dataset)
assert sorted(GR.edges(keys=True, data=True)) == [
('a^*', 'via^m', ('m', '*'), {'value': 3,
('a^*', 'via^m', ('m', '*'), {'value': 3, 'measures': {},
'bundles': [0]}),
('a^*', 'via^n', ('n', '*'), {'value': 1,
('a^*', 'via^n', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [0]}),
('b^*', 'via^m', ('m', '*'), {'value': 3,
('b^*', 'via^m', ('m', '*'), {'value': 3, 'measures': {},
'bundles': [1]}),
('b^*', 'via^n', ('n', '*'), {'value': 1,
('b^*', 'via^n', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [1]}),
('via^m', 'c^c1', ('m', '*'), {'value': 4,
('via^m', 'c^c1', ('m', '*'), {'value': 4, 'measures': {},
'bundles': [0, 1]}),
('via^m', 'c^c2', ('m', '*'), {'value': 2,
('via^m', 'c^c2', ('m', '*'), {'value': 2, 'measures': {},
'bundles': [0, 1]}),
('via^n', 'c^c1', ('n', '*'), {'value': 1,
('via^n', 'c^c1', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [0, 1]}),
('via^n', 'c^c2', ('n', '*'), {'value': 1,
('via^n', 'c^c2', ('n', '*'), {'value': 1, 'measures': {},
'bundles': [0, 1]}),
]

Expand Down Expand Up @@ -134,9 +134,9 @@ def test_sankey_view_results_time_partition():
GR, groups = sankey_view(vd, dataset)
assert set(GR.nodes()) == {'a^*', 'b^*'}
assert sorted(GR.edges(keys=True, data=True)) == [
('a^*', 'b^*', ('*', '1'), {'value': 3,
('a^*', 'b^*', ('*', '1'), {'value': 3, 'measures': {},
'bundles': [0]}),
('a^*', 'b^*', ('*', '2'), {'value': 2,
('a^*', 'b^*', ('*', '2'), {'value': 2, 'measures': {},
'bundles': [0]}),
]
assert GR.ordering == Ordering([[['a^*']], [['b^*']]])

0 comments on commit 339630e

Please sign in to comment.