From c7e5b721288b85e1dfbb729ea9e2c7c9be963527 Mon Sep 17 00:00:00 2001 From: khuzema786 Date: Sun, 10 Sep 2023 16:38:59 +0530 Subject: [PATCH] Perf : Decreased API Latency for Update Location and Refactoring --- Cargo.lock | 1 + LocationTrackingMetrics.json | 570 ++++++++++++++++++ crates/location_tracking_service/Cargo.toml | 1 + .../src/common/types.rs | 129 ---- .../src/domain/action/internal/driver.rs | 2 +- .../src/domain/action/internal/location.rs | 1 + .../src/domain/action/internal/ride.rs | 1 + .../src/domain/action/ui/location.rs | 295 +++++---- .../src/domain/api/internal/driver.rs | 2 +- .../src/domain/api/internal/location.rs | 2 +- .../src/domain/api/internal/ride.rs | 1 + .../src/domain/api/ui/healthcheck.rs | 3 +- .../src/domain/api/ui/location.rs | 1 + .../src/environment.rs | 284 +++++++++ crates/location_tracking_service/src/lib.rs | 2 + crates/location_tracking_service/src/main.rs | 206 +------ .../src/middleware.rs | 88 +++ .../src/redis/commands.rs | 83 ++- dhall_config/location_tracking_service.dhall | 3 +- 19 files changed, 1183 insertions(+), 492 deletions(-) create mode 100644 LocationTrackingMetrics.json create mode 100644 crates/location_tracking_service/src/environment.rs create mode 100644 crates/location_tracking_service/src/middleware.rs diff --git a/Cargo.lock b/Cargo.lock index 9b79e19..4bddd81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1418,6 +1418,7 @@ dependencies = [ "geo", "geojson", "log", + "once_cell", "rand", "rdkafka", "reqwest", diff --git a/LocationTrackingMetrics.json b/LocationTrackingMetrics.json new file mode 100644 index 0000000..a974f00 --- /dev/null +++ b/LocationTrackingMetrics.json @@ -0,0 +1,570 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 10, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 7, + "options": { + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showThresholdLabels": false, + "showThresholdMarkers": true + }, + "pluginVersion": "10.1.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "editorMode": "code", + "expr": "queue_guage{job=\"location-tracking-service\"}", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Queued Driver Locations / 5m", + "transparent": true, + "type": "gauge" + }, + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 3, + "x": 6, + "y": 0 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "editorMode": "code", + "expr": "sum(increase(incoming_api_count{job=\"location-tracking-service\", status=~\"2..\"}[5m]))", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "2xx / 5m", + "transparent": true, + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 4, + "x": 9, + "y": 0 + }, + "id": 5, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "editorMode": "code", + "expr": "sum(increase(incoming_api_count{job=\"location-tracking-service\", status=~\"5..\"}[5m]))", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "5xx / 5m", + "transparent": true, + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 3, + "x": 13, + "y": 0 + }, + "id": 6, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "editorMode": "code", + "expr": "sum(increase(incoming_api_count{job=\"location-tracking-service\", status=~\"4..\"}[5m]))", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "4xx / 5m", + "transparent": true, + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 3, + "x": 16, + "y": 0 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "center", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "editorMode": "code", + "expr": "sum(increase(incoming_api_count{job=\"location-tracking-service\", status=~\"3..\"}[5m]))", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "3xx / 5m", + "transparent": true, + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 18, + "x": 6, + "y": 5 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum(rate(incoming_api_bucket{job=\"location-tracking-service\", le=\"0.5\"}[5m])) by (method, endpoint))", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "TP95 Response Latency", + "transparent": true, + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 10, + "w": 24, + "x": 0, + "y": 14 + }, + "id": 1, + "options": { + "legend": { + "calcs": [ + "min", + "mean", + "max" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "f895b74d-6b0d-4005-b120-2fbe346173d0" + }, + "editorMode": "code", + "expr": "sum(rate(incoming_api_count{job=\"location-tracking-service\"}[5m])) by (method, endpoint, status)", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Request Counts", + "transparent": true, + "type": "timeseries" + } + ], + "refresh": "", + "schemaVersion": 38, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Location Tracking Metrics", + "uid": "cabcb144-7134-49fe-a149-8d867bc039a7", + "version": 3, + "weekStart": "" +} \ No newline at end of file diff --git a/crates/location_tracking_service/Cargo.toml b/crates/location_tracking_service/Cargo.toml index c31f4a4..a801fca 100644 --- a/crates/location_tracking_service/Cargo.toml +++ b/crates/location_tracking_service/Cargo.toml @@ -11,6 +11,7 @@ path = "src/main.rs" [dependencies] actix-web = "4.3.1" +once_cell = "1.17.1" serde = { version = "1.0.167", features = ["derive"] } serde_json = "1.0.100" serde_dhall = "0.12.1" diff --git a/crates/location_tracking_service/src/common/types.rs b/crates/location_tracking_service/src/common/types.rs index cce3893..23e57dc 100644 --- a/crates/location_tracking_service/src/common/types.rs +++ b/crates/location_tracking_service/src/common/types.rs @@ -7,18 +7,8 @@ */ use chrono::{DateTime, Utc}; use geo::MultiPolygon; -use rdkafka::producer::FutureProducer; use serde::{Deserialize, Serialize}; -use shared::redis::types::RedisConnectionPool; -use shared::tools::error::AppError; - -use std::{ - collections::HashMap, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; use strum_macros::{Display, EnumIter, EnumString}; -use tokio::sync::Mutex; pub type RideId = String; pub type DriverId = String; @@ -146,122 +136,3 @@ pub struct DriverAllDetails { pub driver_last_known_location: Option, pub driver_mode: Option, } - -#[derive(Clone)] -pub struct AppState { - pub non_persistent_redis: Arc, - pub persistent_redis: Arc, - pub drainer_delay: u64, - #[allow(clippy::type_complexity)] - pub queue: Arc>>>, - pub polygon: Vec, - pub auth_url: String, - pub auth_api_key: String, - pub bulk_location_callback_url: String, - pub auth_token_expiry: u32, - pub redis_expiry: u32, - pub min_location_accuracy: i32, - pub last_location_timstamp_expiry: u32, - pub location_update_limit: usize, - pub location_update_interval: u64, - pub producer: Option, - pub driver_location_update_topic: String, - pub driver_location_update_key: String, - pub batch_size: i64, - pub bucket_size: u64, - pub nearby_bucket_threshold: u64, -} - -impl AppState { - pub async fn sliding_window_limiter( - &self, - key: &str, - frame_hits_lim: usize, - frame_len: u32, - persistent_redis_pool: &RedisConnectionPool, - ) -> Result, AppError> { - let curr_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_secs() as i64; - - let hits = persistent_redis_pool.get_key(key).await?; - - let hits = match hits { - Some(hits) => serde_json::from_str::>(&hits).map_err(|_| { - AppError::InternalError("Failed to parse hits from redis.".to_string()) - })?, - None => vec![], - }; - let (filt_hits, ret) = - Self::sliding_window_limiter_pure(curr_time, &hits, frame_hits_lim, frame_len); - - if !ret { - return Err(AppError::HitsLimitExceeded); - } - - let _ = persistent_redis_pool - .set_with_expiry( - key, - serde_json::to_string(&filt_hits).expect("Failed to parse filt_hits to string."), - frame_len, - ) - .await; - - Ok(filt_hits) - } - - fn sliding_window_limiter_pure( - curr_time: i64, - hits: &[i64], - frame_hits_lim: usize, - frame_len: u32, - ) -> (Vec, bool) { - let curr_frame = Self::get_time_frame(curr_time, frame_len); - let filt_hits = hits - .iter() - .filter(|&&hit| Self::hits_filter(curr_frame, hit)) - .cloned() - .collect::>(); - let prev_frame_hits_len = filt_hits - .iter() - .filter(|&&hit| Self::prev_frame_hits_filter(curr_frame, hit)) - .count(); - let prev_frame_weight = 1.0 - (curr_time as f64 % frame_len as f64) / frame_len as f64; - let curr_frame_hits_len: i32 = filt_hits - .iter() - .filter(|&&hit| Self::curr_frame_hits_filter(curr_frame, hit)) - .count() as i32; - - let res = (prev_frame_hits_len as f64 * prev_frame_weight) as i32 + curr_frame_hits_len - < frame_hits_lim as i32; - - ( - if res { - let mut new_hits = Vec::with_capacity(filt_hits.len() + 1); - new_hits.push(curr_frame); - new_hits.extend(filt_hits); - new_hits - } else { - filt_hits.clone() - }, - res, - ) - } - - fn get_time_frame(time: i64, frame_len: u32) -> i64 { - time / frame_len as i64 - } - - fn hits_filter(curr_frame: i64, time_frame: i64) -> bool { - time_frame == curr_frame - 1 || time_frame == curr_frame - } - - fn prev_frame_hits_filter(curr_frame: i64, time_frame: i64) -> bool { - time_frame == curr_frame - 1 - } - - fn curr_frame_hits_filter(curr_frame: i64, time_frame: i64) -> bool { - time_frame == curr_frame - } -} diff --git a/crates/location_tracking_service/src/domain/action/internal/driver.rs b/crates/location_tracking_service/src/domain/action/internal/driver.rs index 821e0ff..5317936 100644 --- a/crates/location_tracking_service/src/domain/action/internal/driver.rs +++ b/crates/location_tracking_service/src/domain/action/internal/driver.rs @@ -8,7 +8,7 @@ use actix_web::web::Data; use shared::tools::error::AppError; -use crate::{common::types::*, redis::commands::set_driver_mode_details}; +use crate::{common::types::*, environment::AppState, redis::commands::set_driver_mode_details}; pub async fn driver_details( data: Data, diff --git a/crates/location_tracking_service/src/domain/action/internal/location.rs b/crates/location_tracking_service/src/domain/action/internal/location.rs index 7951fc0..f94d2ca 100644 --- a/crates/location_tracking_service/src/domain/action/internal/location.rs +++ b/crates/location_tracking_service/src/domain/action/internal/location.rs @@ -15,6 +15,7 @@ use crate::{ utils::{get_city, get_current_bucket}, }, domain::types::internal::location::*, + environment::AppState, redis::commands::*, }; use shared::{tools::error::AppError, utils::logger::*}; diff --git a/crates/location_tracking_service/src/domain/action/internal/ride.rs b/crates/location_tracking_service/src/domain/action/internal/ride.rs index 262a946..d3464f0 100644 --- a/crates/location_tracking_service/src/domain/action/internal/ride.rs +++ b/crates/location_tracking_service/src/domain/action/internal/ride.rs @@ -5,6 +5,7 @@ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ +use crate::environment::AppState; use crate::redis::commands::*; use crate::{ diff --git a/crates/location_tracking_service/src/domain/action/ui/location.rs b/crates/location_tracking_service/src/domain/action/ui/location.rs index 1a6c5b0..2fb5e35 100644 --- a/crates/location_tracking_service/src/domain/action/ui/location.rs +++ b/crates/location_tracking_service/src/domain/action/ui/location.rs @@ -5,10 +5,11 @@ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -use crate::common::kafka::push_to_kafka; -use crate::common::{types::*, utils::get_city}; +use crate::common::{kafka::push_to_kafka, types::*, utils::get_city}; use crate::domain::types::ui::location::*; +use crate::environment::AppState; use crate::redis::{commands::*, keys::*}; +use actix::Arbiter; use actix_web::web::Data; use chrono::Utc; @@ -17,7 +18,6 @@ use serde::{Deserialize, Serialize}; use shared::tools::error::AppError; use shared::utils::callapi::*; use shared::utils::logger::*; -use shared::utils::prometheus; #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] @@ -102,9 +102,10 @@ pub async fn update_driver_location( data.polygon.clone(), )?; - let driver_id = get_driver_id(data.clone(), &token) - .await? - .or(get_driver_id_from_authentication(data.clone(), &token, &merchant_id).await?); + let driver_id = match get_driver_id(data.clone(), &token).await? { + Some(driver_id) => Some(driver_id), + None => get_driver_id_from_authentication(data.clone(), &token, &merchant_id).await?, + }; if let Some(driver_id) = driver_id { let _ = data @@ -114,12 +115,11 @@ pub async fn update_driver_location( data.location_update_interval as u32, &data.persistent_redis, ) - .await?; + .await; - with_lock_redis( + Arbiter::current().spawn(with_lock_redis( data.persistent_redis.clone(), - driver_processing_location_update_lock_key(&merchant_id.clone(), &city.clone()) - .as_str(), + driver_processing_location_update_lock_key(&merchant_id.clone(), &city.clone()), 60, process_driver_locations, ( @@ -131,8 +131,7 @@ pub async fn update_driver_location( city.clone(), driver_mode.clone(), ), - ) - .await?; + )); Ok(APISuccess::default()) } else { @@ -152,7 +151,7 @@ async fn process_driver_locations( CityName, Option, ), -) -> Result<(), AppError> { +) -> () { let (data, mut locations, driver_id, merchant_id, vehicle_type, city, driver_mode) = args; locations.sort_by(|a, b| (a.ts).cmp(&b.ts)); @@ -173,14 +172,14 @@ async fn process_driver_locations( lon: driver_location.pt.lon, }; - set_driver_last_location_update( + let _ = set_driver_last_location_update( data.clone(), &driver_id, &merchant_id, &driver_location, driver_mode.clone(), ) - .await?; + .await; let locations: Vec = locations .clone() @@ -194,76 +193,124 @@ async fn process_driver_locations( locations.len() ); - match get_driver_ride_details(data.clone(), &driver_id, &merchant_id, &city).await { - Ok(RideDetails { - ride_id, - ride_status: RideStatus::INPROGRESS, - }) => { - if locations.len() > 100 { - error!( - "Way points more than 100 points {} on_ride: True", - locations.len() - ); - } - let mut geo_entries = Vec::new(); - let mut queue = data.queue.lock().await; - - for loc in locations { - geo_entries.push(Point { - lat: loc.pt.lat, - lon: loc.pt.lon, - }); - - let dimensions = Dimensions { - merchant_id: merchant_id.clone(), - city: city.clone(), - vehicle_type: vehicle_type.clone(), - on_ride: true, - }; - - prometheus::QUEUE_GUAGE.inc(); - - queue.entry(dimensions).or_insert_with(Vec::new).push(( - loc.pt.lat, - loc.pt.lon, - driver_id.clone(), - )); - - let _ = kafka_stream_updates( - data.clone(), - &merchant_id, - &ride_id, - loc, - Some(RideStatus::INPROGRESS), - driver_mode.clone(), - ) + let driver_ride_details = + get_driver_ride_details(data.clone(), &driver_id, &merchant_id, &city).await; + + if let Ok(Some(RideDetails { + ride_id, + ride_status, + })) = driver_ride_details + { + if ride_status == RideStatus::INPROGRESS { + process_on_ride_driver_location( + data.clone(), + merchant_id, + city, + vehicle_type, + ride_id, + driver_id, + driver_mode, + RideStatus::INPROGRESS, + locations, + ) + .await; + } else { + process_on_ride_driver_location( + data.clone(), + merchant_id, + city, + vehicle_type, + ride_id, + driver_id, + driver_mode, + RideStatus::INPROGRESS, + locations, + ) + .await; + } + } else { + process_off_ride_driver_location( + data.clone(), + merchant_id, + city, + vehicle_type, + driver_id, + driver_mode, + None, + locations, + ) + .await; + } +} + +async fn process_on_ride_driver_location( + data: Data, + merchant_id: MerchantId, + city: CityName, + vehicle_type: VehicleType, + ride_id: RideId, + driver_id: DriverId, + driver_mode: Option, + ride_status: RideStatus, + locations: Vec, +) -> () { + if locations.len() > 100 { + error!( + "Way points more than 100 points {} on_ride: True", + locations.len() + ); + } + let mut geo_entries = Vec::new(); + + for loc in locations { + geo_entries.push(Point { + lat: loc.pt.lat, + lon: loc.pt.lon, + }); + + let dimensions = Dimensions { + merchant_id: merchant_id.clone(), + city: city.clone(), + vehicle_type: vehicle_type.clone(), + on_ride: true, + }; + + if data.include_on_ride_driver_for_nearby { + data.push_queue(dimensions, loc.clone().pt, driver_id.clone()) .await; - } + } + + let _ = kafka_stream_updates( + data.clone(), + &merchant_id, + &ride_id, + loc, + Some(ride_status.clone()), + driver_mode.clone(), + ) + .await; + } + + let _ = + push_on_ride_driver_location(data.clone(), &driver_id, &merchant_id, &city, &geo_entries) + .await; - push_on_ride_driver_location( + let on_ride_driver_location_count = + get_on_ride_driver_location_count(data.clone(), &driver_id, &merchant_id, &city).await; + + if let Ok(on_ride_driver_location_count) = on_ride_driver_location_count { + if on_ride_driver_location_count >= data.batch_size { + let on_ride_driver_locations = get_on_ride_driver_locations( data.clone(), &driver_id, &merchant_id, &city, - &geo_entries, + on_ride_driver_location_count, ) - .await?; - - let on_ride_driver_location_count = - get_on_ride_driver_location_count(data.clone(), &driver_id, &merchant_id, &city) - .await?; - - if on_ride_driver_location_count >= data.batch_size { - let on_ride_driver_locations = get_on_ride_driver_locations( - data.clone(), - &driver_id, - &merchant_id, - &city, - on_ride_driver_location_count, - ) - .await?; + .await; - let _: APISuccess = call_api( + if let Ok(on_ride_driver_locations) = on_ride_driver_locations { + let _ = call_api::( Method::POST, &data.bulk_location_callback_url, vec![("content-type", "application/json")], @@ -273,59 +320,57 @@ async fn process_driver_locations( loc: on_ride_driver_locations, }), ) - .await?; - } - } - _ => { - if locations.len() > 100 { - error!( - "Way points more than 100 points {} on_ride: False", - locations.len() - ); - } - - let mut queue = data.queue.lock().await; - - for loc in locations { - let dimensions = Dimensions { - merchant_id: merchant_id.clone(), - city: city.clone(), - vehicle_type: vehicle_type.clone(), - on_ride: false, - }; - - prometheus::QUEUE_GUAGE.inc(); - - queue.entry(dimensions).or_insert_with(Vec::new).push(( - loc.pt.lat, - loc.pt.lon, - driver_id.clone(), - )); - - let current_ride_status = get_driver_ride_status( - data.clone(), - &driver_id.clone(), - &merchant_id.clone(), - &city.clone(), - ) - .await?; - - let _ = kafka_stream_updates( - data.clone(), - &merchant_id, - &"".to_string(), - loc, - current_ride_status, - driver_mode.clone(), - ) .await; } - - drop(queue); } } +} - Ok(()) +async fn process_off_ride_driver_location( + data: Data, + merchant_id: MerchantId, + city: CityName, + vehicle_type: VehicleType, + driver_id: DriverId, + driver_mode: Option, + ride_status: Option, + locations: Vec, +) -> () { + if locations.len() > 100 { + error!( + "Way points more than 100 points {} on_ride: False", + locations.len() + ); + } + + for loc in locations { + let dimensions = Dimensions { + merchant_id: merchant_id.clone(), + city: city.clone(), + vehicle_type: vehicle_type.clone(), + on_ride: false, + }; + + data.push_queue( + dimensions, + Point { + lat: loc.pt.lat, + lon: loc.pt.lon, + }, + driver_id.clone(), + ) + .await; + + let _ = kafka_stream_updates( + data.clone(), + &merchant_id, + &"".to_string(), + loc, + ride_status.clone(), + driver_mode.clone(), + ) + .await; + } } pub async fn track_driver_location( diff --git a/crates/location_tracking_service/src/domain/api/internal/driver.rs b/crates/location_tracking_service/src/domain/api/internal/driver.rs index 81c10ce..20ca997 100644 --- a/crates/location_tracking_service/src/domain/api/internal/driver.rs +++ b/crates/location_tracking_service/src/domain/api/internal/driver.rs @@ -10,7 +10,7 @@ use actix_web::{ web::{Data, Json}, }; -use crate::{common::types::*, domain::action::internal::*}; +use crate::{common::types::*, domain::action::internal::*, environment::AppState}; use shared::tools::error::AppError; #[post("/internal/driver/driverDetails")] diff --git a/crates/location_tracking_service/src/domain/api/internal/location.rs b/crates/location_tracking_service/src/domain/api/internal/location.rs index 8456a09..a146b11 100644 --- a/crates/location_tracking_service/src/domain/api/internal/location.rs +++ b/crates/location_tracking_service/src/domain/api/internal/location.rs @@ -12,8 +12,8 @@ use actix_web::{ }; use crate::{ - common::types::*, domain::{action::internal::*, types::internal::location::*}, + environment::AppState, }; use shared::tools::error::AppError; diff --git a/crates/location_tracking_service/src/domain/api/internal/ride.rs b/crates/location_tracking_service/src/domain/api/internal/ride.rs index eefd48e..9b80689 100644 --- a/crates/location_tracking_service/src/domain/api/internal/ride.rs +++ b/crates/location_tracking_service/src/domain/api/internal/ride.rs @@ -13,6 +13,7 @@ use actix_web::{ use crate::{ common::types::*, domain::{action::internal::*, types::internal::ride::*}, + environment::AppState, }; use shared::tools::error::AppError; diff --git a/crates/location_tracking_service/src/domain/api/ui/healthcheck.rs b/crates/location_tracking_service/src/domain/api/ui/healthcheck.rs index 4009d82..e202a94 100644 --- a/crates/location_tracking_service/src/domain/api/ui/healthcheck.rs +++ b/crates/location_tracking_service/src/domain/api/ui/healthcheck.rs @@ -11,7 +11,8 @@ use actix_web::{ }; use crate::{ - common::types::*, domain::types::internal::ride::ResponseData, redis::keys::health_check_key, + domain::types::internal::ride::ResponseData, environment::AppState, + redis::keys::health_check_key, }; use shared::tools::error::AppError; diff --git a/crates/location_tracking_service/src/domain/api/ui/location.rs b/crates/location_tracking_service/src/domain/api/ui/location.rs index 6ca9438..02f542e 100644 --- a/crates/location_tracking_service/src/domain/api/ui/location.rs +++ b/crates/location_tracking_service/src/domain/api/ui/location.rs @@ -16,6 +16,7 @@ use actix_web::{ use crate::{ common::types::*, domain::{action::ui::location, types::ui::location::*}, + environment::AppState, }; use shared::tools::error::AppError; diff --git a/crates/location_tracking_service/src/environment.rs b/crates/location_tracking_service/src/environment.rs new file mode 100644 index 0000000..ab04f39 --- /dev/null +++ b/crates/location_tracking_service/src/environment.rs @@ -0,0 +1,284 @@ +use std::{ + collections::HashMap, + env::var, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use rdkafka::{error::KafkaError, producer::FutureProducer, ClientConfig}; +use serde::Deserialize; +use shared::{ + redis::types::{RedisConnectionPool, RedisSettings}, + tools::error::AppError, + utils::{logger::*, prometheus}, +}; +use tokio::sync::Mutex; + +use crate::common::{geo_polygon::read_geo_polygon, types::*}; + +#[derive(Debug, Deserialize, Clone)] +pub struct AppConfig { + pub port: u16, + pub non_persistent_redis_cfg: RedisConfig, + pub drainer_delay: u64, + pub include_on_ride_driver_for_nearby: bool, + pub persistent_redis_cfg: RedisConfig, + pub auth_url: String, + pub auth_api_key: String, + pub bulk_location_callback_url: String, + pub auth_token_expiry: u32, + pub redis_expiry: u32, + pub min_location_accuracy: i32, + pub last_location_timstamp_expiry: u32, + pub location_update_limit: usize, + pub location_update_interval: u64, + pub kafka_cfg: KafkaConfig, + pub driver_location_update_topic: String, + pub driver_location_update_key: String, + pub batch_size: i64, + pub bucket_size: u64, + pub nearby_bucket_threshold: u64, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct KafkaConfig { + pub kafka_key: String, + pub kafka_host: String, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct RedisConfig { + pub redis_host: String, + pub redis_port: u16, + pub redis_pool_size: usize, + pub redis_partition: usize, + pub reconnect_max_attempts: u32, + pub reconnect_delay: u32, + pub default_ttl: u32, + pub default_hash_ttl: u32, + pub stream_read_count: u64, +} + +#[derive(Clone)] +pub struct AppState { + pub non_persistent_redis: Arc, + pub persistent_redis: Arc, + pub queue: Arc>>>, + pub drainer_delay: u64, + pub include_on_ride_driver_for_nearby: bool, + pub polygon: Vec, + pub auth_url: String, + pub auth_api_key: String, + pub bulk_location_callback_url: String, + pub auth_token_expiry: u32, + pub redis_expiry: u32, + pub min_location_accuracy: i32, + pub last_location_timstamp_expiry: u32, + pub location_update_limit: usize, + pub location_update_interval: u64, + pub producer: Option, + pub driver_location_update_topic: String, + pub driver_location_update_key: String, + pub batch_size: i64, + pub bucket_size: u64, + pub nearby_bucket_threshold: u64, +} + +impl AppState { + pub async fn new(app_config: AppConfig) -> AppState { + let non_persistent_redis = Arc::new( + RedisConnectionPool::new(&RedisSettings::new( + app_config.non_persistent_redis_cfg.redis_host, + app_config.non_persistent_redis_cfg.redis_port, + app_config.non_persistent_redis_cfg.redis_pool_size, + app_config.non_persistent_redis_cfg.redis_partition, + app_config.non_persistent_redis_cfg.reconnect_max_attempts, + app_config.non_persistent_redis_cfg.reconnect_delay, + app_config.non_persistent_redis_cfg.default_ttl, + app_config.non_persistent_redis_cfg.default_hash_ttl, + app_config.non_persistent_redis_cfg.stream_read_count, + )) + .await + .expect("Failed to create Location Redis connection pool"), + ); + + let persistent_redis = Arc::new( + RedisConnectionPool::new(&RedisSettings::new( + app_config.persistent_redis_cfg.redis_host, + app_config.persistent_redis_cfg.redis_port, + app_config.persistent_redis_cfg.redis_pool_size, + app_config.persistent_redis_cfg.redis_partition, + app_config.persistent_redis_cfg.reconnect_max_attempts, + app_config.persistent_redis_cfg.reconnect_delay, + app_config.persistent_redis_cfg.default_ttl, + app_config.persistent_redis_cfg.default_hash_ttl, + app_config.persistent_redis_cfg.stream_read_count, + )) + .await + .expect("Failed to create Generic Redis connection pool"), + ); + + let queue = Arc::new(Mutex::new(HashMap::new())); + + let geo_config_path = var("GEO_CONFIG").unwrap_or_else(|_| "./geo_config".to_string()); + let polygons = read_geo_polygon(&geo_config_path).expect("Failed to read geoJSON"); + + let producer: Option; + + let result: Result = ClientConfig::new() + .set( + app_config.kafka_cfg.kafka_key, + app_config.kafka_cfg.kafka_host, + ) + .set("compression.type", "lz4") + .create(); + + match result { + Ok(val) => { + producer = Some(val); + } + Err(err) => { + producer = None; + info!( + tag = "[Kafka Connection]", + "Error connecting to kafka config: {err}" + ); + } + } + + AppState { + non_persistent_redis, + persistent_redis, + drainer_delay: app_config.drainer_delay, + queue, + include_on_ride_driver_for_nearby: app_config.include_on_ride_driver_for_nearby, + polygon: polygons, + auth_url: app_config.auth_url, + auth_api_key: app_config.auth_api_key, + bulk_location_callback_url: app_config.bulk_location_callback_url, + auth_token_expiry: app_config.auth_token_expiry, + min_location_accuracy: app_config.min_location_accuracy, + redis_expiry: app_config.redis_expiry, + last_location_timstamp_expiry: app_config.last_location_timstamp_expiry, + location_update_limit: app_config.location_update_limit, + location_update_interval: app_config.location_update_interval, + producer, + driver_location_update_topic: app_config.driver_location_update_topic, + driver_location_update_key: app_config.driver_location_update_key, + batch_size: app_config.batch_size, + bucket_size: app_config.bucket_size, + nearby_bucket_threshold: app_config.nearby_bucket_threshold, + } + } + + pub async fn push_queue(&self, dimensions: Dimensions, pt: Point, driver_id: DriverId) { + let mut queue = self.queue.lock().await; + prometheus::QUEUE_GUAGE.inc(); + queue + .entry(dimensions) + .or_insert_with(Vec::new) + .push((pt.lat, pt.lon, driver_id)); + } + + pub async fn get_and_clear_queue(&self) -> HashMap> { + let mut queue = self.queue.lock().await; + let values = queue.clone(); + for _ in 0..queue.len() { + prometheus::QUEUE_GUAGE.dec(); + } + queue.clear(); + values + } + + pub async fn sliding_window_limiter( + &self, + key: &str, + frame_hits_lim: usize, + frame_len: u32, + persistent_redis_pool: &RedisConnectionPool, + ) -> Result, AppError> { + let curr_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs() as i64; + + let hits = persistent_redis_pool.get_key(key).await?; + + let hits = match hits { + Some(hits) => serde_json::from_str::>(&hits).map_err(|_| { + AppError::InternalError("Failed to parse hits from redis.".to_string()) + })?, + None => vec![], + }; + let (filt_hits, ret) = + Self::sliding_window_limiter_pure(curr_time, &hits, frame_hits_lim, frame_len); + + if !ret { + return Err(AppError::HitsLimitExceeded); + } + + let _ = persistent_redis_pool + .set_with_expiry( + key, + serde_json::to_string(&filt_hits).expect("Failed to parse filt_hits to string."), + frame_len, + ) + .await; + + Ok(filt_hits) + } + + fn sliding_window_limiter_pure( + curr_time: i64, + hits: &[i64], + frame_hits_lim: usize, + frame_len: u32, + ) -> (Vec, bool) { + let curr_frame = Self::get_time_frame(curr_time, frame_len); + let filt_hits = hits + .iter() + .filter(|&&hit| Self::hits_filter(curr_frame, hit)) + .cloned() + .collect::>(); + let prev_frame_hits_len = filt_hits + .iter() + .filter(|&&hit| Self::prev_frame_hits_filter(curr_frame, hit)) + .count(); + let prev_frame_weight = 1.0 - (curr_time as f64 % frame_len as f64) / frame_len as f64; + let curr_frame_hits_len: i32 = filt_hits + .iter() + .filter(|&&hit| Self::curr_frame_hits_filter(curr_frame, hit)) + .count() as i32; + + let res = (prev_frame_hits_len as f64 * prev_frame_weight) as i32 + curr_frame_hits_len + < frame_hits_lim as i32; + + ( + if res { + let mut new_hits = Vec::with_capacity(filt_hits.len() + 1); + new_hits.push(curr_frame); + new_hits.extend(filt_hits); + new_hits + } else { + filt_hits.clone() + }, + res, + ) + } + + fn get_time_frame(time: i64, frame_len: u32) -> i64 { + time / frame_len as i64 + } + + fn hits_filter(curr_frame: i64, time_frame: i64) -> bool { + time_frame == curr_frame - 1 || time_frame == curr_frame + } + + fn prev_frame_hits_filter(curr_frame: i64, time_frame: i64) -> bool { + time_frame == curr_frame - 1 + } + + fn curr_frame_hits_filter(curr_frame: i64, time_frame: i64) -> bool { + time_frame == curr_frame + } +} diff --git a/crates/location_tracking_service/src/lib.rs b/crates/location_tracking_service/src/lib.rs index 62d9e86..4804649 100644 --- a/crates/location_tracking_service/src/lib.rs +++ b/crates/location_tracking_service/src/lib.rs @@ -7,4 +7,6 @@ */ pub mod common; pub mod domain; +pub mod environment; +pub mod middleware; pub mod redis; diff --git a/crates/location_tracking_service/src/main.rs b/crates/location_tracking_service/src/main.rs index b74e020..384f048 100644 --- a/crates/location_tracking_service/src/main.rs +++ b/crates/location_tracking_service/src/main.rs @@ -5,93 +5,17 @@ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -use actix_web::body::MessageBody; -use actix_web::dev::{Service, ServiceRequest, ServiceResponse}; -use actix_web::{web, App, Error, HttpServer}; -use futures::FutureExt; +use actix_web::{web, App, HttpServer}; use location_tracking_service::common::utils::get_current_bucket; -use rdkafka::error::KafkaError; -use shared::incoming_api; -use shared::redis::types::{RedisConnectionPool, RedisSettings}; +use location_tracking_service::domain::api; +use location_tracking_service::environment::{AppConfig, AppState}; +use location_tracking_service::middleware::*; +use location_tracking_service::redis::commands::*; use shared::tools::error::AppError; use shared::utils::{logger::*, prometheus::*}; use std::env::var; -use std::time::Instant; -use tokio::{spawn, sync::Mutex, time::Duration}; -use tracing::Span; -use uuid::Uuid; - -use std::{collections::HashMap, sync::Arc}; - -use location_tracking_service::common::{geo_polygon::read_geo_polygon, types::*}; -use location_tracking_service::domain::api; - -use location_tracking_service::redis::commands::*; - -use rdkafka::config::ClientConfig; -use rdkafka::producer::FutureProducer; -use serde::Deserialize; -use tracing_actix_web::{DefaultRootSpanBuilder, RootSpanBuilder, TracingLogger}; - -pub struct DomainRootSpanBuilder; - -impl RootSpanBuilder for DomainRootSpanBuilder { - fn on_request_start(request: &ServiceRequest) -> Span { - let request_id = request.headers().get("x-request-id"); - let request_id = match request_id { - Some(request_id) => request_id.to_str().map(|str| str.to_string()), - None => Ok(Uuid::new_v4().to_string()), - } - .unwrap_or(Uuid::new_v4().to_string()); - tracing_actix_web::root_span!(request, request_id) - } - - fn on_request_end(span: Span, outcome: &Result, Error>) { - DefaultRootSpanBuilder::on_request_end(span, outcome); - } -} - -#[derive(Debug, Deserialize, Clone)] -pub struct AppConfig { - pub port: u16, - pub non_persistent_redis_cfg: RedisConfig, - pub drainer_delay: u64, - pub persistent_redis_cfg: RedisConfig, - pub auth_url: String, - pub auth_api_key: String, - pub bulk_location_callback_url: String, - pub auth_token_expiry: u32, - pub redis_expiry: u32, - pub min_location_accuracy: i32, - pub last_location_timstamp_expiry: u32, - pub location_update_limit: usize, - pub location_update_interval: u64, - pub kafka_cfg: KafkaConfig, - pub driver_location_update_topic: String, - pub driver_location_update_key: String, - pub batch_size: i64, - pub bucket_size: u64, - pub nearby_bucket_threshold: u64, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct KafkaConfig { - pub kafka_key: String, - pub kafka_host: String, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct RedisConfig { - pub redis_host: String, - pub redis_port: u16, - pub redis_pool_size: usize, - pub redis_partition: usize, - pub reconnect_max_attempts: u32, - pub reconnect_delay: u32, - pub default_ttl: u32, - pub default_hash_ttl: u32, - pub stream_read_count: u64, -} +use tokio::{spawn, time::Duration}; +use tracing_actix_web::TracingLogger; pub fn read_dhall_config(config_path: &str) -> Result { let config = serde_dhall::from_file(config_path).parse::(); @@ -101,94 +25,10 @@ pub fn read_dhall_config(config_path: &str) -> Result { } } -pub async fn make_app_state(app_config: AppConfig) -> AppState { - let non_persistent_redis = Arc::new( - RedisConnectionPool::new(&RedisSettings::new( - app_config.non_persistent_redis_cfg.redis_host, - app_config.non_persistent_redis_cfg.redis_port, - app_config.non_persistent_redis_cfg.redis_pool_size, - app_config.non_persistent_redis_cfg.redis_partition, - app_config.non_persistent_redis_cfg.reconnect_max_attempts, - app_config.non_persistent_redis_cfg.reconnect_delay, - app_config.non_persistent_redis_cfg.default_ttl, - app_config.non_persistent_redis_cfg.default_hash_ttl, - app_config.non_persistent_redis_cfg.stream_read_count, - )) - .await - .expect("Failed to create Location Redis connection pool"), - ); - - let persistent_redis = Arc::new( - RedisConnectionPool::new(&RedisSettings::new( - app_config.persistent_redis_cfg.redis_host, - app_config.persistent_redis_cfg.redis_port, - app_config.persistent_redis_cfg.redis_pool_size, - app_config.persistent_redis_cfg.redis_partition, - app_config.persistent_redis_cfg.reconnect_max_attempts, - app_config.persistent_redis_cfg.reconnect_delay, - app_config.persistent_redis_cfg.default_ttl, - app_config.persistent_redis_cfg.default_hash_ttl, - app_config.persistent_redis_cfg.stream_read_count, - )) - .await - .expect("Failed to create Generic Redis connection pool"), - ); - - let queue = Arc::new(Mutex::new(HashMap::new())); - - let geo_config_path = var("GEO_CONFIG").unwrap_or_else(|_| "./geo_config".to_string()); - let polygons = read_geo_polygon(&geo_config_path).expect("Failed to read geoJSON"); - - let producer: Option; - - let result: Result = ClientConfig::new() - .set( - app_config.kafka_cfg.kafka_key, - app_config.kafka_cfg.kafka_host, - ) - .set("compression.type", "lz4") - .create(); - - match result { - Ok(val) => { - producer = Some(val); - } - Err(err) => { - producer = None; - info!( - tag = "[Kafka Connection]", - "Error connecting to kafka config: {err}" - ); - } - } - - AppState { - non_persistent_redis, - persistent_redis, - drainer_delay: app_config.drainer_delay, - queue, - polygon: polygons, - auth_url: app_config.auth_url, - auth_api_key: app_config.auth_api_key, - bulk_location_callback_url: app_config.bulk_location_callback_url, - auth_token_expiry: app_config.auth_token_expiry, - min_location_accuracy: app_config.min_location_accuracy, - redis_expiry: app_config.redis_expiry, - last_location_timstamp_expiry: app_config.last_location_timstamp_expiry, - location_update_limit: app_config.location_update_limit, - location_update_interval: app_config.location_update_interval, - producer, - driver_location_update_topic: app_config.driver_location_update_topic, - driver_location_update_key: app_config.driver_location_update_key, - batch_size: app_config.batch_size, - bucket_size: app_config.bucket_size, - nearby_bucket_threshold: app_config.nearby_bucket_threshold, - } -} - async fn run_drainer(data: web::Data) -> Result<(), AppError> { let bucket = get_current_bucket(data.bucket_size)?; - let mut queue = data.queue.lock().await; + + let queue = data.get_and_clear_queue().await; for (dimensions, geo_entries) in queue.iter() { let merchant_id = &dimensions.merchant_id; @@ -208,9 +48,6 @@ async fn run_drainer(data: web::Data) -> Result<(), AppError> { info!(tag = "[Queued Entries For Draining]", length = %geo_entries.len(), "Queue: {:?}\nPushing to redis server", geo_entries); } } - queue.clear(); - - drop(queue); Ok(()) } @@ -226,9 +63,11 @@ async fn start_server() -> std::io::Result<()> { std::process::exit(1); }); - let app_state = make_app_state(app_config.clone()).await; + let port = app_config.port; + + let app_state = AppState::new(app_config).await; - let data = web::Data::new(app_state.clone()); + let data = web::Data::new(app_state); let thread_data = data.clone(); spawn(async move { @@ -243,27 +82,12 @@ async fn start_server() -> std::io::Result<()> { HttpServer::new(move || { App::new() .app_data(data.clone()) - .wrap_fn(|req, srv| { - let start_time = Instant::now(); - info!(tag = "[INCOMING REQUEST]", request_method = %req.method(), request_path = %req.path()); - srv.call(req) - .map(move |res: Result| { - let response = res?; - info!(tag = "[INCOMING API]", request_method = %response.request().method(), request_path = %response.request().path(), response_status = %response.status(), latency = format!("{:?}ms", start_time.elapsed().as_millis())); - incoming_api!( - response.request().method().as_str(), - response.request().uri().to_string().as_str(), - response.status().as_str(), - start_time - ); - Ok(response) - }) - }) + .wrap(IncomingRequestMetrics) .wrap(TracingLogger::::new()) .wrap(prometheus_metrics()) .configure(api::handler) }) - .bind(("0.0.0.0", app_config.port))? + .bind(("0.0.0.0", port))? .run() .await } diff --git a/crates/location_tracking_service/src/middleware.rs b/crates/location_tracking_service/src/middleware.rs new file mode 100644 index 0000000..82457a6 --- /dev/null +++ b/crates/location_tracking_service/src/middleware.rs @@ -0,0 +1,88 @@ +use actix::fut::{ready, Ready}; +use actix_web::{ + body::MessageBody, + dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, + Error, +}; +use futures::future::LocalBoxFuture; +use shared::{ + incoming_api, + utils::{logger::*, prometheus::INCOMING_API}, +}; +use tokio::time::Instant; +use tracing::Span; +use tracing_actix_web::{DefaultRootSpanBuilder, RootSpanBuilder}; +use uuid::Uuid; + +pub struct DomainRootSpanBuilder; + +impl RootSpanBuilder for DomainRootSpanBuilder { + fn on_request_start(request: &ServiceRequest) -> Span { + let request_id = request.headers().get("x-request-id"); + let request_id = match request_id { + Some(request_id) => request_id.to_str().map(|str| str.to_string()), + None => Ok(Uuid::new_v4().to_string()), + } + .unwrap_or(Uuid::new_v4().to_string()); + tracing_actix_web::root_span!(request, request_id) + } + + fn on_request_end(span: Span, outcome: &Result, Error>) { + DefaultRootSpanBuilder::on_request_end(span, outcome); + } +} + +pub struct IncomingRequestMetrics; + +impl Transform for IncomingRequestMetrics +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type InitError = (); + type Transform = IncomingRequestMetricsMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(IncomingRequestMetricsMiddleware { service })) + } +} + +pub struct IncomingRequestMetricsMiddleware { + service: S, +} + +impl Service for IncomingRequestMetricsMiddleware +where + S: Service, Error = Error>, + S::Future: 'static, + B: 'static, +{ + type Response = ServiceResponse; + type Error = Error; + type Future = LocalBoxFuture<'static, Result>; + + forward_ready!(service); + + fn call(&self, req: ServiceRequest) -> Self::Future { + let start_time = Instant::now(); + info!(tag = "[INCOMING REQUEST]", request_method = %req.method(), request_path = %req.path()); + + let fut = self.service.call(req); + + Box::pin(async move { + let response = fut.await?; + info!(tag = "[INCOMING API]", request_method = %response.request().method(), request_path = %response.request().path(), response_status = %response.status(), latency = format!("{:?}ms", start_time.elapsed().as_millis())); + incoming_api!( + response.request().method().as_str(), + response.request().uri().to_string().as_str(), + response.status().as_str(), + start_time + ); + Ok(response) + }) + } +} diff --git a/crates/location_tracking_service/src/redis/commands.rs b/crates/location_tracking_service/src/redis/commands.rs index cf849d6..04c4eef 100644 --- a/crates/location_tracking_service/src/redis/commands.rs +++ b/crates/location_tracking_service/src/redis/commands.rs @@ -6,12 +6,13 @@ the GNU Affero General Public License along with this program. If not, see . */ use crate::common::types::*; +use crate::environment::AppState; use crate::redis::keys::*; use actix_web::web::Data; use chrono::{DateTime, Utc}; use fred::types::{GeoPosition, GeoUnit, GeoValue, MultipleGeoValues, RedisValue, SortOrder}; use futures::Future; -use shared::utils::{logger::*, prometheus}; +use shared::utils::logger::*; use shared::{redis::types::RedisConnectionPool, tools::error::AppError}; use std::collections::HashSet; use std::sync::Arc; @@ -197,15 +198,12 @@ pub async fn push_drainer_driver_location( ) -> Result<(), AppError> { let geo_values: Vec = geo_entries .iter() - .map(|(lat, lon, driver_id)| { - prometheus::QUEUE_GUAGE.dec(); - GeoValue { - coordinates: GeoPosition { - latitude: *lat, - longitude: *lon, - }, - member: driver_id.into(), - } + .map(|(lat, lon, driver_id)| GeoValue { + coordinates: GeoPosition { + latitude: *lat, + longitude: *lon, + }, + member: driver_id.into(), }) .collect(); let multiple_geo_values: MultipleGeoValues = geo_values.into(); @@ -397,12 +395,15 @@ pub async fn get_all_drivers_ride_details( .into_iter() .map(|ride_details| { if let Some(ride_details) = ride_details { - Some( - serde_json::from_str::(&ride_details) - .map_err(|err| AppError::DeserializationError(err.to_string())) - .expect("Todo :: Handle") - .ride_status, - ) + let ride_details = serde_json::from_str::(&ride_details) + .map_err(|err| AppError::DeserializationError(err.to_string())); + match ride_details { + Ok(ride_details) => Some(ride_details.ride_status), + Err(err) => { + error!("RideDetails DeserializationError : {}", err); + None + } + } } else { None } @@ -417,25 +418,21 @@ pub async fn get_driver_ride_details( driver_id: &DriverId, merchant_id: &MerchantId, city: &CityName, -) -> Result { +) -> Result, AppError> { let ride_details: Option = data .persistent_redis .get_key(&on_ride_details_key(merchant_id, city, driver_id)) .await?; - let ride_details = match ride_details { - Some(ride_details) => ride_details, - None => { - return Err(AppError::InternalError( - "Driver ride details not found".to_string(), - )) - } - }; - - let ride_details = serde_json::from_str::(&ride_details) - .map_err(|err| AppError::InternalError(err.to_string()))?; + match ride_details { + Some(ride_details) => { + let ride_details = serde_json::from_str::(&ride_details) + .map_err(|err| AppError::InternalError(err.to_string()))?; - Ok(ride_details) + Ok(Some(ride_details)) + } + None => Ok(None), + } } pub async fn get_driver_details( @@ -551,26 +548,23 @@ pub async fn get_driver_id( pub async fn with_lock_redis( redis: Arc, - key: &str, + key: String, expiry: i64, callback: F, args: Args, -) -> Result<(), AppError> +) -> () where F: Fn(Args) -> Fut, Args: Send + 'static, - Fut: Future>, + Fut: Future, { - let lock = redis.setnx_with_expiry(key, true, expiry).await; + let lock = redis.setnx_with_expiry(&key, true, expiry).await; if lock.is_ok() { let resp = callback(args).await; - let _ = redis.delete_key(key).await; - resp? - } else { - return Err(AppError::HitsLimitExceeded); + let _ = redis.delete_key(&key).await; + resp } - Ok(()) } pub async fn get_all_driver_last_locations( @@ -592,11 +586,16 @@ pub async fn get_all_driver_last_locations( .map(|driver_all_details| { let driver_all_details: Option = if let Some(driver_all_details) = driver_all_details { - Some( + let driver_all_details = serde_json::from_str::(driver_all_details) - .map_err(|err| AppError::DeserializationError(err.to_string())) - .expect("Todo :: Handle"), - ) + .map_err(|err| AppError::DeserializationError(err.to_string())); + match driver_all_details { + Ok(driver_all_details) => Some(driver_all_details), + Err(err) => { + error!("DriverAllDetails DeserializationError : {}", err); + None + } + } } else { None }; diff --git a/dhall_config/location_tracking_service.dhall b/dhall_config/location_tracking_service.dhall index 9588763..cd6dd0a 100644 --- a/dhall_config/location_tracking_service.dhall +++ b/dhall_config/location_tracking_service.dhall @@ -30,6 +30,7 @@ let kafkaCfg = { in { non_persistent_redis_cfg, persistent_redis_cfg, + include_on_ride_driver_for_nearby = False, drainer_delay = 10, kafka_cfg = kafkaCfg, port = 8081, @@ -40,7 +41,7 @@ in { min_location_accuracy = 50, redis_expiry = 86400, last_location_timstamp_expiry = 86400, - location_update_limit = 5, + location_update_limit = 6000000000, location_update_interval = 60, driver_location_update_topic = "location-updates", driver_location_update_key = "loc",