feat: GTFS feed import #1013

Merged
merged 36 commits into master from jz-import-gtfs-endpoint--ecto-schemas on Oct 18, 2024

Commits (36)
b023be3
feat: Create tables for GTFS-static archive import (#1006)
jzimbel-mbta Sep 5, 2024
799e4e9
Fixes and adjustments to the gtfs_* migrations
jzimbel-mbta Sep 19, 2024
5cda8b4
Add Ecto schemas for all GTFS-imported tables
jzimbel-mbta Sep 19, 2024
da71fb2
Add logic to read from GTFS archive saved at local path and import da…
jzimbel-mbta Sep 19, 2024
e7832e7
Make GTFS import function more communicative
jzimbel-mbta Sep 19, 2024
4972ace
Add /import-GTFS endpoint
jzimbel-mbta Sep 19, 2024
8a9dfb4
tweak: Use `Repo.insert_all`, defer constraint checking
jzimbel-mbta Sep 24, 2024
a0aad60
Use Postgres COPY for larger tables; Undo all superfluous deferrable …
jzimbel-mbta Sep 26, 2024
f3eb075
Remove now-unused `create_deferrable` migration helper
jzimbel-mbta Sep 26, 2024
87bae04
Appease credo, except for one TODO that still needs DOing
jzimbel-mbta Sep 26, 2024
6fb7312
fix: increase length to 100MB for multipart uploads in Plug.Parsers (…
meagharty Sep 26, 2024
e614628
Tweak service/calendar related tables and schemas
jzimbel-mbta Oct 1, 2024
2de6323
Remove TODO comment that is done
jzimbel-mbta Oct 1, 2024
7348be9
Update test for service / calendar / calendar_dates tables
jzimbel-mbta Oct 1, 2024
1da6406
Appease credo
jzimbel-mbta Oct 1, 2024
9d355d2
Keep GTFS timestamps as strings throughout; provide utility fns to co…
jzimbel-mbta Oct 1, 2024
119887a
Use Ecto.Enum for GTFS enum fields; convert corresponding CSV fields …
jzimbel-mbta Oct 1, 2024
9dd52f1
Appease dialyzer
jzimbel-mbta Oct 1, 2024
c74b271
Change how int-code and string enum values are defined in the DB
jzimbel-mbta Oct 2, 2024
ed237f6
Oops, forgot to commit auto-generated structure.sql in prev commit
jzimbel-mbta Oct 2, 2024
2a83ffd
structure.sql was out of date somehow, even though migrations were al…
jzimbel-mbta Oct 2, 2024
34c7f2d
Merge branch 'master' into jz-import-gtfs-endpoint--ecto-schemas
jzimbel-mbta Oct 2, 2024
4c205b5
Update mix.lock
jzimbel-mbta Oct 2, 2024
fd60b41
Simplify some `execute/2` calls in GTFS migrations
jzimbel-mbta Oct 3, 2024
ad44969
Install Oban
jzimbel-mbta Oct 3, 2024
6607e6b
Add GTFS import Oban workers; Add logic for job-based GTFS import
jzimbel-mbta Oct 11, 2024
7b0660a
Update/add API endpoints for async GTFS import
jzimbel-mbta Oct 11, 2024
b793df0
Remove `Expires` header from S3 put_object requests, it doesn't do wh…
jzimbel-mbta Oct 15, 2024
f847d4a
Add more comprehensive logging for GTFS import process
jzimbel-mbta Oct 15, 2024
d8d4606
Merge branch 'master' into jz-import-gtfs-endpoint--ecto-schemas
jzimbel-mbta Oct 15, 2024
10f7267
Do not defer optionally-deferrable constraints during GTFS import/val…
jzimbel-mbta Oct 16, 2024
40de6b6
Set queue_target and queue_interval config for DB connection
jzimbel-mbta Oct 16, 2024
08dac72
Fix duplicate import/validation job handling
jzimbel-mbta Oct 16, 2024
1b0ddd1
Add `/api/gtfs/check_jobs` endpoint
jzimbel-mbta Oct 16, 2024
511b278
fix: Use correct module for checking validation jobs
jzimbel-mbta Oct 16, 2024
06a6d6f
Fix route
jzimbel-mbta Oct 16, 2024
12 changes: 11 additions & 1 deletion config/config.exs
@@ -38,7 +38,11 @@ config :arrow,
shape_storage_enabled?: false,
shape_storage_bucket: "mbta-arrow",
shape_storage_prefix: "shape-uploads/",
shape_storage_request_fn: {ExAws, :request}
shape_storage_request_fn: {ExAws, :request},
gtfs_archive_storage_enabled?: false,
gtfs_archive_storage_bucket: "mbta-arrow",
gtfs_archive_storage_prefix: "gtfs-archive-uploads/",
gtfs_archive_storage_request_fn: {ExAws, :request}

# Configures the endpoint
config :arrow, ArrowWeb.Endpoint,
@@ -47,6 +51,12 @@ config :arrow, ArrowWeb.Endpoint,
pubsub_server: Arrow.PubSub,
live_view: [signing_salt: "35DDvOCJ"]

# Configures Oban, the job processing library
config :arrow, Oban,
engine: Oban.Engines.Basic,
queues: [default: 10, gtfs_import: 1],
repo: Arrow.Repo

config :esbuild,
version: "0.17.11",
default: [
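The gtfs_import queue above has a concurrency of 1, so only one import or validation job runs at a time and concurrent requests cannot interleave writes to the gtfs_* tables. A minimal sketch of enqueuing work onto that queue; the worker module name and argument shape are illustrative, not taken from this diff:

# Enqueue a GTFS import job; Oban persists it and the single-concurrency
# :gtfs_import queue works it off in the background.
%{"s3_uri" => "s3://mbta-arrow/gtfs-archive-uploads/MBTA_GTFS.zip", "archive_version" => "2024-10-16"}
|> Arrow.Gtfs.ImportWorker.new()
|> Oban.insert()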
4 changes: 3 additions & 1 deletion config/dev.exs
@@ -77,7 +77,9 @@ config :arrow, dev_routes: true
# Set prefix env for s3 uploads
config :arrow,
shape_storage_enabled?: true,
shape_storage_prefix_env: "dev/local/"
shape_storage_prefix_env: "dev/local/",
gtfs_archive_storage_enabled?: true,
gtfs_archive_storage_prefix_env: "dev/local/"

# Do not include metadata nor timestamps in development logs
config :logger, :console, format: "[$level] $message\n"
3 changes: 2 additions & 1 deletion config/prod.exs
@@ -11,7 +11,8 @@ import Config
# before starting your production server.

config :arrow,
shape_storage_enabled?: true
shape_storage_enabled?: true,
gtfs_archive_storage_enabled?: true

config :arrow, :websocket_check_origin, [
"https://*.mbta.com",
7 changes: 5 additions & 2 deletions config/runtime.exs
@@ -87,8 +87,11 @@ if config_env() == :prod do
port: port,
pool_size: pool_size,
# password set by `configure` callback below
configure: {Arrow.Repo, :before_connect, []}
configure: {Arrow.Repo, :before_connect, []},
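# Raised so long-running GTFS import transactions don't trip the connection
# pool's queue-time alarms (queue_target/queue_interval are DBConnection
# settings; the tie to import workloads is an assumption from this PR's scope).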
queue_target: 30_000,
queue_interval: 120_000

config :arrow,
shape_storage_prefix_env: System.get_env("S3_PREFIX")
shape_storage_prefix_env: System.get_env("S3_PREFIX"),
gtfs_archive_storage_prefix_env: System.get_env("S3_PREFIX")
end
7 changes: 6 additions & 1 deletion config/test.exs
@@ -2,7 +2,9 @@ import Config

config :arrow,
shape_storage_enabled?: false,
shape_storage_request_fn: {Arrow.Mock.ExAws.Request, :request}
shape_storage_request_fn: {Arrow.Mock.ExAws.Request, :request},
gtfs_archive_storage_enabled?: false,
gtfs_archive_storage_request_fn: {Arrow.Mock.ExAws.Request, :request}

# Configure your database
config :arrow, Arrow.Repo,
@@ -19,6 +21,9 @@ config :arrow, ArrowWeb.Endpoint,

config :arrow, ArrowWeb.AuthManager, secret_key: "test key"

# Run Oban jobs inline during tests: inserted jobs execute synchronously in
# the calling process, and queues and plugins are disabled
config :arrow, Oban, testing: :inline

config :ueberauth, Ueberauth,
providers: [
cognito: {Arrow.Ueberauth.Strategy.Fake, []},
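With testing: :inline, jobs execute synchronously in the process that inserts them, so tests can assert on results directly. A hedged sketch using Oban.Testing's perform_job/2 helper; the worker module and args are illustrative:

# In the test module: invokes the worker's perform/1 directly and
# returns its result.
use Oban.Testing, repo: Arrow.Repo

assert :ok =
         perform_job(Arrow.Gtfs.ImportWorker, %{
           "s3_uri" => "s3://mbta-arrow/gtfs-archive-uploads/MBTA_GTFS.zip",
           "archive_version" => "2024-10-16"
         })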
2 changes: 2 additions & 0 deletions lib/arrow/application.ex
@@ -16,6 +16,8 @@ defmodule Arrow.Application do
{Phoenix.PubSub, name: Arrow.PubSub},
# Start the Ecto repository
Arrow.Repo,
# Start Oban, the job processing library
{Oban, Application.fetch_env!(:arrow, Oban)},
# Start the endpoint when the application starts
ArrowWeb.Endpoint
] ++
157 changes: 157 additions & 0 deletions lib/arrow/gtfs.ex
@@ -0,0 +1,157 @@
defmodule Arrow.Gtfs do
@moduledoc """
GTFS import logic.
"""

require Logger
alias Arrow.Gtfs.Importable
alias Arrow.Repo

@import_timeout_ms :timer.minutes(10)

@doc """
Loads a GTFS archive into Arrow's gtfs_* DB tables,
replacing the previous archive's data.

Setting `dry_run?` true causes the transaction to be rolled back
instead of committed, even if all queries succeed.

Returns:

- `:ok` on successful import or dry-run, or skipped import due to unchanged version.
- `{:error, reason}` if the import or dry-run failed.
"""
@spec import(Unzip.t(), String.t(), String.t() | nil, Oban.Job.t(), boolean) ::
:ok | {:error, term}
def import(unzip, new_version, current_version, job, dry_run? \\ false) do
Logger.info("GTFS import or validation job starting #{job_logging_params(job)}")

with :ok <- validate_required_files(unzip),
:ok <- validate_version_change(new_version, current_version) do
case import_transaction(unzip, dry_run?) do
{:ok, _} ->
Logger.info("GTFS import success #{job_logging_params(job)}")
:ok

{:error, :dry_run_success} ->
Logger.info("GTFS validation success #{job_logging_params(job)}")
:ok

{:error, reason} = error ->
Logger.warn(
"GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}"
)

error
end
else
:unchanged ->
Logger.info("GTFS import skipped due to unchanged version #{job_logging_params(job)}")

:ok

{:error, reason} = error ->
Logger.warn(
"GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}"
)

error
end
end

defp job_logging_params(job) do
s3_object_key =
job.args
|> Map.fetch!("s3_uri")
|> URI.parse()
|> then(& &1.path)

archive_version = Map.fetch!(job.args, "archive_version")

"job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\" job_worker=#{job.worker}"
end

defp import_transaction(unzip, dry_run?) do
transaction = fn ->
_ = truncate_all()
import_all(unzip)

if dry_run? do
# Set any deferred constraints to run now, instead of on transaction commit,
# since we don't actually commit the transaction in this case.
_ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE")
Repo.rollback(:dry_run_success)
end
end

{elapsed_ms, result} =
fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end
|> :timer.tc(:millisecond)

action = if dry_run?, do: "validation", else: "import"
Logger.info("GTFS archive #{action} transaction completed elapsed_ms=#{elapsed_ms}")

result
end

defp truncate_all do
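# Truncating every gtfs_* table in a single statement lets Postgres accept
# the operation despite foreign keys between the tables, which separate
# per-table TRUNCATEs would reject without CASCADE.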
tables = Enum.map_join(importable_schemas(), ", ", & &1.__schema__(:source))
Repo.query!("TRUNCATE #{tables}")
end

defp import_all(unzip) do
Enum.each(importable_schemas(), &Importable.import(&1, unzip))
end

defp validate_required_files(unzip) do
files =
unzip
|> Unzip.list_entries()
|> MapSet.new(& &1.file_name)

if MapSet.subset?(required_files(), files) do
:ok
else
missing =
MapSet.difference(required_files(), files)
|> Enum.sort()
|> Enum.join(",")

{:error, "GTFS archive is missing required file(s) missing=#{missing}"}
end
end

@spec validate_version_change(String.t(), String.t() | nil) :: :ok | :unchanged
defp validate_version_change(new_version, current_version)
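# The first clause matches when both versions are identical, i.e. the feed is unchanged.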

defp validate_version_change(version, version), do: :unchanged
defp validate_version_change(_new_version, _current_version), do: :ok

defp importable_schemas do
# Listed in the order in which they should be imported.
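# Referenced (parent) tables come before the tables whose foreign keys point at them.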
[
Arrow.Gtfs.FeedInfo,
Arrow.Gtfs.Agency,
Arrow.Gtfs.Checkpoint,
Arrow.Gtfs.Level,
Arrow.Gtfs.Line,
Arrow.Gtfs.Service,
Arrow.Gtfs.Calendar,
Arrow.Gtfs.CalendarDate,
Arrow.Gtfs.Stop,
Arrow.Gtfs.Shape,
Arrow.Gtfs.ShapePoint,
Arrow.Gtfs.Route,
Arrow.Gtfs.Direction,
Arrow.Gtfs.RoutePattern,
Arrow.Gtfs.Trip,
Arrow.Gtfs.StopTime
]
end

defp required_files do
importable_schemas()
|> Enum.flat_map(& &1.filenames())
|> MapSet.new()
end
end
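A usage sketch for the entry point above, assuming a local archive file: open it with Unzip and run a dry-run validation. The job struct is a stand-in for the Oban job that normally drives the call (job_logging_params/1 reads "s3_uri" and "archive_version" from its args); the file name, version, and worker name are illustrative.

# Build an Unzip handle over a local GTFS archive.
zip_file = Unzip.LocalFile.open("MBTA_GTFS.zip")
{:ok, unzip} = Unzip.new(zip_file)

# Placeholder job carrying the args the logging helper expects.
job = %Oban.Job{
  id: 1,
  worker: "Arrow.Gtfs.ValidationWorker",
  args: %{
    "s3_uri" => "s3://mbta-arrow/gtfs-archive-uploads/MBTA_GTFS.zip",
    "archive_version" => "2024-10-16"
  }
}

# With dry_run? = true, the transaction is rolled back even when every insert succeeds.
:ok = Arrow.Gtfs.import(unzip, "2024-10-16", nil, job, true)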
40 changes: 40 additions & 0 deletions lib/arrow/gtfs/agency.ex
@@ -0,0 +1,40 @@
defmodule Arrow.Gtfs.Agency do
@moduledoc """
Represents a row from agency.txt.

Changeset is intended for use only in CSV imports--
table contents should be considered read-only otherwise.
"""
use Arrow.Gtfs.Schema
import Ecto.Changeset

@type t :: %__MODULE__{
id: String.t(),
name: String.t(),
url: String.t(),
timezone: String.t(),
lang: String.t() | nil,
phone: String.t() | nil
}

schema "gtfs_agencies" do
field :name, :string
field :url, :string
field :timezone, :string
field :lang, :string
field :phone, :string

has_many :routes, Arrow.Gtfs.Route
end

def changeset(agency, attrs) do
attrs = remove_table_prefix(attrs, "agency")

agency
|> cast(attrs, ~w[id name url timezone lang phone]a)
|> validate_required(~w[id name url timezone]a)
end

@impl Arrow.Gtfs.Importable
def filenames, do: ["agency.txt"]
end
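A sketch of how this changeset consumes a raw agency.txt row during import: CSV headers arrive with the "agency_" prefix, remove_table_prefix/2 strips it, and the bare field names are cast as shown above. Row values here are illustrative.

%Arrow.Gtfs.Agency{}
|> Arrow.Gtfs.Agency.changeset(%{
  "agency_id" => "1",
  "agency_name" => "MBTA",
  "agency_url" => "https://www.mbta.com",
  "agency_timezone" => "America/New_York"
})
|> Arrow.Repo.insert!()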