
Task Guide

Tasks are a type of step that allow an Envelope pipeline to perform side effects outside of the flow of data through the pipeline. Examples of tasks include sending emails, raising alerts, or updating external operational metadata.

Tasks can read in the data from the dependencies of the task step they are defined in, but they cannot provide data to subsequent steps.

Envelope provides the exception and impala_ddl task implementations. All other usages of a task step must reference a custom-developed task.

Exception

The exception task immediately throws a Java runtime exception. This can be used in test pipelines to exit with a non-zero return code, indicating to the test harness that the test produced a failure result. The mandatory message configuration provides the exception message.

Exception task example

...
steps {
  ...
  fail_test {
    type = task
    class = exception
    message = "Test for XYC did not match expected results"
  }
  ...
}

Impala DDL

The impala_ddl task allows users to run DDL operations against Impala (and thus also the Hive Metastore) that are commonly needed in ingestion pipelines. Typical usages are to update Impala with information about new tables, data, and partitions added by the rest of the pipeline. In a batch pipeline the task can be effectively combined with looping steps to add partitions to Parquet and Kudu tables, and it removes the need to manage separate processes outside of Envelope for metadata operations. Impala security is handled within the task; in particular, Kerberos authentication is managed through keytabs in an isolated context, independent of the wider Spark execution environment.

The following DDL operations are currently supported:

refresh: Update Impala’s metadata about new files in existing tables and partitions

invalidate: Notify Impala of new tables in the Hive metastore

add_partition: Alter a table to add a new HDFS partition or Kudu range partition

drop_partition: Alter a table to drop an HDFS partition or Kudu range partition

The refresh and invalidate operations notify Impala of new files within existing or new tables and partitions. Because each invocation adds metadata-update overhead in Impala, these operations should be used sparingly.

The *_partition operations allow the user to ensure that new or removed partitions are respected in Impala queries as soon as the pipeline is complete. For HDFS partitions, the task currently expects the user to supply the full partition specification as a string parameter in the task configuration. For Kudu range partitions, the user can supply either a single lower bound as a range partition value, or both lower and upper range boundaries. By default, range boundaries are inclusive on the lower bound and exclusive on the upper, but this behavior is customizable through a range inclusivity parameter that accepts all four combinations of lower and upper bound inclusivity (inclusive or exclusive). See the configurations guide for full details of all these options.
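For illustration only, a Kudu range partition with explicit boundaries might be configured along the following lines. The boundary key names shown here (partition.range.lower and partition.range.upper) are hypothetical placeholders rather than confirmed configuration names, so refer to the configurations guide for the authoritative keys and accepted values.

  ddl {
    type = task
    class = impala_ddl
    host = ${env.impala}
    query {
      type = "add_partition"
      table = "example_output_kudu"
      # Hypothetical key names for the boundary form of a Kudu range partition --
      # see the configurations guide for the actual configuration names
      partition.range.lower = 20190101
      partition.range.upper = 20190201
    }
  }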

Some representative examples are presented in the following sections.

Example 1: Inserting data into an unpartitioned table

In the first example we overwrite an existing table with new data and notify Impala of the changes through a metadata invalidation. Here is a relevant snippet, where the Impala DDL operation is defined by the ddl step.

  fsProcess {
    dependencies = [fsInput]
    deriver {
      type = sql
      query.literal = "SELECT id, foo, blah, ymd FROM fsInput"
    }
    planner = {
      type = overwrite
    }
    output = {
      type = filesystem
      path = /tmp/example-output-unpartitioned
      format = parquet
    }
  }
  ddl {
    dependencies = [fsProcess]
    type = task
    class = impala_ddl
    host = ${env.impala}
    query {
      type = "invalidate"
      table = "example_output"
    }
    debug = true
  }

Example 2: Inserting data into a partitioned HDFS table with looping

In this example we use looping to populate two HDFS partitions and notify Impala of their existence. Note that we perform the DDL after the data has been loaded into HDFS.

steps {
  loop_dates {
    type = loop
    mode = serial
    source = range
    range.start = 20190101
    range.end = 20190102
    parameter = processing_date
  }
  fsInput {
    dependencies = [loop_dates]
    input {
      type = filesystem
      path = "/tmp/example-input/example-input-${processing_date}.json"
      format = json
    }
  }
  fsProcess {
    dependencies = [fsInput, loop_dates]
    deriver {
      type = sql
      query.literal = "SELECT id,foo,blah,ymd FROM fsInput_${processing_date}"
    }
    planner = {
      type = overwrite
    }
    output = {
      type = filesystem
      path = "/tmp/example-output-partitioned/ymd=${processing_date}"
      format = parquet
    }
  }
  ddl {
    dependencies = [fsProcess, loop_dates]
    type = task
    class = impala_ddl
    host = ${env.impala}
    query {
      type = "add_partition"
      table = "example_output_part"
      partition.spec = "ymd=${processing_date}"
    }
    auth = kerberos
    krb-keytab = ${env.kerberos.keytab}
    krb-user-principal = ${env.kerberos.principal}
    debug = true
  }
}

Example 3: Inserting data into a partitioned Kudu table with looping

In this example we use looping to populate two Kudu range partitions and notify Impala of their existence. Note that we perform the DDL before the data has been loaded into Kudu, to ensure it is written to the right tablets.

steps {
  loop_dates {
    type = loop
    mode = serial
    source = range
    range.start = 20190101
    range.end = 20190102
    parameter = processing_date
  }
  ddl {
    dependencies = [loop_dates]
    type = task
    class = impala_ddl
    host = ${env.impala}
    query {
      type = "add_partition"
      table = "example_output_kudu"
      partition.range.value = ${processing_date}
    }
    debug = true
  }
  fsInput {
    dependencies = [ddl, loop_dates]
    input {
      type = filesystem
      path = "/tmp/example-input/example-input-${processing_date}.json"
      format = json
    }
  }
  fsProcess {
    dependencies = [fsInput, loop_dates]
    deriver {
      type = sql
      query.literal = "SELECT * FROM fsInput_${processing_date}"
    }
    planner = {
      type = upsert
    }
    output = ${env.kudu} {
      type = kudu
      table.name = "impala::default.example_output_kudu"
    }
  }
}

Custom tasks

A custom task must implement the Task interface. The configure method receives the configuration of the task from the pipeline, and the run method executes the task.

All tasks run in the driver process, so they should be designed to be fast and require little memory, and if they read the contents of dependency DataFrames then that data should be very small.

To develop a custom task:

  1. Create a new Java or Scala project that includes a dependency on the Envelope version you are using.

  2. Create an implementation of the Task interface (under com.cloudera.labs.envelope.task).

  3. Optionally, if you want to use an alias in your Envelope configuration files, first implement the ProvidesAlias interface, and next add a file to your project at location 'META-INF/services/com.cloudera.labs.envelope.task.Task' with your task’s fully qualified class name as the file contents.

  4. Compile your project to a jar. You do not need to include Envelope or Spark in your compiled artifact.

  5. Add a task step to your pipeline by setting type to task, class to the fully qualified class name of your task, and any other configurations your task needs.

  6. Run the pipeline with your task, for example:

    spark-submit --jars yourtask.jar envelope-*.jar yourpipeline.conf
Note
CDH5 uses spark2-submit instead of spark-submit for Spark 2 applications such as Envelope.

Custom task example

application {
  name = Custom task
  ...
}

steps {
  ...
  run_task {
    dependencies = [...]
    type = task
    class = com.example.CustomTask
    value = hello
  }
  ...
}
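
For reference, a minimal sketch of a task class matching the example above is shown below. It assumes the contract described earlier in this section, with configure receiving the task’s configuration and run receiving the dependency DataFrames keyed by step name; check the Task interface in your Envelope version for the exact method signatures.

package com.example;

import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import com.cloudera.labs.envelope.task.Task;
import com.typesafe.config.Config;

public class CustomTask implements Task {

  private String value;

  @Override
  public void configure(Config config) {
    // Read the task's own configurations, such as 'value' from the example above
    this.value = config.getString("value");
  }

  @Override
  public void run(Map<String, Dataset<Row>> dependencies) {
    // Perform the side effect. This runs on the driver, so only inspect
    // dependency DataFrames here if the data is very small.
    System.out.println("Custom task says: " + value);
  }
}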