From 0382abf8fb0895cdefaa264aeb26006557ee755d Mon Sep 17 00:00:00 2001 From: TeachMeTW Date: Tue, 5 Nov 2024 21:27:23 -0800 Subject: [PATCH] Time Filter --- .../dwell_segmentation_time_filter.py | 254 ++++++++++++------ 1 file changed, 165 insertions(+), 89 deletions(-) diff --git a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py index d75760cd9..4364cdeaf 100644 --- a/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py +++ b/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py @@ -13,6 +13,7 @@ import numpy as np import pandas as pd import datetime as pydt +import time # Our imports import emission.analysis.point_features as pf @@ -20,6 +21,9 @@ import emission.core.wrapper.location as ecwl import emission.analysis.intake.segmentation.restart_checking as eaisr +import emission.storage.decorations.stats_queries as esds +import emission.core.timer as ect +import emission.core.wrapper.pipelinestate as ecwp class DwellSegmentationTimeFilter(eaist.TripSegmentationMethod): def __init__(self, time_threshold, point_threshold, distance_threshold): @@ -61,14 +65,41 @@ def segment_into_trips(self, timeseries, time_query): data that they want from the sensor streams in order to determine the segmentation points. """ - filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) - # Sometimes, we can get bogus points because data.ts and - # metadata.write_ts are off by a lot. If we don't do this, we end up - # appearing to travel back in time - # https://github.com/e-mission/e-mission-server/issues/457 - filtered_points_df = filtered_points_pre_ts_diff_df[(filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000] - filtered_points_df.reset_index(inplace=True) - transition_df = timeseries.get_data_df("statemachine/transition", time_query) + with ect.Timer() as t_get_filtered_points_pre: + filtered_points_pre_ts_diff_df = timeseries.get_data_df("background/filtered_location", time_query) + user_id = filtered_points_pre_ts_diff_df["user_id"].iloc[0] + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/get_filtered_points_pre_ts_diff_df", + time.time(), + t_get_filtered_points_pre.elapsed + ) + + with ect.Timer() as t_filter_bogus_points: + # Sometimes, we can get bogus points because data.ts and + # metadata.write_ts are off by a lot. If we don't do this, we end up + # appearing to travel back in time + # https://github.com/e-mission/e-mission-server/issues/457 + filtered_points_df = filtered_points_pre_ts_diff_df[ + (filtered_points_pre_ts_diff_df.metadata_write_ts - filtered_points_pre_ts_diff_df.ts) < 1000 + ] + filtered_points_df.reset_index(inplace=True) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/filter_bogus_points", + time.time(), + t_filter_bogus_points.elapsed + ) + + with ect.Timer() as t_get_transition_df: + transition_df = timeseries.get_data_df("statemachine/transition", time_query) + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/get_transition_df", + time.time(), + t_get_transition_df.elapsed + ) + if len(transition_df) > 0: logging.debug("transition_df = %s" % transition_df[["fmt_time", "transition"]]) else: @@ -83,88 +114,133 @@ def segment_into_trips(self, timeseries, time_query): curr_trip_start_point = None just_ended = True prevPoint = None - for idx, row in filtered_points_df.iterrows(): - currPoint = ad.AttrDict(row) - currPoint.update({"idx": idx}) - logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30) - if curr_trip_start_point is None: - logging.debug("Appending currPoint because the current start point is None") - # segmentation_points.append(currPoint) - - if just_ended: - if self.continue_just_ended(idx, currPoint, filtered_points_df): - # We have "processed" the currPoint by deciding to glom it - self.last_ts_processed = currPoint.metadata_write_ts - continue - # else: - sel_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) - curr_trip_start_point = sel_point - just_ended = False - - last5MinsPoints_df = filtered_points_df[np.logical_and( - np.logical_and( - filtered_points_df.ts > currPoint.ts - self.time_threshold, - filtered_points_df.ts < currPoint.ts), - filtered_points_df.ts >= curr_trip_start_point.ts)] - # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. - # Using .iloc just ends up including points after this one. - # So we reset_index upstream and use it here. - # We are going to use the last 8 points for now. - # TODO: Change this back to last 10 points once we normalize phone and this - last10Points_df = filtered_points_df.iloc[max(idx-self.point_threshold, curr_trip_start_point.idx):idx+1] - distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) - timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts - last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) - logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) - last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) - logging.debug("last10PointsDistances = %s with length %d, shape %s" % (last10PointsDistances.to_numpy(), - len(last10PointsDistances), - last10PointsDistances.shape)) - - # Fix for https://github.com/e-mission/e-mission-server/issues/348 - last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) - - logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % - (len(last10PointsDistances), len(last5MinsDistances))) - logging.debug("last5MinsTimes.max() = %s, time_threshold = %s" % - (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) - - if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): - (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, - last10Points_df, last5MinsPoints_df) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts - if ended_before_this: - # in this case, we end a trip at the previous point, and the next trip starts at this - # point, not the next one + + with ect.Timer() as t_loop: + for idx, row in filtered_points_df.iterrows(): + currPoint = ad.AttrDict(row) + currPoint.update({"idx": idx}) + logging.debug("-" * 30 + str(currPoint.fmt_time) + "-" * 30) + if curr_trip_start_point is None: + logging.debug("Appending currPoint because the current start point is None") + # segmentation_points.append(currPoint) + + if just_ended: + if self.continue_just_ended(idx, currPoint, filtered_points_df): + # We have "processed" the currPoint by deciding to glom it + self.last_ts_processed = currPoint.metadata_write_ts + continue + # else: + sel_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % (sel_point, sel_point.idx)) + curr_trip_start_point = sel_point just_ended = False - prevPoint = currPoint - curr_trip_start_point = currPoint - logging.debug("Setting new trip start point %s with idx %s" % - (currPoint, currPoint.idx)) - else: - # We end a trip at the current point, and the next trip starts at the next point - just_ended = True - prevPoint = None - else: - prevPoint = currPoint - - logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % - (just_ended, len(transition_df))) - if not just_ended and len(transition_df) > 0: - stopped_moving_after_last = transition_df[(transition_df.ts > currPoint.ts) & (transition_df.transition == 2)] - logging.debug("looking after %s, found transitions %s" % - (currPoint.ts, stopped_moving_after_last)) - if len(stopped_moving_after_last) > 0: - (unused, last_trip_end_point) = self.get_last_trip_end_point(filtered_points_df, - last10Points_df, None) - segmentation_points.append((curr_trip_start_point, last_trip_end_point)) - logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) - # We have processed everything up to the trip end by marking it as a completed trip - self.last_ts_processed = currPoint.metadata_write_ts + + with ect.Timer() as t_calculations: + last5MinsPoints_df = filtered_points_df[np.logical_and( + np.logical_and( + filtered_points_df.ts > currPoint.ts - self.time_threshold, + filtered_points_df.ts < currPoint.ts + ), + filtered_points_df.ts >= curr_trip_start_point.ts + )] + # Using .loc here causes problems if we have filtered out some points and so the index is non-consecutive. + # Using .iloc just ends up including points after this one. + # So we reset_index upstream and use it here. + # We are going to use the last 8 points for now. + # TODO: Change this back to last 10 points once we normalize phone and this + last10Points_df = filtered_points_df.iloc[ + max(idx - self.point_threshold, curr_trip_start_point.idx):idx + 1 + ] + distanceToLast = lambda row: pf.calDistance(ad.AttrDict(row), currPoint) + timeToLast = lambda row: currPoint.ts - ad.AttrDict(row).ts + last5MinsDistances = last5MinsPoints_df.apply(distanceToLast, axis=1) + logging.debug("last5MinsDistances = %s with length %d" % (last5MinsDistances.to_numpy(), len(last5MinsDistances))) + last10PointsDistances = last10Points_df.apply(distanceToLast, axis=1) + logging.debug("last10PointsDistances = %s with length %d, shape %s" % ( + last10PointsDistances.to_numpy(), + len(last10PointsDistances), + last10PointsDistances.shape + )) + + # Fix for https://github.com/e-mission/e-mission-server/issues/348 + last5MinTimes = last5MinsPoints_df.apply(timeToLast, axis=1) + + logging.debug("len(last10PointsDistances) = %d, len(last5MinsDistances) = %d" % + (len(last10PointsDistances), len(last5MinsDistances))) + logging.debug("last5MinTimes.max() = %s, time_threshold = %s" % + (last5MinTimes.max() if len(last5MinTimes) > 0 else np.NaN, self.time_threshold)) + + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/calculations_per_iteration", + time.time(), + t_calculations.elapsed + ) + + with ect.Timer() as t_has_trip_ended: + if self.has_trip_ended(prevPoint, currPoint, timeseries, last10PointsDistances, last5MinsDistances, last5MinTimes): + (ended_before_this, last_trip_end_point) = self.get_last_trip_end_point( + filtered_points_df, + last10Points_df, + last5MinsPoints_df + ) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.info("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + if ended_before_this: + # in this case, we end a trip at the previous point, and the next trip starts at this + # point, not the next one + just_ended = False + prevPoint = currPoint + curr_trip_start_point = currPoint + logging.debug("Setting new trip start point %s with idx %s" % + (currPoint, currPoint.idx)) + else: + # We end a trip at the current point, and the next trip starts at the next point + just_ended = True + prevPoint = None + else: + prevPoint = currPoint + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/has_trip_ended", + time.time(), + t_has_trip_ended.elapsed + ) + + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/loop", + time.time(), + t_loop.elapsed + ) + + with ect.Timer() as t_post_loop: + logging.debug("Iterated over all points, just_ended = %s, len(transition_df) = %s" % + (just_ended, len(transition_df))) + if not just_ended and len(transition_df) > 0: + stopped_moving_after_last = transition_df[ + (transition_df.ts > currPoint.ts) & (transition_df.transition == 2) + ] + logging.debug("looking after %s, found transitions %s" % + (currPoint.ts, stopped_moving_after_last)) + if len(stopped_moving_after_last) > 0: + (unused, last_trip_end_point) = self.get_last_trip_end_point( + filtered_points_df, + last10Points_df, + None + ) + segmentation_points.append((curr_trip_start_point, last_trip_end_point)) + logging.debug("Found trip end at %s" % last_trip_end_point.fmt_time) + # We have processed everything up to the trip end by marking it as a completed trip + self.last_ts_processed = currPoint.metadata_write_ts + esds.store_pipeline_time( + user_id, + ecwp.PipelineStages.TRIP_SEGMENTATION.name + "/segment_into_trips/post_loop", + time.time(), + t_post_loop.elapsed + ) return segmentation_points