v0.3 by Epsilon DataLabs
Normandy is a python framework for data pipelines, which main objective is standardizing your team code and provide a data treatment methodology flexible to your team needs.
Normandy is available via PIP using:
pip install normandy
With Normandy you can define your data pipeline system and within it several data flows with each one use or produce different kind of data.
First let's explain the basic Normandy terminology:
-
Flow: Is a complete data process (ETL), you may have any number of flows on a Normandy project and each one may or may not share code or data.
-
Steps: A Normandy flow is composed by any number of steps, this steps are one or more processes, the steps works as a guideline of priorities, processes from one step cannot start before the processes of the previous step has ended.
-
Processes: This are the lower level of granularity in Normandy, each process belong to a step, and all processes from one step are processed in parallel, which mean that they can share the input data but you must be cautious that each process have different outputs.
So, Normandy let you:
-
Create multiple data flows, and share code between then avoiding repeat code.
-
Parallel processing at process level, allowing you, for example, to read from several sources or produce several outputs from the same data set simultaneously.
-
A standardized form of coding.
-
A clean and understandable data pipeline structure.
-
Easily share parameters between processes.
Normandy offer a easy files structure, nevertheless, you may create it using the create project
command with the path where to create the project.
normandy --create-project -file-path write/project/path
If the given path do not exist Normandy will create it.
This command will also create a template of the pipeline_conf.yml
file used to configure Normandy behavior.
The Normandy behavior is configured in the pipeline_conf.yml
file, it has two sections:
-
flows: In this section is defined every aspect of each flow you want to use.
-
confs: Here are settled general configurations like environments.
Each flow has two options that need to be settled, those are tags, and steps.
In the tags sections you must specify with which tags the flow will run, those work to carefully select which flows you want to run in each command.
If several flows have the same tag, all flows with that tag will run when the tag is selected, and they will run in the order they are defined on the pipeline_conf.yml
file.
At least one flow should have the tag default, this flow will run when no tags are selected.
To set the steps is enough to list then with the process they should run.
Here may be defined global parameters as well, this parameters are available at all process.
A configured flow, should looks like:
my_flow:
tags:
- default
- sr1
params:
version: Andromeda
steps:
read:
- read_file
process:
- process_data
finish:
- prints
A step is a individual process or group of processes from a pipeline, this are used to secure order, each process on a step must end before the processes of the next step start.
A step defined in the pipeline_conf.yml
is a reference to a sub folder with the same name on the pipeline
folder, by example, the previously defined flow has three steps: read, process and finish, then the pipeline folder should look like:
- pipeline
- read
- read_file.py
- process
- process_data.py
- finish
- prints.py
- read
Steps have no further configurations excepts the process within it, note that you may have several process (python files) on each step, but if you don´t listed on the step definition it will not run, this allow you to create several flows using the same step, but the step itself in each flow may be different if different processes are called.
The processes are the smaller unit of Normandy pipelines, and here is where your code must be in.
You may define a process on the pipeline_conf.yml
file on two ways:
-
As a list: Use this method if you do not want to add any configuration out of the defaults, you must use the list version even if the step has just one process.
-
As a dictionary: With this method you can make more in depth configurations to the specific process, the allowed configurations are:
-
avoid_tags: With this setup you can told Normandy to do not run this process f a specific tags has been use, this is very useful if you have several flows very similar between then then you can define just one flow and use the tags to shape the flow.
-
error_tolerance: If your process may fail and that does not affect the rest of the data flow, you may activate this feature, so, in case the process fail, the flow would still run.
-
params: If your process need some extra parameters you may listed here as a dictionary, then inside the process you can invoke them as
params["params_name"]
. If the same parameter is defined globally and at a process the last one would be chosen. -
iter_param: this help you if you need to run a process several time but with a different value of a parameters, you must past the name and values to take. Note: this would run as several different processes of the same steps, this mean they will run in parallel.
-
Note that if you want to use one of the special setups on one process of a step, you must specify all others process as a dictionary too, even if in one of the processes no special setup is used, on this case the setup ignore
is used.
A complete example:
my_flow:
tags:
- default
- salarians
params:
version: Andromeda
steps:
read:
- read_file
- read_database
process:
main_processing:
avoid_tags:
- hammerhead
params:
limit: 100
flag: True
version: Trilogy
side_processing:
avoid_tags:
- Shepard
error_tolerance: True
iter_param:
name: locations
values:
- Thessia
- SurKesh
- Rannoch
- Tuchanka
- Palaven
finish:
write_data:
- ignore
backup_writing:
error_tolerance: True
The main objective of this section is to configure your pipeline settings, like environment distinctions.
You are free to make all configurations you need over the environment section of the configuration file, just must use the keyword "envs", in the example below the read and write folder are distinguished based on the environment.
Other settings are:
-
path: This is the project path, and must be defined.
-
threads: Maximum number of threads allowed to run simultaneously, this is used in case a step have several processes, the default value is 8.
-
log_level: Level of detail in log files, may be info, warnings or erros, the default value is info.
confs:
path: your/project/path
threads: 4
log_level: errors
envs:
dev:
read:
data/dev/raw/
write:
data/dev/processed/
prod:
read:
data/prod/raw/
write:
data/prod/processed
As mentioned before, each process listed on the pipeline_conf.yml
file is making a reference to a python file inside the correspondent step folder, whatever, this file must have defined inside it a process function which need to parameter pipe
and log
.
Pipe is a Normandy pipeline object, at the stage is only function is to use the environment configurations defined on pipeline_conf.yml
, to get this use the get_env_confs()
method, which returns a dictionary with the mentioned configurations.
log is a Normandy logger object, the main function is to easily log your code.
Process snippet:
from normandy.engine.variables_storage import variables_storage
def process(pipe, log, params):
# Configurations
env_confs = pipe.get_confs()
var_str = variables_storage()
log.info("Configuration ready")
# Code here your process
# Use params as any dict ej:
# flag = params["flag"]
# ...
return
Normandy provide a fully settled logger.
Each flow execution will have a separated log folder inside the log folder of the project root directory, the folder would be called as {flow_name}_{execution_datetime}
, inside it a bunch of file would be created, one file for each Normandy object, with the intention to let you easily get in depth logs or just a general one. The main flow log would have the logs of each of the children's objects, this means, in the main log would be the flow logs, the steps logs and even the processes logs.
The Normandy logger have three basic methods to use:
-
info
-
warning
-
error
Each one just receive one parameter which is the message you want to log.
Normandy also provide a variable storage object to let you share the data between processes.
It's very easily to use, with just two methods:
-
get: It let you use a previously stored variable
-
update: It let you store a new variable or overwrite it if it already exists.
A usage example:
import pandas as pd
from normandy.engine.variables_storage import variables_storage
def process(pipe, log, params):
# Get confs
read_path = pipe.get_env_confs()["read_path"]
var_str = variables_storage()
log.info("Configuration ready")
try:
main_df = pd.read_csv(f"{read_path}{params["filename"]}.csv")
except Exception as e:
log.error(e)
raise e
log.info("Data read")
var_str.update("main_df", main_df)
log.info("Data stored")
return
In case user defined modules are needed, they can be loaded using the modules_importer
class, this class have the load_module
method, which load the full selected module inside the instance, it's important to take into account that each instance of the class may have just one module loaded at a time.
The functions should be on the tools
folder on the root project directory.
A full project directory should looks like:
- project_name/
- logs/
- pipeline/
- my_first_step/
- my_process_1.py
- my_process_2.py
- my_second_step/
- my_process3.py
- my_process4.py
- my_process5.py
- pipeline_conf.yml
- my_first_step/
- temp/
- tools/
- my_functions.py
If a function named my_function
is defined on my_functions.py
file it can be use following this example:
from normandy.engine.tools import modules_importer
def process(pipe, log, params):
my_module = modules_importer(pipe).load_module("my_functions")
my_module.module.my_function()
return
To run the Normandy pipeline use the command run-pipeline
as below from the project directory:
normandy --run-pipeline
With this command the default flow would run on the defined dev environment.
The run-pipeline
has several arguments as:
-
tags: Specify any number of tags.
-
env: Specify the environment to run on.
-
params: Used to overwrite or define global parameters on all running flows.
-
log level: Overwrite the log_level setting.
-
threads: Overwrite the maximum number of threads setting.
A complete example:
normandy --run-pipeline -tags my_tag -tags sr2 -env prod -param version Trilogy -log-level warnings -threads 16