Skip to content

Commit

Permalink
Updates python client example (OpenLineage#3086)
Browse files Browse the repository at this point in the history
* Update Python Client example code

The python client was using deprecated code paths.
This updates to the latest client objects.

Signed-off-by: Stefan Krawczyk <stefan@dagworks.io>

* Fixes missed code

Signed-off-by: Stefan Krawczyk <stefan@dagworks.io>

---------

Signed-off-by: Stefan Krawczyk <stefan@dagworks.io>
  • Loading branch information
skrawcz authored Sep 14, 2024
1 parent b1275a7 commit 047fbf8
Showing 1 changed file with 87 additions and 58 deletions.
145 changes: 87 additions & 58 deletions website/docs/client/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,15 @@ In `generate_events.py`, import the Python client and the methods needed to crea

```python
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset
from openlineage.client.event_v2 import (
Dataset,
InputDataset,
Job,
OutputDataset,
Run,
RunEvent,
RunState,
)
from openlineage.client.uuid import generate_new_uuid
from datetime import datetime
```
Expand Down Expand Up @@ -396,16 +404,18 @@ job = Job(namespace="food_delivery", name="example.order_data")

To create a run object you’ll need to specify a unique ID:
```python
# Pass the identifier by keyword so it is bound to the runId field explicitly.
run = Run(runId=str(generate_new_uuid()))
```

a START run event:
```python
# Emit the START event for this run; every RunEvent field is passed by
# keyword so each value is bound to the intended attribute.
client.emit(
    RunEvent(
        eventType=RunState.START,
        eventTime=datetime.now().isoformat(),
        run=run,
        job=job,
        producer=producer,
    )
)
```
Expand All @@ -414,9 +424,9 @@ and, finally, a COMPLETE run event:
```python
client.emit(
RunEvent(
RunState.COMPLETE,
datetime.now().isoformat(),
run, job, producer,
eventType=RunState.COMPLETE,
eventTime=datetime.now().isoformat(),
run=run, job=job, producer=producer,
inputs=[inventory],
outputs=[menus, orders],
)
Expand All @@ -440,51 +450,64 @@ When you click on the job, you will see a new map displaying the job, input and

```python
#!/usr/bin/env python3
from openlineage.client.run import (
RunEvent,
RunState,
Run,
Job,
from datetime import datetime, timedelta, timezone
from random import random
from openlineage.client.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.event_v2 import (
Dataset,
OutputDataset,
InputDataset,
Job,
OutputDataset,
Run,
RunEvent,
RunState,
)
from openlineage.client.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.facet import (
SqlJobFacet,
SchemaDatasetFacet,
SchemaField,
OutputStatisticsOutputDatasetFacet,
SourceCodeLocationJobFacet,
NominalTimeRunFacet,
DataQualityMetricsInputDatasetFacet,
ColumnMetric,
from openlineage.client.facet_v2 import (
nominal_time_run,
schema_dataset,
source_code_location_job,
sql_job,
)
from openlineage.client.uuid import generate_new_uuid
from datetime import datetime, timezone, timedelta
import time
from random import random
# Producer URI attached to the emitted events (a plain string: there is
# nothing to interpolate, so no f-string is needed).
PRODUCER = "https://github.com/openlineage-user"
namespace = "python_client"
dag_name = "user_trends"

# update to your host
url = "http://mymarquez.host:5000"
api_key = "1234567890ckcu028rzu5l"

client = OpenLineageClient(
    url=url,
    # optional api key in case marquez requires it. When running marquez in
    # your local environment, you usually do not need this.
    options=OpenLineageClientOptions(api_key=api_key),
)
# If you want to log to a file instead of Marquez
# from openlineage.client import OpenLineageClient
# from openlineage.client.transport.file import FileConfig, FileTransport
#
# file_config = FileConfig(
# log_file_path="ol.json",
# append=True,
# )
#
# client = OpenLineageClient(transport=FileTransport(file_config))
# generates job facet
def job(job_name, sql, location):
    """Build the Job for ``job_name`` with its SQL facet attached.

    Args:
        job_name: Name of the job inside the module-level ``namespace``.
        sql: Query text stored in the ``sql`` job facet.
        location: URL of the job's source code. When ``None``, no
            ``sourceCodeLocation`` facet is added.

    Returns:
        A ``Job`` carrying the ``sql`` facet and, when ``location`` is
        given, a ``sourceCodeLocation`` facet of type ``"git"``.
    """
    facets = {"sql": sql_job.SQLJobFacet(query=sql)}
    # Identity test: ``!= None`` goes through ``__ne__`` and is not the
    # idiomatic way to check for a missing value.
    if location is not None:
        facets["sourceCodeLocation"] = (
            source_code_location_job.SourceCodeLocationJobFacet("git", location)
        )
    return Job(namespace=namespace, name=job_name, facets=facets)
Expand All @@ -494,8 +517,9 @@ def run(run_id, hour):
return Run(
runId=run_id,
facets={
"nominalTime": NominalTimeRunFacet(
nominalStartTime=f"2022-04-14T{twoDigits(hour)}:12:00Z"
"nominalTime": nominal_time_run.NominalTimeRunFacet(
nominalStartTime=f"2022-04-14T{twoDigits(hour)}:12:00Z",
# nominalEndTime=None
)
},
)
Expand All @@ -507,21 +531,26 @@ def dataset(name, schema=None, ns=namespace):
facets = {}
else:
facets = {"schema": schema}
return Dataset(namespace, name, facets)
return Dataset(namespace=ns, name=name, facets=facets)
# generates output dataset
def outputDataset(dataset, stats):
    """Wrap ``dataset`` as an ``OutputDataset`` carrying ``stats``.

    Args:
        dataset: Object exposing ``namespace``, ``name`` and ``facets``.
        stats: Facet stored under both the ``stats`` and the
            ``outputStatistics`` output-facet keys.

    Returns:
        An ``OutputDataset`` with the dataset's own facets plus the
        output facets built here.
    """
    output_facets = {"stats": stats, "outputStatistics": stats}
    # Facet dictionaries are passed by keyword so they cannot be bound to
    # the wrong field.
    return OutputDataset(
        dataset.namespace,
        dataset.name,
        facets=dataset.facets,
        outputFacets=output_facets,
    )
# generates input dataset
def inputDataset(dataset, dq):
    """Wrap ``dataset`` as an ``InputDataset`` carrying ``dq``.

    Args:
        dataset: Object exposing ``namespace``, ``name`` and ``facets``.
        dq: Facet stored under the ``dataQuality`` input-facet key.

    Returns:
        An ``InputDataset`` with the dataset's own facets plus the
        input facets built here.
    """
    input_facets = {"dataQuality": dq}
    # Facet dictionaries are passed by keyword so they cannot be bound to
    # the wrong field.
    return InputDataset(
        dataset.namespace,
        dataset.name,
        facets=dataset.facets,
        inputFacets=input_facets,
    )
def twoDigits(n):
Expand Down Expand Up @@ -567,12 +596,8 @@ def runEvents(job_name, sql, inputs, outputs, hour, min, location, duration):
# add run event to the events list
def addRunEvents(
    events, job_name, sql, inputs, outputs, hour, minutes, location=None, duration=2
):
    """Append the two events produced by ``runEvents`` to ``events``.

    Args:
        events: List that receives the events; mutated in place.
        job_name, sql, inputs, outputs, hour, minutes, location, duration:
            Forwarded unchanged, in this order, to ``runEvents``.
    """
    start, complete = runEvents(
        job_name, sql, inputs, outputs, hour, minutes, location, duration
    )
    # Keep the original ordering: start first, then complete.
    events.append(start)
    events.append(complete)
Expand All @@ -581,43 +606,46 @@ events = []
# create dataset data
for i in range(0, 5):
user_counts = dataset("tmp_demo.user_counts")
user_history = dataset(
"temp_demo.user_history",
SchemaDatasetFacet(
schema_dataset.SchemaDatasetFacet(
fields=[
SchemaField(name="id", type="BIGINT", description="the user id"),
SchemaField(
schema_dataset.SchemaDatasetFacetFields(
name="id", type="BIGINT", description="the user id"
),
schema_dataset.SchemaDatasetFacetFields(
name="email_domain", type="VARCHAR", description="the user id"
),
SchemaField(name="status", type="BIGINT", description="the user id"),
SchemaField(
schema_dataset.SchemaDatasetFacetFields(
name="status", type="BIGINT", description="the user id"
),
schema_dataset.SchemaDatasetFacetFields(
name="created_at",
type="DATETIME",
description="date and time of creation of the user",
),
SchemaField(
schema_dataset.SchemaDatasetFacetFields(
name="updated_at",
type="DATETIME",
description="the last time this row was updated",
),
SchemaField(
schema_dataset.SchemaDatasetFacetFields(
name="fetch_time_utc",
type="DATETIME",
description="the time the data was fetched",
),
SchemaField(
schema_dataset.SchemaDatasetFacetFields(
name="load_filename",
type="VARCHAR",
description="the original file this data was ingested from",
),
SchemaField(
schema_dataset.SchemaDatasetFacetFields(
name="load_filerow",
type="INT",
description="the row number in the original file",
),
SchemaField(
schema_dataset.SchemaDatasetFacetFields(
name="load_timestamp",
type="DATETIME",
description="the time the data was ingested",
Expand All @@ -628,10 +656,10 @@ for i in range(0, 5):
)
create_user_counts_sql = """CREATE OR REPLACE TABLE TMP_DEMO.USER_COUNTS AS (
SELECT DATE_TRUNC(DAY, created_at) date, COUNT(id) as user_count
FROM TMP_DEMO.USER_HISTORY
GROUP BY date
)"""
SELECT DATE_TRUNC(DAY, created_at) date, COUNT(id) as user_count
FROM TMP_DEMO.USER_HISTORY
GROUP BY date
)"""
# location of the source code
location = "https://github.com/some/airflow/dags/example/user_trends.py"
Expand All @@ -656,6 +684,7 @@ for event in events:
print(Serde.to_json(event))
# time.sleep(1)
client.emit(event)
```
The resulting lineage events received by Marquez would look like this.

Expand Down

0 comments on commit 047fbf8

Please sign in to comment.