Skip to content

Commit

Permalink
Added carte xcom support
Browse files Browse the repository at this point in the history
Added carte xcom support.
  • Loading branch information
3pm authored Jul 2, 2024
1 parent 87895d1 commit 223ed08
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
30 changes: 26 additions & 4 deletions airflow_pentaho/operators/carte.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from airflow_pentaho.hooks.carte import PentahoCarteHook

XCOM_RETURN_KEY = 'return_value'

class CarteBaseOperator(BaseOperator):
"""Carte Base Operator"""
Expand All @@ -48,10 +49,18 @@ def _log_logging_string(self, raw_logging_string):
cdata = cdata.group(1) if cdata else raw_logging_string
decoded_lines = zlib.decompress(base64.b64decode(cdata),
16 + zlib.MAX_WBITS)
err_count = 0
output_line = ''
if decoded_lines:
for line in re.compile(r'\r\n|\n|\r').split(
decoded_lines.decode('utf-8')):
if "error" in line.lower():
err_count += 1
self.log.info("Errors: %s", err_count)
self.log.info(line)
if len(line)>0:
output_line = line
return output_line, err_count


class CarteJobOperator(CarteBaseOperator):
Expand All @@ -65,6 +74,7 @@ def __init__(self,
params=None,
pdi_conn_id=None,
level='Basic',
xcom_push=False,
**kwargs):
"""
Execute a Job in a remote Carte server from a PDI repository.
Expand All @@ -80,6 +90,7 @@ def __init__(self,
"""
super().__init__(*args, **kwargs)

self.xcom_push_flag = xcom_push
self.pdi_conn_id = pdi_conn_id
if not self.pdi_conn_id:
self.pdi_conn_id = self.DEFAULT_CONN_ID
Expand Down Expand Up @@ -115,12 +126,16 @@ def execute(self, context): # pylint: disable=unused-argument
status = status_job_rs['jobstatus']
status_desc = status['status_desc']
self.log.info(self.LOG_TEMPLATE, status_desc, self.job, job_id)
self._log_logging_string(status['logging_string'])
output, err_count = self._log_logging_string(status['logging_string'])

if status_desc not in self.END_STATUSES:
self.log.info('Sleeping 5 seconds before ask again')
time.sleep(5)


if self.xcom_push_flag:
self.xcom_push(context, key=XCOM_RETURN_KEY, value=output)
self.xcom_push(context, key='err_count', value=err_count)

if 'error_desc' in status and status['error_desc']:
self.log.error(self.LOG_TEMPLATE, status['error_desc'],
self.job, job_id)
Expand All @@ -130,7 +145,7 @@ def execute(self, context): # pylint: disable=unused-argument
self.log.error(self.LOG_TEMPLATE, status['status_desc'],
self.job, job_id)
raise AirflowException(status['status_desc'])


class CarteTransOperator(CarteBaseOperator):
"""Cart Transformation operator. Runs job on Carte service."""
Expand All @@ -143,6 +158,7 @@ def __init__(self,
params=None,
pdi_conn_id=None,
level='Basic',
xcom_push=False,
**kwargs):
"""
Execute a Transformation in a remote Carte server from a PDI
Expand All @@ -159,6 +175,7 @@ def __init__(self,
"""
super().__init__(*args, **kwargs)

self.xcom_push_flag = xcom_push
self.pdi_conn_id = pdi_conn_id
if not self.pdi_conn_id:
self.pdi_conn_id = self.DEFAULT_CONN_ID
Expand Down Expand Up @@ -196,11 +213,15 @@ def execute(self, context): # pylint: disable=unused-argument
trans_id = status['id']
status_desc = status['status_desc']
self.log.info(self.LOG_TEMPLATE, status_desc, self.trans)
self._log_logging_string(status['logging_string'])
output, err_count = self._log_logging_string(status['logging_string'])

if status_desc not in self.END_STATUSES:
self.log.info('Sleeping 5 seconds before ask again')
time.sleep(5)

if self.xcom_push_flag:
self.xcom_push(context, key=XCOM_RETURN_KEY, value=output)
self.xcom_push(context, key='err_count', value=err_count)

if 'error_desc' in status and status['error_desc']:
self.log.error(self.LOG_TEMPLATE, status['error_desc'], self.trans)
Expand All @@ -209,3 +230,4 @@ def execute(self, context): # pylint: disable=unused-argument
if status_desc in self.ERRORS_STATUSES:
self.log.error(self.LOG_TEMPLATE, status['status_desc'], self.trans)
raise AirflowException(status['status_desc'])

1 change: 1 addition & 0 deletions tests/unit/operators/test_carte_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class TestCarteJobOperator(OperatorTestBase):
def test_execute(self, mock_post, mock_get): # pylint: disable=unused-argument
op = CarteJobOperator(
task_id='test_carte_job_operator',
xcom_push=False,
job='/home/bi/test_job',
level='Debug')

Expand Down
1 change: 1 addition & 0 deletions tests/unit/operators/test_carte_trans.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class TestCarteTransOperator(OperatorTestBase):
def test_execute(self, mock_post, mock_get): # pylint: disable=unused-argument
op = CarteTransOperator(
task_id='test_carte_trans_operator',
xcom_push=False,
trans='/home/bi/test_trans',
level='Debug')

Expand Down

0 comments on commit 223ed08

Please sign in to comment.