Skip to content

Commit

Permalink
Add more comprehensive logging for GTFS import process
Browse files Browse the repository at this point in the history
  • Loading branch information
jzimbel-mbta committed Oct 15, 2024
1 parent b793df0 commit f847d4a
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 38 deletions.
95 changes: 64 additions & 31 deletions lib/arrow/gtfs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,75 @@ defmodule Arrow.Gtfs do
- `:ok` on successful import or dry-run, or skipped import due to unchanged version.
- `{:error, reason}` if the import or dry-run failed.
"""
@spec import(Unzip.t(), String.t(), String.t() | nil, boolean) :: :ok | {:error, term}
def import(unzip, new_version, current_version, dry_run? \\ false) do
result =
with :ok <- validate_required_files(unzip),
:ok <- validate_version_change(new_version, current_version) do
case import_transaction(unzip, dry_run?) do
{:ok, _} -> :ok
{:error, :dry_run_success} -> :ok
{:error, _reason} = error -> error
end
@spec import(Unzip.t(), String.t(), String.t() | nil, Oban.Job.t(), boolean) ::
:ok | {:error, term}
def import(unzip, new_version, current_version, job, dry_run? \\ false) do
with :ok <- validate_required_files(unzip),
:ok <- validate_version_change(new_version, current_version) do
case import_transaction(unzip, dry_run?) do
{:ok, _} ->
Logger.info("GTFS import success #{job_logging_params(job)}")
:ok

{:error, :dry_run_success} ->
Logger.info("GTFS validation success #{job_logging_params(job)}")
:ok

{:error, reason} = error ->
Logger.warn(
"GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}"
)

error
end

if result == :unchanged do
Logger.info("GTFS import skipped due to unchanged version version=\"#{current_version}\"")
:ok
else
result
:unchanged ->
Logger.info("GTFS import skipped due to unchanged version #{job_logging_params(job)}")

:ok

{:error, reason} = error ->
Logger.warn(
"GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}"
)

error
end
end

defp job_logging_params(job) do
s3_object_key =
job.args
|> Map.fetch!("s3_uri")
|> URI.parse()
|> then(& &1.path)

archive_version = Map.fetch!(job.args, "archive_version")

"job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\""
end

defp import_transaction(unzip, dry_run?) do
Repo.transaction(
fn ->
_ = Repo.query!("SET CONSTRAINTS ALL DEFERRED")

_ = truncate_all()
import_all(unzip)

if dry_run? do
_ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE")
Repo.rollback(:dry_run_success)
end
end,
timeout: @import_timeout_ms
)
transaction = fn ->
_ = Repo.query!("SET CONSTRAINTS ALL DEFERRED")

_ = truncate_all()
import_all(unzip)

if dry_run? do
_ = Repo.query!("SET CONSTRAINTS ALL IMMEDIATE")
Repo.rollback(:dry_run_success)
end
end

{elapsed_ms, result} =
fn -> Repo.transaction(transaction, timeout: @import_timeout_ms) end
|> :timer.tc(:millisecond)

action = if dry_run?, do: "validation", else: "import"
Logger.info("GTFS archive #{action} transaction completed elapsed_ms=#{elapsed_ms}")

result
end

defp truncate_all do
Expand All @@ -81,8 +115,7 @@ defmodule Arrow.Gtfs do
|> Enum.sort()
|> Enum.join(",")

Logger.warn("GTFS archive is missing required file(s) missing=#{missing}")
:error
{:error, "GTFS archive is missing required file(s) missing=#{missing}"}
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/arrow/gtfs/import_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ defmodule Arrow.Gtfs.ImportWorker do
import Ecto.Query

@impl Oban.Worker
def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}}) do
def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}} = job) do
current_version =
Arrow.Repo.one(
from info in Arrow.Gtfs.FeedInfo, where: info.id == "mbta-ma-us", select: info.version
)

with {:ok, unzip} <- Arrow.Gtfs.Archive.to_unzip_struct(s3_uri) do
Arrow.Gtfs.import(unzip, current_version, new_version)
Arrow.Gtfs.import(unzip, current_version, new_version, job)
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/arrow/gtfs/validation_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ defmodule Arrow.Gtfs.ValidationWorker do
]

@impl Oban.Worker
def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}}) do
def perform(%Oban.Job{args: %{"s3_uri" => s3_uri, "archive_version" => new_version}} = job) do
current_version = "doesn't matter for validation"

with {:ok, unzip} <- Arrow.Gtfs.Archive.to_unzip_struct(s3_uri) do
Arrow.Gtfs.import(unzip, current_version, new_version, true)
Arrow.Gtfs.import(unzip, current_version, new_version, job, true)
end
end

Expand Down
52 changes: 49 additions & 3 deletions lib/arrow_web/controllers/api/gtfs_import_controller.ex
Original file line number Diff line number Diff line change
@@ -1,24 +1,60 @@
defmodule ArrowWeb.API.GtfsImportController do
use ArrowWeb, :controller

require Logger
import Ecto.Query

@type error_tuple :: {:error, term} | {:error, status :: atom, term}

  @doc """
  When successful, responds with 200 status + JSON body `{"id": integer}` containing the ID of the enqueued job.
  When unsuccessful, responds with non-200 status and an error message in plaintext.
  """
  def enqueue_import(conn, _) do
    # Thin delegation: all enqueue logic is shared with enqueue_validation/2.
    enqueue_job(conn, Arrow.Gtfs.ImportWorker)
  end

@doc """
When the requested job exists, responds with 200 status + JSON body `{"status": st}` where `st` is one of:
- "queued"
- "executing"
- "success"
- "failure"
- "cancelled"
Responds with 400 status if `id` request param is missing.
Responds with 404 status if no job exists with the requested `id`.
"""
def import_status(conn, params) do
case Map.fetch(params, "id") do
{:ok, id} -> check_status(conn, id, Arrow.Gtfs.ImportWorker, "import")
:error -> send_resp(conn, :bad_request, "missing `id` query parameter")
end
end

  @doc """
  When successful, responds with 200 status + JSON body `{"id": integer}` containing the ID of the enqueued job.
  When unsuccessful, responds with non-200 status and an error message in plaintext.
  """
  def enqueue_validation(conn, _) do
    # Thin delegation: all enqueue logic is shared with enqueue_import/2.
    enqueue_job(conn, Arrow.Gtfs.ValidationWorker)
  end

@doc """
When the requested job exists, responds with 200 status + JSON body `{"status": st}` where `st` is one of:
- "queued"
- "executing"
- "success"
- "failure"
- "cancelled"
Responds with 400 status if `id` request param is missing.
Responds with 404 status if no job exists with the requested `id`.
"""
def validation_status(conn, params) do
case Map.fetch(params, "id") do
{:ok, id} -> check_status(conn, id, Arrow.Gtfs.ValidationWorker, "validation")
Expand All @@ -29,9 +65,15 @@ defmodule ArrowWeb.API.GtfsImportController do
@spec to_resp({:ok, term} | error_tuple, Plug.Conn.t()) :: Plug.Conn.t()
defp to_resp(result, conn) do
case result do
{:ok, value} -> json(conn, value)
{:error, status, message} -> send_resp(conn, status, message)
{:error, message} -> to_resp({:error, :bad_request, message}, conn)
{:ok, value} ->
json(conn, value)

{:error, status, message} ->
Logger.warn("GtfsImportController unsuccessful request message=#{inspect(message)}")
send_resp(conn, status, message)

{:error, message} ->
to_resp({:error, :bad_request, message}, conn)
end
end

Expand All @@ -46,6 +88,10 @@ defmodule ArrowWeb.API.GtfsImportController do

case Oban.insert(changeset) do
{:ok, job} ->
Logger.info(
"Job enqueued for GTFS archive job_id=#{job.id} archive_version=\"#{version}\" worker=#{inspect(worker_mod)}"
)

{:ok, %{id: job.id}}

{:error, reason} ->
Expand Down

0 comments on commit f847d4a

Please sign in to comment.