diff --git a/website/docs/client/python.md b/website/docs/client/python.md
index d3a20f481b..72d127bb24 100644
--- a/website/docs/client/python.md
+++ b/website/docs/client/python.md
@@ -358,7 +358,15 @@ In `generate_events.py`, import the Python client and the methods needed to crea
 ```python
 from openlineage.client import OpenLineageClient
-from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset
+from openlineage.client.event_v2 import (
+    Dataset,
+    InputDataset,
+    Job,
+    OutputDataset,
+    Run,
+    RunEvent,
+    RunState,
+)
 from openlineage.client.uuid import generate_new_uuid
 from datetime import datetime
 ```
@@ -396,16 +404,18 @@ job = Job(namespace="food_delivery", name="example.order_data")
 To create a run object you’ll need to specify a unique ID:
 ```python
-run = Run(str(generate_new_uuid()))
+run = Run(runId=str(generate_new_uuid()))
 ```
 
 a START run event:
 ```python
 client.emit(
     RunEvent(
-        RunState.START,
-        datetime.now().isoformat(),
-        run, job, producer
+        eventType=RunState.START,
+        eventTime=datetime.now().isoformat(),
+        run=run,
+        job=job,
+        producer=producer,
     )
 )
 ```
@@ -414,9 +424,9 @@ and, finally, a COMPLETE run event:
 ```python
 client.emit(
     RunEvent(
-        RunState.COMPLETE,
-        datetime.now().isoformat(),
-        run, job, producer,
+        eventType=RunState.COMPLETE,
+        eventTime=datetime.now().isoformat(),
+        run=run, job=job, producer=producer,
         inputs=[inventory],
         outputs=[menus, orders],
     )
 )
 ```
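Taken together, the converted snippets above make up one full emission cycle. Here is a minimal end-to-end sketch of the new `event_v2` style, assembled from the hunks above; the endpoint and the dataset names are illustrative placeholders, not part of this patch:

```python
from datetime import datetime

from openlineage.client import OpenLineageClient
from openlineage.client.event_v2 import Dataset, Job, Run, RunEvent, RunState
from openlineage.client.uuid import generate_new_uuid

# placeholder endpoint; point this at your own Marquez (or other) backend
client = OpenLineageClient(url="http://localhost:5000")

producer = "https://github.com/openlineage-user"

# placeholder datasets standing in for the inventory/menus/orders examples
inventory = Dataset(namespace="food_delivery", name="public.inventory")
menus = Dataset(namespace="food_delivery", name="public.menus")
orders = Dataset(namespace="food_delivery", name="public.orders")

job = Job(namespace="food_delivery", name="example.order_data")
run = Run(runId=str(generate_new_uuid()))

# a START event, then a COMPLETE event that attaches the inputs and outputs
client.emit(
    RunEvent(
        eventType=RunState.START,
        eventTime=datetime.now().isoformat(),
        run=run,
        job=job,
        producer=producer,
    )
)
client.emit(
    RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now().isoformat(),
        run=run,
        job=job,
        producer=producer,
        inputs=[inventory],
        outputs=[menus, orders],
    )
)
```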
@@ -440,51 +450,64 @@ When you click on the job, you will see a new map displaying the job, input and
 ```python
 #!/usr/bin/env python3
-from openlineage.client.run import (
-    RunEvent,
-    RunState,
-    Run,
-    Job,
+from datetime import datetime, timedelta, timezone
+from random import random
+
+from openlineage.client.client import OpenLineageClient, OpenLineageClientOptions
+from openlineage.client.event_v2 import (
     Dataset,
-    OutputDataset,
     InputDataset,
+    Job,
+    OutputDataset,
+    Run,
+    RunEvent,
+    RunState,
 )
-from openlineage.client.client import OpenLineageClient, OpenLineageClientOptions
-from openlineage.client.facet import (
-    SqlJobFacet,
-    SchemaDatasetFacet,
-    SchemaField,
-    OutputStatisticsOutputDatasetFacet,
-    SourceCodeLocationJobFacet,
-    NominalTimeRunFacet,
-    DataQualityMetricsInputDatasetFacet,
-    ColumnMetric,
+from openlineage.client.facet_v2 import (
+    nominal_time_run,
+    schema_dataset,
+    source_code_location_job,
+    sql_job,
 )
 from openlineage.client.uuid import generate_new_uuid
-from datetime import datetime, timezone, timedelta
-import time
-from random import random
 
-PRODUCER = f"https://github.com/openlineage-user"
+PRODUCER = "https://github.com/openlineage-user"
 namespace = "python_client"
 dag_name = "user_trends"
 
+# update to your host
 url = "http://mymarquez.host:5000"
 api_key = "1234567890ckcu028rzu5l"
 
 client = OpenLineageClient(
     url=url,
-    # optional api key in case marquez requires it. When running marquez in
-    # your local environment, you usually do not need this.
+    # optional api key in case marquez requires it. When running marquez in
+    # your local environment, you usually do not need this.
     options=OpenLineageClientOptions(api_key=api_key),
 )
 
+# If you want to log to a file instead of Marquez
+# from openlineage.client import OpenLineageClient
+# from openlineage.client.transport.file import FileConfig, FileTransport
+#
+# file_config = FileConfig(
+#     log_file_path="ol.json",
+#     append=True,
+# )
+#
+# client = OpenLineageClient(transport=FileTransport(file_config))
+
 
 # generates job facet
 def job(job_name, sql, location):
-    facets = {"sql": SqlJobFacet(sql)}
+    facets = {"sql": sql_job.SQLJobFacet(query=sql)}
     if location != None:
         facets.update(
-            {"sourceCodeLocation": SourceCodeLocationJobFacet("git", location)}
+            {
+                "sourceCodeLocation": source_code_location_job.SourceCodeLocationJobFacet(
+                    "git", location
+                )
+            }
         )
     return Job(namespace=namespace, name=job_name, facets=facets)
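One way to sanity-check the commented-out file transport above: assuming `append=True` leaves one JSON event per line in `ol.json` (an assumption about the file layout, not something this patch states), a few lines of standard-library Python are enough to inspect what was emitted:

```python
import json

# assumes FileTransport(append=True) wrote one JSON-serialized event per line
with open("ol.json") as f:
    for line in f:
        event = json.loads(line)
        # eventType, eventTime, and job are required top-level fields of a RunEvent
        print(event["eventType"], event["eventTime"], event["job"]["name"])
```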
@@ -494,8 +517,9 @@ def run(run_id, hour):
     return Run(
         runId=run_id,
         facets={
-            "nominalTime": NominalTimeRunFacet(
-                nominalStartTime=f"2022-04-14T{twoDigits(hour)}:12:00Z"
+            "nominalTime": nominal_time_run.NominalTimeRunFacet(
+                nominalStartTime=f"2022-04-14T{twoDigits(hour)}:12:00Z",
+                # nominalEndTime=None
             )
         },
     )
@@ -507,13 +531,16 @@ def dataset(name, schema=None, ns=namespace):
         facets = {}
     else:
         facets = {"schema": schema}
-    return Dataset(namespace, name, facets)
+    return Dataset(namespace=ns, name=name, facets=facets)
 
 
 # generates output dataset
 def outputDataset(dataset, stats):
     output_facets = {"stats": stats, "outputStatistics": stats}
-    return OutputDataset(dataset.namespace, dataset.name, dataset.facets, output_facets)
+    return OutputDataset(dataset.namespace,
+                         dataset.name,
+                         facets=dataset.facets,
+                         outputFacets=output_facets)
 
 
 # generates input dataset
@@ -521,7 +548,9 @@ def inputDataset(dataset, dq):
     input_facets = {
         "dataQuality": dq,
     }
-    return InputDataset(dataset.namespace, dataset.name, dataset.facets, input_facets)
+    return InputDataset(dataset.namespace, dataset.name,
+                        facets=dataset.facets,
+                        inputFacets=input_facets)
 
 
 def twoDigits(n):
@@ -567,12 +596,8 @@ def runEvents(job_name, sql, inputs, outputs, hour, min, location, duration):
 
 
 # add run event to the events list
-def addRunEvents(
-    events, job_name, sql, inputs, outputs, hour, minutes, location=None, duration=2
-):
-    (start, complete) = runEvents(
-        job_name, sql, inputs, outputs, hour, minutes, location, duration
-    )
+def addRunEvents(events, job_name, sql, inputs, outputs, hour, minutes, location=None, duration=2):
+    (start, complete) = runEvents(job_name, sql, inputs, outputs, hour, minutes, location, duration)
     events.append(start)
     events.append(complete)
 
@@ -581,43 +606,46 @@ events = []
 
 # create dataset data
 for i in range(0, 5):
-
     user_counts = dataset("tmp_demo.user_counts")
     user_history = dataset(
         "temp_demo.user_history",
-        SchemaDatasetFacet(
+        schema_dataset.SchemaDatasetFacet(
             fields=[
-                SchemaField(name="id", type="BIGINT", description="the user id"),
-                SchemaField(
+                schema_dataset.SchemaDatasetFacetFields(
+                    name="id", type="BIGINT", description="the user id"
+                ),
+                schema_dataset.SchemaDatasetFacetFields(
                     name="email_domain", type="VARCHAR", description="the user id"
                 ),
-                SchemaField(name="status", type="BIGINT", description="the user id"),
-                SchemaField(
+                schema_dataset.SchemaDatasetFacetFields(
+                    name="status", type="BIGINT", description="the user id"
+                ),
+                schema_dataset.SchemaDatasetFacetFields(
                     name="created_at",
                     type="DATETIME",
                     description="date and time of creation of the user",
                 ),
-                SchemaField(
+                schema_dataset.SchemaDatasetFacetFields(
                     name="updated_at",
                     type="DATETIME",
                     description="the last time this row was updated",
                 ),
-                SchemaField(
+                schema_dataset.SchemaDatasetFacetFields(
                     name="fetch_time_utc",
                     type="DATETIME",
                     description="the time the data was fetched",
                 ),
-                SchemaField(
+                schema_dataset.SchemaDatasetFacetFields(
                     name="load_filename",
                     type="VARCHAR",
                     description="the original file this data was ingested from",
                 ),
-                SchemaField(
+                schema_dataset.SchemaDatasetFacetFields(
                     name="load_filerow",
                     type="INT",
                     description="the row number in the original file",
                 ),
-                SchemaField(
+                schema_dataset.SchemaDatasetFacetFields(
                     name="load_timestamp",
                     type="DATETIME",
                     description="the time the data was ingested",
@@ -628,10 +656,10 @@ for i in range(0, 5):
     )
 
     create_user_counts_sql = """CREATE OR REPLACE TABLE TMP_DEMO.USER_COUNTS AS (
-            SELECT DATE_TRUNC(DAY, created_at) date, COUNT(id) as user_count
-            FROM TMP_DEMO.USER_HISTORY
-            GROUP BY date
-            )"""
+        SELECT DATE_TRUNC(DAY, created_at) date, COUNT(id) as user_count
+        FROM TMP_DEMO.USER_HISTORY
+        GROUP BY date
+    )"""
 
     # location of the source code
     location = "https://github.com/some/airflow/dags/example/user_trends.py"
@@ -656,6 +684,7 @@ for event in events:
     print(Serde.to_json(event))
     # time.sleep(1)
     client.emit(event)
+
 ```
 
 The resulting lineage events received by Marquez would look like this.
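If you want to preview those payloads before standing up Marquez, the `Serde` helper already used in the script's final loop produces exactly the JSON that gets sent. A small sketch, assuming the `events` list built by the full example above is in scope:

```python
import json

from openlineage.client.serde import Serde

# pretty-print the first generated event to see the payload structure
print(json.dumps(json.loads(Serde.to_json(events[0])), indent=2))
```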