Skip to content

Commit

Permalink
finish stop times with new system
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed May 13, 2024
1 parent 9f56d0d commit eafb43b
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 69 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amtrak-gtfs-rt"
version = "0.2.1"
version = "0.2.2"
license = "AGPL-3.0"
description = "Converts Amtrak Track-A-Train to valid GTFS-rt vehicle and trip information"
edition = "2021"
Expand All @@ -19,6 +19,7 @@ amtrak-api = "0.1.0"
base64 = "0.21"
chrono = "0.4"
chrono-tz = "0.8"
futures = "0.3.30"
geojson = "0.24"
gtfs-rt = "0.5"
gtfs-structures = "0.41.2"
Expand All @@ -30,4 +31,4 @@ serde = "1.0"
serde_json = "1.0"

[dev-dependencies]
tokio = { version = "1.35", features = ["full"] }
tokio = { version = "1.35", features = ["full"] }
172 changes: 105 additions & 67 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@
//! .unwrap();
//!
//! let client = reqwest::Client::new();
//! loop {
//! let amtrak_gtfs_rt = amtrak_gtfs_rt::fetch_amtrak_gtfs_rt(&gtfs, &client).await.unwrap();
//! let amtrak_gtfs_rt = amtrak_gtfs_rt::fetch_amtrak_gtfs_rt(&gtfs, &client).await.unwrap();
//!
//! //extract the binary data
//! let vehicle_data = amtrak_gtfs_rt.vehicle_positions.encode_to_vec();
//! let trip_data = amtrak_gtfs_rt.trip_updates.encode_to_vec();
//!
//! std::thread::sleep(std::time::Duration::from_millis(500));
//! }
//! //extract the binary data
//! let vehicle_data = amtrak_gtfs_rt.vehicle_positions.encode_to_vec();
//! let trip_data = amtrak_gtfs_rt.trip_updates.encode_to_vec();
//!}
//!```
//!
Expand All @@ -39,11 +35,13 @@
//! Thus, we've included a function `filter_capital_corridor()` which takes in any `gtfs_rt::FeedMessage` and removes CC vehicles and trips.


use chrono::{Datelike, NaiveDateTime, TimeZone, Weekday};
use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, TimeZone, Weekday};
use geojson::FeatureCollection;
use gtfs_structures::Gtfs;
use std::collections::HashMap;
use std::time::SystemTime;
pub mod stop_times;
use crate::stop_times::RootTripData;

