Rocket Job Tabular batch processing

Reid Morrison edited this page Feb 20, 2017 · 6 revisions

Incoming data is often laid out like a spreadsheet, with rows and columns. The first row is usually a header that describes what each column contains, and the remaining rows hold the actual data to process.

In batch processing it is common to process CSV (Comma Separated Values) files. Tabular processing is built into Rocket Job Pro.

The Tabular mix-in allows a job to transparently process the following input types:

  • CSV line / String
  • Array
  • Hash
  • Excel (via iostreams)

In Rocket Job batch processing, each line of the CSV file is passed as-is to a worker rather than being parsed when the file is loaded. Each worker parses its own lines as part of its processing, so the parsing work is spread across all workers, which results in much higher performance. If the CSV were parsed while the file was being loaded, the single-threaded upload process would be much slower because it would have to parse the entire file.
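The idea of deferring parsing to the workers can be sketched in plain Ruby, independent of Rocket Job. This is a minimal illustration that assumes only the standard `csv` and `stringio` libraries; `upload_lines` and `work_on_slice` are hypothetical names and not part of the Rocket Job API, and threads merely stand in for workers.

```ruby
require 'csv'
require 'stringio'

# Hypothetical upload step: store the raw lines untouched, grouped into slices.
# No CSV parsing happens here, so the single-threaded loader stays cheap.
def upload_lines(io, slice_size: 2)
  io.each_line(chomp: true).each_slice(slice_size).to_a
end

# Hypothetical worker step: each worker parses only the lines in its own slice.
def work_on_slice(slice)
  slice.map { |line| CSV.parse_line(line) }
end

slices = upload_lines(StringIO.new("a,b,c\n1,2,3\n4,5,6\n"))
# => [["a,b,c", "1,2,3"], ["4,5,6"]]

# Threads stand in for workers here, purely for illustration.
parsed = slices.map { |slice| Thread.new { work_on_slice(slice) } }.flat_map(&:value)
# => [["a", "b", "c"], ["1", "2", "3"], ["4", "5", "6"]]
```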

Suppose we later want to use the same job to process data read from another source, such as a database. Without the Tabular mix-in we would have to generate CSV data as input to the original job. With the Tabular mix-in we can simply send the job an Array of values and it handles the input data seamlessly.
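To illustrate why one job can accept several input types, here is a minimal plain-Ruby sketch of normalizing a CSV String, an Array, or a Hash into the same Hash shape. The helper `row_to_hash` is hypothetical and only assumes the standard `csv` library; it is not how Rocket Job implements the mix-in.

```ruby
require 'csv'

# Hypothetical helper: turn any supported row type into a Hash keyed by header.
def row_to_hash(header, row)
  case row
  when String then header.zip(CSV.parse_line(row) || []).to_h # parse the CSV line first
  when Array  then header.zip(row).to_h                       # values are already split
  when Hash   then row.slice(*header)                         # keep only the known columns
  else raise ArgumentError, "Unsupported row type: #{row.class}"
  end
end

header = %w[first second third]

row_to_hash(header, "1,2,3")
# => {"first"=>"1", "second"=>"2", "third"=>"3"}

row_to_hash(header, [1, 2, 3])
# => {"first"=>1, "second"=>2, "third"=>3}
```

The processing code downstream then only ever sees a Hash, regardless of whether the data originated in a CSV file or a database query.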

Tabular

Tabular provides common handling for efficiently processing tabular data, such as CSV, spreadsheet, or other table-based files, on a line-by-line basis.

Tabular information consists of a table of data where the first row is usually the header, and subsequent rows are the data elements.

Tabular can be used standalone to process files, for example using the default CSV parser:

tabular = RocketJob::Batch::Tabular.new

A header is required for both input and output data rendering.

Parse a header row / line:

tabular.parse_header("first field,Second,thirD")
# => ["first field", "Second", "thirD"]

Apply common cleansing rules to the header:

tabular.cleanse_header!
# => ["first_field", "second", "third"]
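The exact cleansing rules are not listed on this page. The result above is consistent with stripping whitespace, downcasing, and replacing inner spaces with underscores, which can be sketched as follows. The `cleanse_header` function is a hypothetical stand-in, and the rules Rocket Job actually applies may differ.

```ruby
# Hypothetical cleansing rules, inferred only from the example output:
# strip surrounding whitespace, downcase, and replace runs of whitespace with "_".
def cleanse_header(columns)
  columns.map { |column| column.to_s.strip.downcase.gsub(/\s+/, '_') }
end

cleanse_header(["first field", "Second", "thirD"])
# => ["first_field", "second", "third"]
```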

Return the input data as a Hash after applying the header to it:

tabular.as_hash("1,2,3")
# => {"first_field"=>"1", "second"=>"2", "third"=>"3"}

tabular.as_hash([1, 2, 3])
# => {"first_field"=>1, "second"=>2, "third"=>3}

Render processed data for output in CSV form:

tabular.render([5, 6, 9])
# => "5,6,9"

tabular.render({"third" => "3", "first_field" => "1"})
# => "1,,3"
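Note that in the Hash example the values are emitted in header order, and the missing "second" column is left empty. A minimal sketch of that behavior using only the standard `csv` library follows; `render_row` is a hypothetical helper, not the Tabular implementation.

```ruby
require 'csv'

# Hypothetical helper: render an Array or Hash as one CSV line.
# A Hash is first ordered by the header; missing keys become empty columns.
def render_row(header, row)
  values = row.is_a?(Hash) ? header.map { |name| row[name] } : row
  CSV.generate_line(values).chomp
end

header = %w[first_field second third]

render_row(header, [5, 6, 9])
# => "5,6,9"

render_row(header, {"third" => "3", "first_field" => "1"})
# => "1,,3"
```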

Tabular supports any parser that responds to #parse and #render. See RocketJob::Utility::CSVRow for an example.
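As an illustration of that duck-typed interface, a parser for pipe-delimited rows might look like the sketch below. `PipeRow` is a hypothetical class written for this example. The argument and return types are assumptions, and how a custom parser is handed to Tabular is not covered here.

```ruby
# Hypothetical parser for pipe-delimited rows.
# Assumes #parse receives one line and returns an Array of values,
# and #render receives an Array of values and returns one line.
class PipeRow
  def parse(line)
    # A limit of -1 keeps trailing empty columns instead of dropping them.
    line.chomp.split('|', -1)
  end

  def render(values)
    # nil.to_s is "", so missing values render as empty columns.
    values.map(&:to_s).join('|')
  end
end

parser = PipeRow.new
parser.parse("a|b||d")      # => ["a", "b", "", "d"]
parser.render([1, nil, 3])  # => "1||3"
```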

Tabular Input

Use Tabular Input when a job receives tabular data as input for processing but does not output any tabular data.

class CSVJob < RocketJob::Job
  include RocketJob::Plugins::Batch
  include RocketJob::Plugins::Batch::Tabular::Input

  def perform(record)
    puts record.inspect
  end
end

job = CSVJob.new
job.upload('data.csv')

# Run the job now in this process
job.perform_now

# Or, queue it up for processing
job.save!

Tabular Output

Use Tabular Output when a job needs to output tabular data.

class TabularJob < RocketJob::Job
  include RocketJob::Plugins::Batch
  include RocketJob::Plugins::Batch::Tabular::Output

  self.slice_size            = 1
  self.collect_output        = true

  # Specify the output header.
  self.tabular_output_header = ['First', 'Second', 'Third']

  # Specify the stage at which to write the output header,
  # or use a custom method to calculate a custom output header.
  before_batch :tabular_output_write_header

  # Add 1 to First, 2 to Second and 3 to Third
  def perform(record)
    [record[0].to_i + 1, record[1].to_i + 2, record[2].to_i + 3]
  end
end

job = TabularJob.new
job.upload do |stream|
  stream << [1, 2, 3]
  stream << [9, 1, 5]
  stream << [4, 5, 2]
end
job.perform_now

ios = StringIO.new
job.download(ios)
puts ios.string
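The output of the job is not shown on this page. Assuming the header is written first and each result is then rendered as one CSV line, the same arithmetic can be reproduced in plain Ruby to see what the downloaded data would be expected to resemble. This sketch uses only the standard `csv` library and does not run Rocket Job itself.

```ruby
require 'csv'

header = ['First', 'Second', 'Third']
input  = [[1, 2, 3], [9, 1, 5], [4, 5, 2]]

# Same calculation as TabularJob#perform: add 1 to First, 2 to Second, 3 to Third.
perform = ->(record) { [record[0].to_i + 1, record[1].to_i + 2, record[2].to_i + 3] }

# Header line first, then one rendered line per processed record.
rows   = [header] + input.map(&perform)
output = rows.map { |row| CSV.generate_line(row) }.join

puts output
# First,Second,Third
# 2,4,6
# 10,3,8
# 5,7,5
```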

Tabular (Input and Output)

class InputOutputJob < RocketJob::Job
  include RocketJob::Plugins::Batch
  include RocketJob::Plugins::Batch::Tabular
  
  self.tabular_output_header = ['First', 'Second', 'Third']
  
  # Alternatively, set the output header dynamically from the input header
  # by overriding the method below.

  # Returns the output header, calculating it when not already supplied.
  # By default it returns the input header; override to replace it.
  def tabular_output_set_header
    if tabular_output_header.present?
      tabular_output_header
    else
      self.tabular_output_header = tabular_input_header
    end
  end
  
end