
Events pipeline

This repository contains the events pipeline, a simple pipeline which extracts summarized events from various datasets we produce.

Running

Dependencies

You only need docker and docker-compose on your machine to run the pipeline. No other dependencies are required.

Setup

The pipeline reads its input from BigQuery, so you first need to authenticate with your Google Cloud account inside the docker images. To do that, run this command and follow the instructions:

docker-compose run gcloud auth login

Configuration

This pipeline defines multiple dags, one for each event type, and uses a hierarchical scheme for defining configuration values. Each setting described here may be defined inside the pipe_events airflow JSON variable or inside the specific pipe_events.EVENT_TYPE airflow JSON variable, which may define or override certain settings for a particular event type dag. For example, you may define the postgres connection settings at pipe_events.postgres_connection_string, which means this setting applies to all event type dags. You could also define pipe_events.anchorages.postgres_connection_string to override the connection string just for the anchorages dag.

In the documentation that follows, settings are described at the pipe_events level when they are generic values you most likely want to set up once for all event type dags, or at the pipe_events.EVENT_TYPE level when they are intended to be set up per event type. This is just a recommendation though, and nothing prevents you from setting an individual configuration at the parent or child level if you need to. An example where this is useful is the publish_to_postgres setting, which enables publishing the events to postgres so they can be consumed by the different API projects. This setting may be set to false at pipe_events.publish_to_postgres, meaning no event type will be published, or set to true at pipe_events.publish_to_postgres and false at pipe_events.fishing.publish_to_postgres, so that all event types except fishing are published to the postgres DB, as in the example below.
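
This is a minimal sketch of what those two airflow variables could look like for the publish_to_postgres example above (the connection string is a placeholder, and whether values are stored as strings or JSON booleans depends on how your dags parse them):

pipe_events:

{
  "publish_to_postgres": "true",
  "postgres_connection_string": "postgresql://USER:PASSWORD@HOST:5432/YOUR_DATABASE"
}

pipe_events.fishing:

{
  "publish_to_postgres": "false"
}

With this configuration every event type dag publishes its events to postgres, except the fishing dag, which overrides the parent value.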

The pipeline exposes the following standard settings:

  • pipe_events.docker_run: Command to run docker inside the airflow server.
  • pipe_events.project_id: Google cloud project id containing all the resources that running this pipeline requires.
  • pipe_events.temp_bucket: GCS bucket where temporary files are stored.
  • pipe_events.pipeline_bucket: GCS bucket where the final files generated by this pipeline are stored.
  • pipe_events.pipeline_dataset: BigQuery dataset containing various tables used in this pipeline.
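
As a sketch, assuming the pipe_events variable is plain JSON with these keys, the standard settings could look like this (all values are placeholders for your own resources):

{
  "project_id": "YOUR_GCP_PROJECT",
  "temp_bucket": "YOUR_TEMP_BUCKET",
  "pipeline_bucket": "YOUR_PIPELINE_BUCKET",
  "pipeline_dataset": "YOUR_PIPELINE_DATASET"
}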

The following are global airflow variables, not scoped to pipe_events:

  • INFERENCE_BUFFER_DAYS: Number of days to reprocess because nnet scores might become available later. Defaults to 7.
  • FLEXIBLE_OPERATOR: Operator that will be used to process the events; possible values are bash or kubernetes. Defaults to bash.
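
As a sketch, these global variables can be set through the Airflow CLI (the syntax below is for Airflow 1.x; Airflow 2.x uses airflow variables set KEY VALUE instead):

# Reprocess the last 7 days and process events with the bash operator
airflow variables --set INFERENCE_BUFFER_DAYS 7
airflow variables --set FLEXIBLE_OPERATOR bash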

The following are settings intended to be global to all the events pipelines:

  • pipe_events.postgres_instance: CloudSQL postgres instance where the data is published to. No default provided.
  • pipe_events.postgres_connection_string: Connection string for the postgres database to publish the events to. No default provided.
  • pipe_events.postgres_table: Table in postgres to publish the events to. No default provided.
  • pipe_events.source_dataset: BigQuery dataset which contains the different tables that are read to produce the summarized events. Defaults to PIPELINE_DATASET.
  • pipe_events.events_dataset: BigQuery dataset which will contain the published events. Defaults to EVENTS_DATASET.
  • pipe_events.publish_to_postgres: Flag to decide if the results are published to postgres. Defaults to false.
  • pipe_events.all_vessels_table: BigQuery dataset and table containing all vessels from the vessel database. Defaults to vessel_database.all_vessels_v20191001.
  • pipe_events.spatial_measures_table: BigQuery dataset and table for the spatial measures. Defaults to pipe_static.spatial_measures_20181025.
  • pipe_events.country_codes_table: BigQuery dataset and table for the country codes. Defaults to gfw_research.country_codes.
  • pipe_events.named_anchorages_table: BigQuery dataset and table for the named anchorages. Defaults to anchorages.named_anchorages_v20190827.
  • pipe_events.voyages: BigQuery table to read voyages from. Defaults to voyages.
  • pipe_events.vessel_info: BigQuery table to read vessel information from. Defaults to vessel_info.
  • pipe_events.segment_vessel: BigQuery table containing segment vessel information. Defaults to segment_vessel.
  • pipe_events.segment_info: BigQuery table containing segment information. Defaults to segment_info.
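
A sketch of how some of these pipeline-wide settings could look inside the pipe_events variable, using the defaults documented above and placeholder postgres values:

{
  "postgres_instance": "YOUR_PROJECT:YOUR_REGION:YOUR_INSTANCE",
  "postgres_connection_string": "postgresql://USER:PASSWORD@HOST:5432/YOUR_DATABASE",
  "postgres_table": "YOUR_EVENTS_TABLE",
  "publish_to_postgres": "false",
  "spatial_measures_table": "pipe_static.spatial_measures_20181025",
  "country_codes_table": "gfw_research.country_codes",
  "named_anchorages_table": "anchorages.named_anchorages_v20190827"
}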

The following are anchorages specific settings:

  • pipe_events.anchorages.source_table: BigQuery table containing the port events to read from. Defaults to port_events_.
  • pipe_events.anchorages.source_filter: Optional filter applied to the source table to restrict which records are processed. Defaults to an empty value; it may be set to any WHERE clause condition, such as vessel_id IN (...).
  • pipe_events.anchorages.enabled: Setting which enables or disables the anchorages pipeline. Defaults to false.
  • pipe_events.anchorages.events_table: Table to push results to. Defaults to published_events_ports.

The following are carrier encounter specific settings:

  • pipe_events.carrier_encounters.source_table: BigQuery table containing the encounters to read from. Defaults to encounters.
  • pipe_events.carrier_encounters.source_filter: Optional filter applied to the source table to restrict which records are processed. Defaults to an empty value; it may be set to any WHERE clause condition, such as vessel_id IN (...).
  • pipe_events.carrier_encounters.max_median_speed_knots: Maximum median speed in knots for the encounters to be included in the results. Defaults to 2.
  • pipe_events.carrier_encounters.enabled: Setting which enables or disables the carrier encounters pipeline. Defaults to false.
  • pipe_events.carrier_encounters.fishing_vessels_table: BigQuery dataset and table containing the list of fishing vessels to use. Only encounters between a carrier and one of these fishing vessels are going to be included in the results. Defaults to proj_carrier_portal_pew.carrier_portal_fishing_vessels_v20190916.
  • pipe_events.carrier_encounters.events_table: Table to push results to. Defaults to published_events_encounters.

The following are encounter specific settings:

  • pipe_events.encounters.source_table: BigQuery table containing the encounters to read from. Defaults to encounters.
  • pipe_events.encounters.source_filter: Optional filter applied to the source table to restrict which records are processed. Defaults to an empty value; it may be set to any WHERE clause condition, such as vessel_id IN (...).
  • pipe_events.encounters.max_median_speed_knots: Maximum median speed in knots for the encounters to be included in the results. Defaults to 2.
  • pipe_events.encounters.enabled: Setting which enables or disables the encounters pipeline. Defaults to false.
  • pipe_events.encounters.events_table: BigQuery table to publish the encounters to. Defaults to published_events_encounters.

The following are fishing specific settings:

  • pipe_events.fishing.source_table: BigQuery table containing the scored messages to read from. Defaults to messages_scored_.
  • pipe_events.fishing.source_filter: Optional filter applied to the source table to restrict which records are processed. Defaults to an empty value; it may be set to any WHERE clause condition, such as vessel_id IN (...).
  • pipe_events.fishing.min_event_duration: Minimum duration in seconds for a fishing event to be considered an actual fishing event. Defaults to 300.
  • pipe_events.fishing.enabled: Setting which enables or disables the fishing events pipeline. Defaults to false.
  • pipe_events.fishing.events_table: BigQuery table to publish the fishing events to. Defaults to published_events_fishing.
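
As a sketch, a pipe_events.fishing variable that enables this dag and restricts processing to a couple of vessels could look like this (the vessel ids are placeholders; min_event_duration and events_table are shown with their documented defaults):

{
  "enabled": "true",
  "source_filter": "vessel_id IN ('VESSEL_ID_1', 'VESSEL_ID_2')",
  "min_event_duration": "300",
  "events_table": "published_events_fishing"
}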

The following are gap specific settings:

  • pipe_events.gaps.source_table: BigQuery table to read gaps from. Defaults to position_messages_.
  • pipe_events.gaps.source_filter: Optional filter applied to the source table to restrict which records are processed. Defaults to an empty value; it may be set to any WHERE clause condition, such as vessel_id IN (...).
  • pipe_events.gaps.events_table: BigQuery table to publish gap events to. Defaults to published_events_gaps.
  • pipe_events.gaps.gap_min_pos_count: Minimum number of positions a segment must have to be considered. Defaults to 3.
  • pipe_events.gaps.gap_min_dist: Minimum distance from shore for transponder-off events to be considered. Defaults to 10000.
  • pipe_events.gaps.enabled: Setting which enables or disables the gap events pipeline. Defaults to false.

License

Copyright 2017 Global Fishing Watch

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.