//Written by Kyler Chin - Catenary Transit Initiatives.
pub fn filter_capital_corridor(input: gtfs_rt::FeedMessage) -> gtfs_rt::FeedMessage {
Expand Down Expand Up @@ -125,32 +123,6 @@ pub struct AmtrakArrivalJson {
estdepcmnt: Option<String>,
}

fn feature_to_amtrak_arrival_structs(feature: &geojson::Feature) -> Vec<AmtrakArrivalJson> {
let mut amtrak_arrival_jsons = vec![];

for i in 0i32..100i32 {
let mut key = String::from("Station");
key.push_str(&i.to_string());

match feature.properties.as_ref().unwrap().get(key.as_str()) {
Some(station_text) => match station_text {
serde_json::value::Value::String(station_text) => {
let amtrak_arrival: Result<AmtrakArrivalJson, serde_json::Error> =
serde_json::from_str(&station_text);

if amtrak_arrival.is_ok() {
amtrak_arrival_jsons.push(amtrak_arrival.unwrap());
}
}
_ => {}
},
_ => {}
};
}

amtrak_arrival_jsons
}

fn get_speed(feature: &geojson::Feature) -> Option<f32> {
match feature.properties.as_ref().unwrap().get("Velocity") {
Some(speed_text) => match speed_text {
Expand Down Expand Up @@ -183,7 +155,7 @@ fn get_bearing(feature: &geojson::Feature) -> Option<f32> {
}
}

pub fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature) -> gtfs_rt::FeedEntity {
fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature, stop_times: &HashMap<(String, NaiveDate), RootTripData>) -> gtfs_rt::FeedEntity {
let geometry = feature.geometry.as_ref().unwrap();
let point: Option<geojson::PointType> = match geometry.value.clone() {
geojson::Value::Point(x) => Some(x),
Expand Down Expand Up @@ -214,34 +186,6 @@ pub fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature) -> gtfs_

let speed: Option<f32> = get_speed(feature);

let arrivals: Vec<gtfs_rt::trip_update::StopTimeUpdate> =
feature_to_amtrak_arrival_structs(feature)
.iter()
.map(|feature| gtfs_rt::trip_update::StopTimeUpdate {
stop_sequence: None,
stop_id: Some(feature.code.clone()),
arrival: match &feature.estarr {
Some(estarr) => Some(gtfs_rt::trip_update::StopTimeEvent {
delay: None,
time: Some(time_and_tz_to_unix(&estarr, feature.tz)),
uncertainty: None,
}),
None => None,
},
departure: match &feature.estdep {
Some(estdep) => Some(gtfs_rt::trip_update::StopTimeEvent {
delay: None,
time: Some(time_and_tz_to_unix(&estdep, feature.tz)),
uncertainty: None,
}),
None => None,
},
departure_occupancy_status: None,
schedule_relationship: None,
stop_time_properties: None,
})
.collect::<Vec<gtfs_rt::trip_update::StopTimeUpdate>>();

//unix time seconds
let timestamp: Option<u64> = match feature.properties.as_ref().unwrap().get("updated_at") {
Some(timestamp_text) => match timestamp_text {
Expand Down Expand Up @@ -279,8 +223,58 @@ pub fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature) -> gtfs_
}
.unwrap();

let arrivals: Vec<gtfs_rt::trip_update::StopTimeUpdate> = match &trip_name {
Some(trip_name) => {
match stop_times.get(&(trip_name.clone(), NaiveDate::parse_from_str(origin_time_string.split(" ").nth(0).unwrap(), "%m/%d/%Y").unwrap())) {
Some(amtrak_rt_trip_data) => {
match amtrak_rt_trip_data.data.len() {
0 => vec![],
_ => {
let stops = &amtrak_rt_trip_data.data[0].stops;

stops
.iter()
.map(|amtrak_stop_time|
gtfs_rt::trip_update::StopTimeUpdate {
stop_sequence: None,
stop_id:Some(amtrak_stop_time.station.code.clone()),
arrival: amtrak_stop_time.arrival.as_ref().map(|arrival| {
gtfs_rt::trip_update::StopTimeEvent {
delay: None,
time: match &arrival.status_info.date_time {
Some(x) => Some(DateTime::parse_from_rfc3339(x.as_str()).unwrap().timestamp()),
None => None
},
uncertainty: None,
}
}),
departure: amtrak_stop_time.departure.as_ref().map(|departure| {
gtfs_rt::trip_update::StopTimeEvent {
delay: None,
time: match &departure.status_info.date_time {
Some(x) => Some(DateTime::parse_from_rfc3339(x.as_str()).unwrap().timestamp()),
None => None
},
uncertainty: None,
}
}),
departure_occupancy_status: None,
schedule_relationship: None,
stop_time_properties: None,
}
).collect::<Vec<_>>()
}
}
},
None => vec![]
}
}, None => vec![]
};

let origin_local_time = origin_departure(&origin_time_string, origin_tz);

let starting_yyyy_mm_dd_in_new_york = origin_local_time.with_timezone(&chrono_tz::America::New_York).format("%Y-%m-%d").to_string();

let origin_weekday = origin_local_time.weekday();

let trip_id: Option<String> = match trip_name {
Expand Down Expand Up @@ -358,7 +352,7 @@ pub fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature) -> gtfs_
route_id,
direction_id: None,
start_time: None,
start_date: None,
start_date: Some(starting_yyyy_mm_dd_in_new_york.clone()),
modified_trip: None,
schedule_relationship: None,
};
Expand Down Expand Up @@ -511,14 +505,54 @@ pub async fn fetch_amtrak_gtfs_rt_joined(
let features_collection: FeatureCollection =
FeatureCollection::try_from(geojson)?;

let list_of_train_ids = features_collection
.features
.iter()
.map(|feature| {
{ let train_num = match feature.properties.as_ref().unwrap().get("TrainNum") {
Some(a) => match a {
serde_json::value::Value::String(x) => Some(x.clone()),
_ => None,
},
_ => None,
};
let starting_date = match feature.properties.as_ref().unwrap().get("OrigSchDep") {
Some(a) => match a {
serde_json::value::Value::String(x) => {
let first_half = NaiveDate::parse_from_str(x.split(" ").nth(0).unwrap(), "%m/%d/%Y");

match first_half {
Ok(first_half) => {
Some(first_half)
},
Err(_) => None,
}
},
_ => None,
},
_ => None,
};

match (train_num, starting_date) {
(Some(train_num), Some(starting_date)) => Some((train_num, starting_date)),
_ => None
}
}
}).flatten().collect::<Vec<(String, NaiveDate)>>();

//query the stop times all simultaniously and put into hashmap
//query_all_trips_simultaniously

let stop_times = stop_times::query_all_trips_simultaniously(&list_of_train_ids).await;

//println!("Successfully decrypted");
//println!("{}", decrypted_string);
Ok(GtfsAmtrakResultsJoined {
unified_feed: gtfs_rt::FeedMessage {
entity: features_collection
.features
.iter()
.map(|feature: &geojson::Feature| feature_to_gtfs_unified(&gtfs, feature))
.map(|feature: &geojson::Feature| feature_to_gtfs_unified(&gtfs, feature, &stop_times))
.collect::<Vec<gtfs_rt::FeedEntity>>(),
header: make_gtfs_header(),
},
Expand Down Expand Up @@ -553,6 +587,10 @@ mod tests {

assert!(amtrak_results.is_ok());

println!("{:#?}", amtrak_results.unwrap());
for entity in amtrak_results.unwrap().unified_feed.entity {
println!("{:?}", entity.trip_update);
}

// println!("{:?}", amtrak_results.unwrap());
}
}
Loading

0 comments on commit eafb43b

Please sign in to comment.