feat: GTFS feed import and validation (#1013)
* feat: Create tables for GTFS-static archive import (#1006)

* feat: Create tables for GTFS-static archive import

* Clean up comments

* Fixes and adjustments to the gtfs_* migrations

* Add Ecto schemas for all GTFS-imported tables

* Add logic to read from GTFS archive saved at local path and import data into gtfs_* tables

* Make GTFS import function more communicative

* Add /import-GTFS endpoint

* tweak: Use `Repo.insert_all`, defer constraint checking

* Use Postgres COPY for larger tables; Undo all superfluous deferrable constraints; define and use Importable behaviour

* Remove now-unused `create_deferrable` migration helper

* Appease credo, except for one TODO that still needs DOing

* fix: increase length to 100MB for multipart uploads in Plug.Parsers (#1014)

* fix: increase length to 100MB for multipart uploads in Plug.Parsers

* ci: upgrade GitHub Action upload-artifact to fix deprecation error

* Tweak service/calendar related tables and schemas

* Remove TODO comment that is done

* Update test for service / calendar / calendar_dates tables

* Appease credo

* Keep GTFS timestamps as strings throughout; provide utility fns to convert timestamps to other representations

* Use Ecto.Enum for GTFS enum fields; convert corresponding CSV fields to int before casting

* Appease dialyzer

* Change how int-code and string enum values are defined in the DB

* Oops, forgot to commit auto-generated structure.sql in prev commit

* structure.sql was out of date somehow, even though migrations were already up??? Fixed now

* Update mix.lock

* Simplify some `execute/2` calls in GTFS migrations

* Install Oban

* Add GTFS import Oban workers; Add logic for job-based GTFS import (see the worker sketch below)

* Update/add API endpoints for async GTFS import

* Remove `Expires` header from S3 put_object requests; it doesn't do what I thought it did

* Add more comprehensive logging for GTFS import process

* Do not defer optionally-deferrable constraints during GTFS import/validation

* Set queue_target and queue_interval config for DB connection

* Fix duplicate import/validation job handling

* Add `/api/gtfs/check_jobs` endpoint

* fix: Use correct module for checking validation jobs

* Fix route

---------

Co-authored-by: meagharty <149533950+meagharty@users.noreply.github.com>
jzimbel-mbta and meagharty authored Oct 18, 2024
1 parent 02a3885 commit e502c02
Showing 49 changed files with 3,282 additions and 20 deletions.
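
The Oban worker modules added by this commit are among the 49 changed files but are not shown in the excerpt below. As a rough sketch only (the module name, helper functions, and options are assumptions, not the committed code), a worker on the gtfs_import queue that reuses the "s3_uri" and "archive_version" job args logged by Arrow.Gtfs.import/5 might look like this:

defmodule Arrow.Gtfs.ImportWorker do
  # Hypothetical sketch -- the real worker module(s) in this commit may differ.
  # queue: :gtfs_import matches the queue configured in config/config.exs below;
  # its concurrency of 1 means only one archive is processed at a time.
  use Oban.Worker, queue: :gtfs_import, max_attempts: 1

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => version}} = job) do
    # Assumed helpers: download the archive from S3 and open it with Unzip,
    # then look up the currently imported feed version (both hypothetical).
    with {:ok, unzip} <- fetch_archive(s3_uri) do
      Arrow.Gtfs.import(unzip, version, current_feed_version(), job)
    end
  end

  defp fetch_archive(_s3_uri), do: {:error, :not_implemented}
  defp current_feed_version, do: nil
end
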
config/config.exs: 12 changes (11 additions, 1 deletion)

@@ -38,7 +38,11 @@ config :arrow,
   shape_storage_enabled?: false,
   shape_storage_bucket: "mbta-arrow",
   shape_storage_prefix: "shape-uploads/",
-  shape_storage_request_fn: {ExAws, :request}
+  shape_storage_request_fn: {ExAws, :request},
+  gtfs_archive_storage_enabled?: false,
+  gtfs_archive_storage_bucket: "mbta-arrow",
+  gtfs_archive_storage_prefix: "gtfs-archive-uploads/",
+  gtfs_archive_storage_request_fn: {ExAws, :request}
 
 # Configures the endpoint
 config :arrow, ArrowWeb.Endpoint,
@@ -47,6 +51,12 @@ config :arrow, ArrowWeb.Endpoint,
   pubsub_server: Arrow.PubSub,
   live_view: [signing_salt: "35DDvOCJ"]
 
+# Configures Oban, the job processing library
+config :arrow, Oban,
+  engine: Oban.Engines.Basic,
+  queues: [default: 10, gtfs_import: 1],
+  repo: Arrow.Repo
+
 config :esbuild,
   version: "0.17.11",
   default: [

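
Enqueueing onto the gtfs_import queue defined above might look roughly like the following; the worker name and uniqueness options are illustrative guesses, not code from this commit (the commit notes mention fixing duplicate import/validation job handling, which Oban's unique option can express):

defmodule ArrowWeb.GtfsImportEnqueueSketch do
  @moduledoc false
  # Hypothetical sketch; the committed controller/worker code may differ.

  def enqueue_import(s3_uri, archive_version) do
    %{"s3_uri" => s3_uri, "archive_version" => archive_version}
    |> Arrow.Gtfs.ImportWorker.new(
      # Reject a second job for the same archive while one is queued or running.
      unique: [fields: [:args, :worker], states: [:available, :scheduled, :executing]]
    )
    |> Oban.insert()
    |> case do
      {:ok, %Oban.Job{conflict?: true}} -> {:error, :duplicate_job}
      {:ok, job} -> {:ok, job.id}
      {:error, changeset} -> {:error, changeset}
    end
  end
end
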
config/dev.exs: 4 changes (3 additions, 1 deletion)

@@ -77,7 +77,9 @@ config :arrow, dev_routes: true
 # Set prefix env for s3 uploads
 config :arrow,
   shape_storage_enabled?: true,
-  shape_storage_prefix_env: "dev/local/"
+  shape_storage_prefix_env: "dev/local/",
+  gtfs_archive_storage_enabled?: true,
+  gtfs_archive_storage_prefix_env: "dev/local/"
 
 # Do not include metadata nor timestamps in development logs
 config :logger, :console, format: "[$level] $message\n"

config/prod.exs: 3 changes (2 additions, 1 deletion)

@@ -11,7 +11,8 @@ import Config
 # before starting your production server.
 
 config :arrow,
-  shape_storage_enabled?: true
+  shape_storage_enabled?: true,
+  gtfs_archive_storage_enabled?: true
 
 config :arrow, :websocket_check_origin, [
   "https://*.mbta.com",

config/runtime.exs: 7 changes (5 additions, 2 deletions)

@@ -87,8 +87,11 @@ if config_env() == :prod do
     port: port,
     pool_size: pool_size,
     # password set by `configure` callback below
-    configure: {Arrow.Repo, :before_connect, []}
+    configure: {Arrow.Repo, :before_connect, []},
+    queue_target: 30_000,
+    queue_interval: 120_000
 
   config :arrow,
-    shape_storage_prefix_env: System.get_env("S3_PREFIX")
+    shape_storage_prefix_env: System.get_env("S3_PREFIX"),
+    gtfs_archive_storage_prefix_env: System.get_env("S3_PREFIX")
 end

config/test.exs: 7 changes (6 additions, 1 deletion)

@@ -2,7 +2,9 @@ import Config
 
 config :arrow,
   shape_storage_enabled?: false,
-  shape_storage_request_fn: {Arrow.Mock.ExAws.Request, :request}
+  shape_storage_request_fn: {Arrow.Mock.ExAws.Request, :request},
+  gtfs_archive_storage_enabled?: false,
+  gtfs_archive_storage_request_fn: {Arrow.Mock.ExAws.Request, :request}
 
 # Configure your database
 config :arrow, Arrow.Repo,
@@ -19,6 +21,9 @@ config :arrow, ArrowWeb.Endpoint,
 
 config :arrow, ArrowWeb.AuthManager, secret_key: "test key"
 
+# Prevent Oban from running jobs and plugins during test runs
+config :arrow, Oban, testing: :inline
+
 config :ueberauth, Ueberauth,
   providers: [
     cognito: {Arrow.Ueberauth.Strategy.Fake, []},

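
With testing: :inline, any job inserted during a test runs immediately in the calling process. Workers can also be exercised directly via Oban.Testing's perform_job helper; the test case module, worker name, and args below are assumptions, not code from this commit:

defmodule Arrow.Gtfs.ImportWorkerTest do
  use Arrow.DataCase, async: true
  use Oban.Testing, repo: Arrow.Repo

  # Hypothetical sketch: exercises a (hypothetical) worker's perform/1 directly.
  test "returns an error when the archive cannot be fetched" do
    args = %{
      "s3_uri" => "s3://mbta-arrow/dev/local/gtfs-archive-uploads/missing.zip",
      "archive_version" => "Fall 2024, version 1"
    }

    assert {:error, _reason} = perform_job(Arrow.Gtfs.ImportWorker, args)
  end
end
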
lib/arrow/application.ex: 2 changes (2 additions, 0 deletions)

@@ -16,6 +16,8 @@ defmodule Arrow.Application do
       {Phoenix.PubSub, name: Arrow.PubSub},
       # Start the Ecto repository
       Arrow.Repo,
+      # Start Oban, the job processing library
+      {Oban, Application.fetch_env!(:arrow, Oban)},
       # Start the endpoint when the application starts
       ArrowWeb.Endpoint
     ] ++

lib/arrow/gtfs.ex: 157 changes (157 additions, 0 deletions)

@@ -0,0 +1,157 @@
defmodule Arrow.Gtfs do
@moduledoc """
GTFS import logic.
"""

require Logger
alias Arrow.Gtfs.Importable
alias Arrow.Repo

@import_timeout_ms :timer.minutes(10)

@doc """
Loads a GTFS archive into Arrow's gtfs_* DB tables,
replacing the previous archive's data.
Setting `dry_run?` true causes the transaction to be rolled back
instead of committed, even if all queries succeed.
Returns:
- `:ok` on successful import or dry-run, or skipped import due to unchanged version.
- `{:error, reason}` if the import or dry-run failed.
"""
@spec import(Unzip.t(), String.t(), String.t() | nil, Oban.Job.t(), boolean) ::
:ok | {:error, term}
def import(unzip, new_version, current_version, job, dry_run? \\ false) do
Logger.info("GTFS import or validation job starting #{job_logging_params(job)}")

with :ok <- validate_required_files(unzip),
:ok <- validate_version_change(new_version, current_version) do
case import_transaction(unzip, dry_run?) do
{:ok, _} ->
Logger.info("GTFS import success #{job_logging_params(job)}")
:ok

{:error, :dry_run_success} ->
Logger.info("GTFS validation success #{job_logging_params(job)}")
:ok

{:error, reason} = error ->
Logger.warn(
"GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}"
)

error
end
else
:unchanged ->
Logger.info("GTFS import skipped due to unchanged version #{job_logging_params(job)}")

:ok

{:error, reason} = error ->
Logger.warn(
"GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}"
)

error
end
end

defp job_logging_params(job) do
s3_object_key =
job.args
|> Map.fetch!("s3_uri")
|> URI.parse()
|> then(& &1.path)

archive_version = Map.fetch!(job.args, "archive_version")

"job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\" job_worker=#{job.worker}"
end

defp import_transaction(unzip, dry_run?) do
transaction = fn ->
_ = truncate_all()
import_all(unzip)

if dry_run? do
# Set any deferred constraints to run now, instead of on transaction commit,
# since we don't actually commit the transaction in this case.
_ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE")
Repo.rollback(:dry_run_success)
end
end

{elapsed_ms, result} =
fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end
|> :timer.tc(:millisecond)

action = if dry_run?, do: "validation", else: "import"
Logger.info("GTFS archive #{action} transaction completed elapsed_ms=#{elapsed_ms}")

result
end

defp truncate_all do
tables = Enum.map_join(importable_schemas(), ", ", & &1.__schema__(:source))
Repo.query!("TRUNCATE #{tables}")
end

defp import_all(unzip) do
Enum.each(importable_schemas(), &Importable.import(&1, unzip))
end

defp validate_required_files(unzip) do
files =
unzip
|> Unzip.list_entries()
|> MapSet.new(& &1.file_name)

if MapSet.subset?(required_files(), files) do
:ok
else
missing =
MapSet.difference(required_files(), files)
|> Enum.sort()
|> Enum.join(",")

{:error, "GTFS archive is missing required file(s) missing=#{missing}"}
end
end

@spec validate_version_change(String.t(), String.t() | nil) :: :ok | :unchanged
defp validate_version_change(new_version, current_version)

defp validate_version_change(version, version), do: :unchanged
defp validate_version_change(_new_version, _current_version), do: :ok

defp importable_schemas do
# Listed in the order in which they should be imported.
[
Arrow.Gtfs.FeedInfo,
Arrow.Gtfs.Agency,
Arrow.Gtfs.Checkpoint,
Arrow.Gtfs.Level,
Arrow.Gtfs.Line,
Arrow.Gtfs.Service,
Arrow.Gtfs.Calendar,
Arrow.Gtfs.CalendarDate,
Arrow.Gtfs.Stop,
Arrow.Gtfs.Shape,
Arrow.Gtfs.ShapePoint,
Arrow.Gtfs.Route,
Arrow.Gtfs.Direction,
Arrow.Gtfs.RoutePattern,
Arrow.Gtfs.Trip,
Arrow.Gtfs.StopTime
]
end

defp required_files do
importable_schemas()
|> Enum.flat_map(& &1.filenames())
|> MapSet.new()
end
end
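
Arrow.Gtfs.Importable is called above (Importable.import(schema, unzip)) and implemented by each schema module, but its source is not part of this excerpt. Inferred from that usage and from the commit notes about a custom Postgres COPY path for larger tables, its shape might be roughly the following; the callbacks and dispatch are guesses, not the committed code:

defmodule Arrow.Gtfs.Importable do
  @moduledoc false
  # Inferred sketch of the behaviour used by Arrow.Gtfs.import_all/1 above.

  # CSV file(s) within the GTFS archive that feed this schema's table.
  @callback filenames() :: [String.t()]

  # Optional override for tables that need a custom strategy (e.g. Postgres COPY).
  @callback import(Unzip.t()) :: term()
  @optional_callbacks import: 1

  @spec import(module(), Unzip.t()) :: term()
  def import(schema, unzip) do
    if function_exported?(schema, :import, 1) do
      schema.import(unzip)
    else
      # Default path: stream the CSV rows, cast them through schema.changeset/2,
      # and batch-insert with Repo.insert_all (details omitted in this sketch).
      default_import(schema, unzip)
    end
  end

  defp default_import(_schema, _unzip), do: :ok
end
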
lib/arrow/gtfs/agency.ex: 40 changes (40 additions, 0 deletions)

@@ -0,0 +1,40 @@
defmodule Arrow.Gtfs.Agency do
@moduledoc """
Represents a row from agency.txt.
Changeset is intended for use only in CSV imports--
table contents should be considered read-only otherwise.
"""
use Arrow.Gtfs.Schema
import Ecto.Changeset

@type t :: %__MODULE__{
id: String.t(),
name: String.t(),
url: String.t(),
timezone: String.t(),
lang: String.t() | nil,
phone: String.t() | nil
}

schema "gtfs_agencies" do
field :name, :string
field :url, :string
field :timezone, :string
field :lang, :string
field :phone, :string

has_many :routes, Arrow.Gtfs.Route
end

def changeset(agency, attrs) do
attrs = remove_table_prefix(attrs, "agency")

agency
|> cast(attrs, ~w[id name url timezone lang phone]a)
|> validate_required(~w[id name url timezone]a)
end

@impl Arrow.Gtfs.Importable
def filenames, do: ["agency.txt"]
end
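
remove_table_prefix/2 is presumably provided by use Arrow.Gtfs.Schema, which is not shown in this excerpt. GTFS column names repeat the table name ("agency_id", "agency_name", ...) while the schema fields do not, so the helper likely behaves something like this inferred sketch (not the committed implementation):

# Turns CSV keys such as "agency_name" into "name" so they match the
# schema's field names before casting.
def remove_table_prefix(attrs, prefix) when is_map(attrs) do
  prefix = prefix <> "_"

  Map.new(attrs, fn {key, value} ->
    {String.replace_prefix(key, prefix, ""), value}
  end)
end
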
(The remaining 41 changed files in this commit are not shown in this excerpt.)
