Skip to content
This repository has been archived by the owner on Aug 15, 2018. It is now read-only.

Commit

Permalink
Update ingestion of data into distributed tables
Browse files Browse the repository at this point in the history
  • Loading branch information
hackermd committed Jul 25, 2017
1 parent 579a6b1 commit c05a3c9
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
9 changes: 5 additions & 4 deletions tmserver/api/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ def delete_feature(experiment_id, feature_id):
"""
logger.info('delete feature %d of experiment %d', feature_id, experiment_id)
with tm.utils.ExperimentConnection(experiment_id) as connection:
tm.Feature.delete_cascade(connection, id=feature_id)
with tm.utils.ExperimentSession(experiment_id, False) as session:
session.query(tm.FeatureValue.values.delete(str(feature_id)))
session.query(tm.Feature).filter_by(id=feature_id).delete()
return jsonify(message='ok')


Expand Down Expand Up @@ -237,7 +238,7 @@ def add_feature_values(experiment_id, mapobject_type_id):
if len(segmentations) == 0:
raise ResourceNotFoundError(tm.MapobjectSegmentation)

with tm.utils.ExperimentConnection(experiment_id) as connection:
with tm.utils.ExperimentSession(experiment_id, False) as session:
feature_values = list()
for mapobject_id, label in segmentations:
try:
Expand All @@ -250,7 +251,7 @@ def add_feature_values(experiment_id, mapobject_type_id):
tm.MapobjectSegmentation, label=label
)
feature_values.append(values)
tm.FeatureValues.add_multiple(connection, feature_values)
session.bulk_ingest(feature_values)

return jsonify(message='ok')

Expand Down
20 changes: 13 additions & 7 deletions tmserver/api/mapobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,13 @@ def delete_mapobject_type(experiment_id, mapobject_type_id):
'delete mapobject type %d of experiment %d',
mapobject_type_id, experiment_id
)
with tm.utils.ExperimentConnection(experiment_id) as connection:
tm.MapobjectType.delete_cascade(connection, id=mapobject_type_id)
with tm.utils.ExperimentSession(experiment_id, False) as session:
session.query(tm.Mapobject).\
filter_by(mapobject_type_id=mapobject_type_id).\
delete()
session.query(tm.MapobjectType).\
filter_by(id=mapobject_type_id).\
delete()
return jsonify(message='ok')


Expand Down Expand Up @@ -540,7 +545,7 @@ def add_segmentations(experiment_id, mapobject_type_id):
all()
)

with tm.utils.ExperimentConnection(experiment_id) as connection:
with tm.utils.ExperimentSession(experiment_id, False) as session:
segmentations = list()
for label, polygon in image.extract_polygons(y_offset, x_offset):
mapobject = tm.Mapobject(site_id, mapobject_type_id)
Expand All @@ -551,14 +556,15 @@ def add_segmentations(experiment_id, mapobject_type_id):
# exist, however. This will lead to an error upon insertion.
mapobject.id = existing_segmentations_map[label]
else:
mapobject = tm.Mapobject.add(connection, mapobject)
seg = tm.MapobjectSegmentation(
session.add(mapobject)
session.flush()
s = tm.MapobjectSegmentation(
partition_key=site_id, mapobject_id=mapobject.id,
geom_polygon=polygon, geom_centroid=polygon.centroid,
segmentation_layer_id=segmentation_layer_id, label=label
)
segmentations.append(seg)
tm.MapobjectSegmentation.add_multiple(connection, segmentations)
segmentations.append(s)
session.bulk_ingest(segmentations)

return jsonify(message='ok')

Expand Down

0 comments on commit c05a3c9

Please sign in to comment.