diff --git a/README.md b/README.md index c3da23f..e21ab37 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,6 @@ The following resources need attention: | `IPFS_TYPE` | IPFS library to use. 'CLUSTER' to use ipfs-cluster, 'CLIENT' to use ipfs-client (default) | | `IPFS_OUTPUT`, `IPFS_ADMINLOGS` | IPFS gateway to upload the output data (algorithm logs & algorithm output) and admin logs (logs from pod-configure & pod-publish)| | `IPFS_OUTPUT_PREFIX`, `IPFS_ADMINLOGS_PREFIX` | Prefix used for the results files (see below) | -| `IPFS_EXPIRY_TIME` | Default expiry time for ipfs (see https://github.com/ipfs/ipfs-cluster/blob/dbca14e83295158558234e867477ce07a523b81b/CHANGELOG.md#rest-api-2_), with an expected value in Go's time format, i.e. 12h (optional) | `IPFS_API_KEY`, `IPFS_API_CLIENT ` | IPFS API Key and Client ID for authentication purpose (optional) | | `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION` | S3 credentials for the logs and output buckets. | | `AWS_BUCKET_OUTPUT` | Bucket that will hold the output data (algorithm logs & algorithm output). | @@ -104,6 +103,9 @@ The following resources need attention: | `NOTIFY_STOP_URL` | URL to call when a new job ends. | | `SERVICE_ACCOUNT` | K8 service account to run pods (same as the one used in deployment). Defaults to db-operator| | `NODE_SELECTOR` | K8 node selector (if defined) | +| `PULL_SECRET` | imagePullSecrets (if defined) (see https://kubernetes.io/docs/concepts/containers/images/#referring-to-an-imagepullsecrets-on-a-pod)| +| `PULL_POLICY` | imagePullPolicy (if defined) (see https://kubernetes.io/docs/concepts/configuration/overview/#container-images)| +| `FILTERING_CONTAINER` | Image to use for the filtering pod (if defined)| @@ -113,12 +115,16 @@ The following resources need attention: ## Usage of IPFS_OUTPUT and IPFS_OUTPUT_PREFIX (IPFS_ADMINLOGS/IPFS_ADMINLOGS_PREFIX) This will allow you to have the following scenarios: - 1. IPFS_OUTPUT=ipfs.oceanprotocol.com:5001 , IPFS_OUTPUT_PREFIX=ipfs.oceanprotocol.com:8080/ipfs/ - - Port 5001 will be used to call addFIle, but the result will look like "ipfs.oceanprotocol.com:8080/ipfs/HASH" - 2. IPFS_OUTPUT=ipfs.oceanprotocol.com:5001 , IPFS_OUTPUT_PREFIX=ipfs:// - - Port 5001 will be used to call addFIle, but the result will look like "ipfs://HASH" (you will hide your ipfs deployment) + 1. - `IPFS_OUTPUT`=http://ipfs.oceanprotocol.com:5001 + - `IPFS_OUTPUT_PREFIX`=http://ipfs.oceanprotocol.com:8080/ipfs/ + + Port 5001 will be used to call addFile, but the result will look like `http://ipfs.oceanprotocol.com:8080/ipfs/HASH` + + 2. - `IPFS_OUTPUT`=http://ipfs.oceanprotocol.com:5001 + - `IPFS_OUTPUT_PREFIX`=ipfs:// + + Port 5001 will be used to call addFile, but the result will look like "ipfs://HASH" (this hides your IPFS deployment) + 3. IPFS_EXPIRY_TIME = the default expiry time. "0" = unlimited ## Usage of NOTIFY_START_URL and NOTIFY_STOP_URL @@ -128,6 +134,9 @@ The following resources need attention: - secret: Secret value (exported to algo pod as secret env) - DID: Array of input DIDs +## Storage Expiry + Op-engine will pass an environment variable called STORAGE_EXPIRY to pod-publishing (the value is defined in op-service and passed through from there).
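For illustration only (not part of the patch), here is a minimal sketch of how a `storageExpiry` value from the workflow document could end up as the `STORAGE_EXPIRY` env entry on the publishing container. The `make_publish_env` helper and the sample workflow body are assumptions made for this example; the real wiring is the `STORAGE_EXPIRY` change inside `create_publish_job` in `operator_engine/resources.py` later in this diff.

```python
# Hypothetical illustration: read storageExpiry from a workflow body and expose
# it to the publishing container as the STORAGE_EXPIRY environment variable.
from typing import Any, Dict, List


def make_publish_env(body: Dict[str, Any]) -> List[Dict[str, str]]:
    # Compute settings live in the first stage of the workflow document.
    compute = body['spec']['metadata']['stages'][0]['compute']
    # str() because Kubernetes env values must be strings.
    return [{'name': 'STORAGE_EXPIRY', 'value': str(compute['storageExpiry'])}]


if __name__ == '__main__':
    sample_body = {'spec': {'metadata': {'stages': [{'compute': {'storageExpiry': 3600}}]}}}
    print(make_publish_env(sample_body))  # [{'name': 'STORAGE_EXPIRY', 'value': '3600'}]
```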
+ ## Usage of NODE_SELECTOR If defined, all pods are going to contain the following selectors in the specs: ``` @@ -188,6 +197,11 @@ reclaimPolicy: Retain For more information, please visit https://kubernetes.io/docs/concepts/storage/storage-classes/ +## Usage of FILTERING_CONTAINER + After an algorithm job is done, you can run your own custom image that analyzes the output folder. + That image could, for example, detect data leaks and overwrite the output folder if needed. + The value uses the usual Docker image notation. + ## Customizing job templates diff --git a/kubernetes/operator.yml b/kubernetes/operator.yml index 86152b6..43645aa 100644 --- a/kubernetes/operator.yml +++ b/kubernetes/operator.yml @@ -38,11 +38,11 @@ spec: - name: IPFS_OUTPUT value: http://youripfsserver:5001 - name: IPFS_OUTPUT_PREFIX - value: http://youripfsserver:5001/ipfs/ + value: http://youripfsserver:8080/ipfs/ - name: IPFS_ADMINLOGS value: http://youradminipfsserver:5001 - name: IPFS_ADMINLOGS_PREFIX - value: http://youradminipfsserver:5001/ipfs/ + value: http://youradminipfsserver:8080/ipfs/ - name: IPFS_EXPIRY_TIME value: "3600" - name: IPFS_API_KEY @@ -54,13 +54,13 @@ spec: - name: LOG_LEVEL value: DEBUG - name: POD_CONFIGURATION_CONTAINER - value: oceanprotocol/pod-configuration:v1.0.1 + value: oceanprotocol/pod-configuration:v1.0.10 - name: NOTIFY_START_URL value: http://yourserver/ - name: NOTIFY_STOP_URL value: http://yourserver/ - name: POD_PUBLISH_CONTAINER - value: oceanprotocol/pod-publishing:v1.0.0 + value: oceanprotocol/pod-publishing:v1.0.1 - name: POSTGRES_DB valueFrom: configMapKeyRef: @@ -86,7 +86,7 @@ spec: configMapKeyRef: key: POSTGRES_PORT name: postgres-config - image: oceanprotocol/operator-engine:v1.0.1 + image: oceanprotocol/operator-engine:v1.0.2 imagePullPolicy: Always name: ocean-compute-operator resources: {} diff --git a/operator_engine/constants.py b/operator_engine/constants.py index dc72519..a70243e 100644 --- a/operator_engine/constants.py +++ b/operator_engine/constants.py @@ -90,6 +90,9 @@ class OperatorConfig: OPERATOR_PRIVATE_KEY = getenv('OPERATOR_PRIVATE_KEY',None) NODE_SELECTOR = getenv('NODE_SELECTOR',None) SERVICE_ACCOUNT = getenv('SERVICE_ACCOUNT','db-operator') + PULL_SECRET = getenv('PULL_SECRET',None) + PULL_POLICY = getenv('PULL_POLICY','Always') + FILTERING_CONTAINER = getenv('FILTERING_CONTAINER',None) class VolumeConfig: VOLUME_SIZE = getenv('VOLUME_SIZE', '2Gi') diff --git a/operator_engine/operator_main.py b/operator_engine/operator_main.py index 75bd8d8..2004b52 100644 --- a/operator_engine/operator_main.py +++ b/operator_engine/operator_main.py @@ -12,50 +12,60 @@ from resources import create_configmap_workflow, notify_start, notify_stop +logging.basicConfig(format='%(asctime)s %(message)s') logger = logging.getLogger('ocean-operator') #logger.setLevel(OperatorConfig.LOG_LEVEL) logger.setLevel(logging.DEBUG) + kubernetes.config.load_incluster_config() #current_namespace = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace").read() #start the sql thread def handle_new_job(jobId,logger): + api = kubernetes.client.BatchV1Api() sql_body=get_sql_job_workflow(jobId,logger) if sql_body is None: - logging.error(f'Sql workflow is empty for {jobId}') + logger.error(f'Sql workflow is empty for {jobId}') return body = json.loads(sql_body) if not isinstance(body,dict): - logging.error(f'Error loading dict workflow for {jobId}') + logger.error(f'Error loading dict workflow for {jobId}') return - + namespace =
body['metadata']['namespace'] #check if we already have a jobid - sqlstatus=get_sql_job_status(body['metadata']['name'],logging) + sqlstatus=get_sql_job_status(body['metadata']['name'],logger) if sqlstatus>10: - logging.error(f"Creating workflow failed, already in db!!!") + logger.error(f"Creating workflow failed, already in db!!!") return {'message': "Creating workflow failed, already in db"} notify_start(body, logger) update_sql_job_status(body['metadata']['name'],20,logger) # Configmap for workflow - logging.debug(f"Job: {jobId} Creating config map") + logger.info(f"Job: {jobId} Creating config map") create_configmap_workflow(body, logger) # Volume - logging.debug(f"Job: {jobId} Creating volumes") + logger.info(f"Job: {jobId} Creating volumes") create_all_pvc(body, logger, body['spec']['metadata']['stages'][0]['compute']['resources']) # Configure pod - logging.error(f"Job: {jobId} Start conf pod") + logger.info(f"Job: {jobId} Start conf pod") create_configure_job(body, logger) # Wait configure pod to finish - while not wait_finish_job(body['metadata']['namespace'], f"{body['metadata']['name']}-configure-job",logger): - logging.error(f"Job: {jobId} Waiting for configure pod to finish") + while not wait_finish_job(namespace, f"{body['metadata']['name']}-configure-job",logger): + logger.debug(f"Job: {jobId} Waiting for configure pod to finish") time.sleep(5.0) #we should check for a timeout - + # Terminate configure job + if OperatorConfig.DEBUG_NO_CLEANUP is None: + try: + name=body['metadata']['name']+"-configure-job" + logger.info(f"Removing job {name}") + api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1) + except ApiException as e: + logger.warning(f"Failed to remove configure job\n") sqlstatus=get_sql_job_status(body['metadata']['name'],logger) # Run the algo if status == 30, else configure failed.. if sqlstatus==30: @@ -64,46 +74,75 @@ def handle_new_job(jobId,logger): create_algorithm_job(body, logger, body['spec']['metadata']['stages'][0]['compute']['resources']) starttime=int(time.time()) # Wait configure pod to finish - while not wait_finish_job(body['metadata']['namespace'], f"{body['metadata']['name']}-algorithm-job",logger): + while not wait_finish_job(namespace, f"{body['metadata']['name']}-algorithm-job",logger): duration=int(time.time())-starttime shouldstop=False - logging.debug(f"Job: {jobId} Waiting for algorithm pod to finish, {duration} seconds of running so far") + logger.debug(f"Job: {jobId} Waiting for algorithm pod to finish, {duration} seconds of running so far") #Check if algo is taking too long if 'maxtime' in body['spec']['metadata']['stages'][0]['compute']: if isinstance(body['spec']['metadata']['stages'][0]['compute']['maxtime'], int): if duration>body['spec']['metadata']['stages'][0]['compute']['maxtime']: - logging.info("Algo is taking too long. Kill IT!") + logger.info("Algo is taking too long. Kill IT!") shouldstop=True update_sql_job_istimeout(body['metadata']['name'],logger) #Check if stop was requested if check_sql_stop_requested(body['metadata']['name'],logger) is True: - logging.info(f"Job: {jobId} Algo has a stop request. Kill IT!") + logger.info(f"Job: {jobId} Algo has a stop request. 
Kill IT!") shouldstop=True #Stop it if needed if shouldstop is True: - stop_specific_job(body['metadata']['namespace'],body['metadata']['name']+"-algorithm-job",logger) + stop_specific_job(namespace,body['metadata']['name']+"-algorithm-job",logger) break time.sleep(5.0) else: - logging.info(f"Job: {jobId} Configure failed, algo was skipped") + logger.info(f"Job: {jobId} Configure failed, algo was skipped") + # Terminate algorithm job + if OperatorConfig.DEBUG_NO_CLEANUP is None: + try: + name=body['metadata']['name']+"-algorithm-job" + logger.info(f"Removing job {name}") + api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1) + except ApiException as e: + logger.warning(f"Failed to remove algorithm job\n") + # Filtering pod + if OperatorConfig.FILTERING_CONTAINER: + update_sql_job_status(body['metadata']['name'],50,logger) + create_filter_job(body, logger, body['spec']['metadata']['stages'][0]['compute']['resources']) + while not wait_finish_job(namespace, f"{body['metadata']['name']}-filter-job",logger): + logger.debug(f"Job: {jobId} Waiting for filtering pod to finish") + time.sleep(5.0) + if OperatorConfig.DEBUG_NO_CLEANUP is None: + try: + name=body['metadata']['name']+"-filter-job" + logger.info(f"Removing job {name}") + api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1) + except ApiException as e: + logger.warning(f"Failed to remove filter job\n") # Publish job # Update status only if algo was runned if sqlstatus==30: update_sql_job_status(body['metadata']['name'],60,logger) create_publish_job(body, logger) # Wait configure pod to finish - while not wait_finish_job(body['metadata']['namespace'], f"{body['metadata']['name']}-publish-job",logger): - logging.error(f"Job: {jobId} Waiting for publish pod to finish") + while not wait_finish_job(namespace, f"{body['metadata']['name']}-publish-job",logger): + logger.debug(f"Job: {jobId} Waiting for publish pod to finish") time.sleep(5.0) #we should check for a timeout - + # Terminate publish job + if OperatorConfig.DEBUG_NO_CLEANUP is None: + try: + name=body['metadata']['name']+"-publish-job" + logger.info(f"Removing job {name}") + api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1) + except ApiException as e: + logger.warning(f"Failed to remove algorithm job\n") if sqlstatus==30: update_sql_job_status(body['metadata']['name'],70,logger) update_sql_job_datefinished(body['metadata']['name'],logger) - logging.info(f"Job: {jobId} Finished") - cleanup_job(body['metadata']['namespace'], jobId, logger) + logger.info(f"Job: {jobId} Finished") + cleanup_job(namespace, jobId, logger) notify_stop(body, logger) return {'message': "Creating workflow finished"} diff --git a/operator_engine/resources.py b/operator_engine/resources.py index db5b5d8..d3e25cc 100644 --- a/operator_engine/resources.py +++ b/operator_engine/resources.py @@ -169,7 +169,7 @@ def create_configure_job(body, logger): 'name': 'workflow', 'subPath': 'workflow.json'} job['spec']['template']['spec']['containers'][0]['volumeMounts'].append( volume_mount) - job = create_node_selector(job,logger) + job = jobs_common_params(job,logger) create_job(logger,body,job) def create_algorithm_job(body, logger, resources): @@ -257,10 +257,80 @@ def create_algorithm_job(body, logger, resources): 'name': 'workflow', 'subPath': 'workflow.yaml'} job['spec']['template']['spec']['containers'][0]['volumeMounts'].append( 
volume_mount) - job = create_node_selector(job,logger) + job = jobs_common_params(job,logger) create_job(logger,body,job) +def create_filter_job(body, logger, resources): + metadata = body['spec']['metadata'] + logger.info(f"create_filter_job:{metadata}") + # attributes = metadata['service'][0]['attributes'] + with open("templates/filter-job-template.yaml", 'r') as stream: + try: + job = yaml.safe_load(stream) + except yaml.YAMLError as exc: + print(exc) + + job['metadata']['labels']['app'] = body['metadata']['name'] + job['metadata']['labels']['workflow'] = body['metadata']['labels']['workflow'] + job['metadata']['labels']['component'] = 'filter' + + job['metadata']['name'] = f"{body['metadata']['name']}-filter-job" + job['metadata']['namespace'] = body['metadata']['namespace'] + + job['spec']['template']['metadata']['labels']['workflow'] = body['metadata']['labels']['workflow'] + job['spec']['template']['metadata']['labels']['component'] = 'filter' + + job['spec']['template']['spec']['containers'][0]['image'] = OperatorConfig.FILTERING_CONTAINER + + # Env + dids = list() + for inputs in metadata['stages'][0]['input']: + logger.info(f"{inputs} as inputs") + id = inputs['id'] + id = id.replace('did:op:', '') + dids.append(id) + dids = json.dumps(dids) + did_transformation = metadata['stages'][0]['algorithm'] + env_transformation = did_transformation['id'].replace('did:op:', '') + job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'DIDS', + 'value': dids}) + job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'TRANSFORMATION_DID', + 'value': env_transformation}) + job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'VOLUME', + 'value': '/data'}) + job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'LOGS', + 'value': '/data/logs'}) + job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'INPUTS', + 'value': '/data/inputs'}) + job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'OUTPUTS', + 'value': '/data/outputs'}) + job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'secret','value': body['metadata']['secret']}) + + # Volumes + job['spec']['template']['spec']['volumes'] = [] + job['spec']['template']['spec']['containers'][0]['volumeMounts'] = [] + + # Output volume + job['spec']['template']['spec']['volumes'].append( + {'name': 'data', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-data"}}) + volume_mount = {'mountPath': '/data/', 'name': 'data', 'readOnly': False} + job['spec']['template']['spec']['containers'][0]['volumeMounts'].append( + volume_mount) + + # set the account + job['spec']['template']['spec']['serviceAccount']=OperatorConfig.SERVICE_ACCOUNT + job['spec']['template']['spec']['serviceAccountName']=OperatorConfig.SERVICE_ACCOUNT + # Workflow config volume + job['spec']['template']['spec']['volumes'].append( + {'name': 'workflow', 'configMap': {'defaultMode': 420, 'name': body['metadata']['name']}}) + volume_mount = {'mountPath': '/workflow.yaml', + 'name': 'workflow', 'subPath': 'workflow.yaml'} + job['spec']['template']['spec']['containers'][0]['volumeMounts'].append( + volume_mount) + job = jobs_common_params(job,logger) + create_job(logger,body,job) + def create_publish_job(body, logger): init_script = OperatorConfig.POD_PUBLISH_INIT_SCRIPT @@ -321,8 +391,7 @@ def create_publish_job(body, logger): job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'IPFS_OUTPUT_PREFIX','value': 
OperatorConfig.IPFS_OUTPUT_PREFIX}) if OperatorConfig.IPFS_ADMINLOGS_PREFIX is not None: job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'IPFS_ADMINLOGS_PREFIX','value': OperatorConfig.IPFS_ADMINLOGS_PREFIX}) - if OperatorConfig.IPFS_EXPIRY_TIME is not None: - job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'IPFS_EXPIRY_TIME','value': OperatorConfig.IPFS_EXPIRY_TIME}) + job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'STORAGE_EXPIRY','value': str(body['spec']['metadata']['stages'][0]['compute']['storageExpiry'])}) if OperatorConfig.IPFS_API_KEY is not None: job['spec']['template']['spec']['containers'][0]['env'].append({'name': 'IPFS_API_KEY','value': OperatorConfig.IPFS_API_KEY}) if OperatorConfig.IPFS_API_CLIENT is not None: @@ -362,7 +431,7 @@ def create_publish_job(body, logger): 'name': 'workflow', 'subPath': 'workflow.json'} job['spec']['template']['spec']['containers'][0]['volumeMounts'].append( volume_mount) - job = create_node_selector(job,logger) + job = jobs_common_params(job,logger) create_job(logger,body,job) @@ -394,28 +463,6 @@ def wait_finish_job(namespace, pod_name,logger): def cleanup_job(namespace, jobId, logger): if OperatorConfig.DEBUG_NO_CLEANUP is None: - api = kubernetes.client.BatchV1Api() - #jobs and pods - try: - name=jobId+"-configure-job" - logger.debug(f"Removing job {name}") - api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1) - except ApiException as e: - logger.warning(f"Failed to remove configure job\n") - try: - name=jobId+"-algorithm-job" - logger.debug(f"Removing job {name}") - api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1) - except ApiException as e: - logger.warning(f"Failed to remove algo job\n") - try: - name=jobId+"-publish-job" - logger.debug(f"Removing job {name}") - api.delete_namespaced_job(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1) - except ApiException as e: - logger.warning(f"Failed to remove publish job\n") - logger.debug(f"Sleeping while pods are deleted...") - time.sleep(5.0) api = kubernetes.client.CoreV1Api() #pvc claims try: @@ -444,14 +491,11 @@ def cleanup_job(namespace, jobId, logger): return def update_sql_job_datefinished(jobId, logger): - logger.error(f"Start update_sql_job_datefinished for {jobId}") - connection = getpgconn() + connection = getpgconn(logger) try: cursor = connection.cursor() postgres_update_query = """ UPDATE jobs SET dateFinished=NOW() WHERE workflowId=%s""" record_to_update = (jobId,) - logger.info(f'Got select_query: {postgres_update_query}') - logger.info(f'Got params: {record_to_update}') cursor.execute(postgres_update_query, record_to_update) connection.commit() except (Exception, psycopg2.Error) as error: @@ -463,6 +507,12 @@ def update_sql_job_datefinished(jobId, logger): cursor.close() connection.close() +def jobs_common_params(job,logger): + job = create_node_selector(job, logger) + job = update_imagePullSecrets(job, logger) + job = update_imagePullPolicy(job, logger) + return job + def create_node_selector(job, logger): if OperatorConfig.NODE_SELECTOR is None: return job @@ -487,20 +537,29 @@ def create_node_selector(job, logger): } }''' % OperatorConfig.NODE_SELECTOR job['spec']['template']['spec']['affinity']['nodeAffinity']=json.loads(affinity) - logger.error(job['spec']['template']['spec']['affinity']) except Exception as e: logger.error(e) return job 
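As a usage illustration (not code from this patch), the following standalone sketch shows the effect that the pull-secret and pull-policy helpers added just below, chained together by `jobs_common_params`, have on a job manifest. The `regcred` secret name, the `IfNotPresent` policy, and the toy `job` dict are assumptions made for this example.

```python
# Standalone sketch of what the imagePullSecrets / imagePullPolicy helpers
# do to a job manifest; the secret name and policy below are example values.
import json

PULL_SECRET = 'regcred'        # stands in for OperatorConfig.PULL_SECRET
PULL_POLICY = 'IfNotPresent'   # stands in for OperatorConfig.PULL_POLICY

job = {'spec': {'template': {'spec': {'containers': [{'image': 'myalgo:latest'}]}}}}

if PULL_SECRET is not None:
    # Same shape update_imagePullSecrets() produces.
    job['spec']['template']['spec']['imagePullSecrets'] = [{'name': PULL_SECRET}]
if PULL_POLICY is not None:
    # Same field update_imagePullPolicy() sets on the first container.
    job['spec']['template']['spec']['containers'][0]['imagePullPolicy'] = PULL_POLICY

print(json.dumps(job, indent=2))
```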
+def update_imagePullSecrets(job, logger): + if OperatorConfig.PULL_SECRET is None: + return job + job['spec']['template']['spec']['imagePullSecrets'] = list() + job['spec']['template']['spec']['imagePullSecrets'].append({ 'name': OperatorConfig.PULL_SECRET}) + return job + +def update_imagePullPolicy(job, logger): + if OperatorConfig.PULL_POLICY is None: + return job + job['spec']['template']['spec']['containers'][0]['imagePullPolicy'] = OperatorConfig.PULL_POLICY + return job + def update_sql_job_istimeout(jobId, logger): - logger.error(f"Start update_sql_job_istimeout for {jobId}") - connection = getpgconn() + connection = getpgconn(logger) try: cursor = connection.cursor() postgres_update_query = """ UPDATE jobs SET stopreq=2 WHERE workflowId=%s""" record_to_update = (jobId,) - logger.info(f'Got select_query: {postgres_update_query}') - logger.info(f'Got params: {record_to_update}') cursor.execute(postgres_update_query, record_to_update) connection.commit() except (Exception, psycopg2.Error) as error: @@ -514,8 +573,7 @@ def update_sql_job_istimeout(jobId, logger): def update_sql_job_status(jobId, status, logger): - logger.error(f"Start update_sql_job_status for {jobId} : {status}") - connection = getpgconn() + connection = getpgconn(logger) try: switcher = { 10: "Job started", @@ -529,8 +587,6 @@ def update_sql_job_status(jobId, status, logger): cursor = connection.cursor() postgres_update_query = """ UPDATE jobs SET status=%s,statusText=%s WHERE workflowId=%s""" record_to_update = (status, statusText, jobId) - logger.info(f'Got select_query: {postgres_update_query}') - logger.info(f'Got params: {record_to_update}') cursor.execute(postgres_update_query, record_to_update) connection.commit() except (Exception, psycopg2.Error) as error: @@ -543,15 +599,12 @@ def update_sql_job_status(jobId, status, logger): def get_sql_job_status(jobId, logger): - logger.error(f"Start get_sql_job_status for {jobId}") - connection = getpgconn() + connection = getpgconn(logger) try: cursor = connection.cursor() params = dict() select_query = "SELECT status FROM jobs WHERE workflowId=%(jobId)s LIMIT 1" params['jobId'] = jobId - logger.info(f'Got select_query: {select_query}') - logger.info(f'Got params: {params}') cursor.execute(select_query, params) returnstatus = -1 while True: @@ -566,19 +619,15 @@ def get_sql_job_status(jobId, logger): if(connection): cursor.close() connection.close() - logger.error(f'get_sql_job_status goes back with {returnstatus}') return returnstatus def get_sql_job_workflow(jobId, logger): - logger.error(f"Start get_sql_job_status for {jobId}") - connection = getpgconn() + connection = getpgconn(logger) try: cursor = connection.cursor() params = dict() select_query = "SELECT workflow FROM jobs WHERE workflowId=%(jobId)s LIMIT 1" params['jobId'] = jobId - logger.info(f'Got select_query: {select_query}') - logger.info(f'Got params: {params}') cursor.execute(select_query, params) returnstatus = None while True: @@ -593,13 +642,12 @@ def get_sql_job_workflow(jobId, logger): if(connection): cursor.close() connection.close() - logger.error(f'get_sql_job_status goes back with {returnstatus}') return returnstatus def get_sql_pending_jobs(logger): #logger.debug(f"Start get_sql_pending_jobs") - connection = getpgconn() + connection = getpgconn(logger) returnstatus = [] try: cursor = connection.cursor() @@ -625,7 +673,7 @@ def get_sql_pending_jobs(logger): def check_sql_stop_requested(jobId, logger): - connection = getpgconn() + connection = getpgconn(logger) returnstatus = False try: cursor 
= connection.cursor() @@ -649,7 +697,7 @@ check_sql_stop_requested(jobId, logger): return returnstatus -def getpgconn(): +def getpgconn(logger): try: connection = psycopg2.connect(user=PGConfig.POSTGRES_USER, password=PGConfig.POSTGRES_PASSWORD, diff --git a/operator_engine/templates/filter-job-template.yaml b/operator_engine/templates/filter-job-template.yaml new file mode 100644 index 0000000..66fee64 --- /dev/null +++ b/operator_engine/templates/filter-job-template.yaml @@ -0,0 +1,28 @@ +apiVersion: batch/v1 +kind: Job +metadata: + labels: + app: "" + workflow: "" + name: "" + namespace: "" +spec: + template: + metadata: + labels: + workflow: "" + spec: + containers: + - args: [] + command: [] + image: "" + imagePullPolicy: Always + name: "workflow-executor" + resources: {} + env: [] + restartPolicy: OnFailure + schedulerName: default-scheduler + securityContext: {} + serviceAccount: db-operator + serviceAccountName: db-operator +status: {}
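To make the filtering hook concrete, here is a hypothetical minimal entrypoint for a `FILTERING_CONTAINER` image (not part of this PR). It relies only on the `OUTPUTS` environment variable that `create_filter_job` injects, and the "leak" heuristic is a stand-in for whatever analysis a real deployment would run before deciding to overwrite the output folder.

```python
#!/usr/bin/env python3
# Hypothetical filtering entrypoint: scan the job output folder and, if the
# (stand-in) leak check flags anything, replace the output with a short notice.
import os
import pathlib


def looks_like_leak(path: pathlib.Path) -> bool:
    # Placeholder heuristic only: flag any file containing the word "secret".
    try:
        return b'secret' in path.read_bytes()
    except OSError:
        return False


def main() -> None:
    output_dir = pathlib.Path(os.environ.get('OUTPUTS', '/data/outputs'))
    if not output_dir.is_dir():
        return
    files = [p for p in output_dir.rglob('*') if p.is_file()]
    flagged = [p for p in files if looks_like_leak(p)]
    if flagged:
        # Overwrite the output folder contents, as the README describes.
        for p in files:
            p.unlink()
        (output_dir / 'filtered.txt').write_text(
            f'{len(flagged)} file(s) removed by the filtering step\n')


if __name__ == '__main__':
    main()
```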