Feature/node selectors (#31)
* use node selectors

* reduce to 2 pvc: data and adminlogs
alexcos20 authored Apr 28, 2021
1 parent 433473c commit d94a3b8
Showing 5 changed files with 77 additions and 71 deletions.
35 changes: 26 additions & 9 deletions README.md
@@ -1,6 +1,6 @@
[![banner](https://raw.githubusercontent.com/oceanprotocol/art/master/github/repo-banner%402x.png)](https://oceanprotocol.com)

<h1 align="center">Operator-Enging</h1>
<h1 align="center">Operator-Engine</h1>

> Orchestrates a compute job
@@ -40,9 +40,8 @@ The Operator Engine is in charge of retrieving all the Workflows registered in a
* Orchestrate the flow of the execution
* Start the configuration pod in charge of downloading the workflow dependencies (datasets and algorithms)
* Start the pod including the algorithm to execute
* Start the publishing pod that publish the new assets created in the Ocean Protocol network.
* Start the publishing pod that uploads the results to remote storage (IPFS or S3)

The Operator Engine doesn't provide any storage capability; all state is stored directly in the K8s cluster.

## Getting Started

@@ -61,11 +60,9 @@ First, it is necessary to apply the `operator-engine` YAML defining the K8s deployment
```
$ kubectl create ns ocean-compute
$ kubectl config set-context --current --namespace ocean-compute
$ kubectl apply -f k8s_install/sa.yml
$ kubectl apply -f k8s_install/binding.yml
$ kubectl apply -f k8s_install/operator.yml
$ kubectl apply -f k8s_install/computejob-crd.yaml
$ kubectl apply -f k8s_install/workflow-crd.yaml
$ kubectl apply -f kubernetes/sa.yml
$ kubectl apply -f kubernetes/binding.yml
$ kubectl apply -f kubernetes/operator.yml
```

This will generate the `ocean-compute-operator` deployment in K8s. You can check that the `Deployment` was created successfully
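
If you prefer checking from code instead of `kubectl`, here is a minimal sketch using the official `kubernetes` Python client; the namespace and deployment name are taken from the steps above, and access via a local kubeconfig is assumed:

```
# Sketch: confirm the ocean-compute-operator deployment exists and report its replicas.
# Assumes a local kubeconfig pointing at the target cluster.
from kubernetes import client, config

config.load_kube_config()
apps = client.AppsV1Api()
dep = apps.read_namespaced_deployment("ocean-compute-operator", "ocean-compute")
print(dep.status.ready_replicas, "/", dep.spec.replicas, "replicas ready")
```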
@@ -94,6 +91,7 @@ The following resources need attention:
| Variable | Description |
| ------------------------------------------------------ | ------------------------------------------------------------------------------------------- |
| `OPERATOR_PRIVATE_KEY` | Private key of address used to sign notifications and consume algo/inputs (operator service has the same address) |
| `IPFS_TYPE` | IPFS library to use. 'CLUSTER' to use ipfs-cluster, 'CLIENT' to use ipfs-client (default) |
| `IPFS_OUTPUT`, `IPFS_ADMINLOGS` | IPFS gateway to upload the output data (algorithm logs & algorithm output) and admin logs (logs from pod-configure & pod-publish)|
| `IPFS_OUTPUT_PREFIX`, `IPFS_ADMINLOGS_PREFIX` | Prefix used for the results files (see below) |
| `IPFS_EXPIRY_TIME` | Default expiry time for IPFS (see https://github.com/ipfs/ipfs-cluster/blob/dbca14e83295158558234e867477ce07a523b81b/CHANGELOG.md#rest-api-2_), with an expected value in Go's time format, e.g. 12h (optional) |
@@ -104,6 +102,7 @@ The following resources need attention:
| `NOTIFY_START_URL` | URL to call when a new job starts. |
| `NOTIFY_STOP_URL` | URL to call when a job ends. |
| `SERVICE_ACCOUNT` | K8s service account used to run pods (same as the one used in the deployment). Defaults to `db-operator` |
| `NODE_SELECTOR` | K8s node selector value (if defined; see "Usage of NODE_SELECTOR" below) |



@@ -122,12 +121,30 @@ The following resources need attention:
3. IPFS_EXPIRY_TIME = the default expiry time. "0" = unlimited

## Usage of NOTIFY_START_URL and NOTIFY_STOP_URL
Engine will JOSN POST the following for each action:
The engine will JSON-POST the following fields for each action (see the sketch after this list):
- algoDID: Algorithm DID (if any)
- jobId: Job ID
- secret: Secret value (exported to algo pod as secret env)
- DID: Array of input DIDs
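
For illustration, a minimal sketch of this payload from the caller's side, using the `requests` library; the URL and all values are placeholders, not taken from a real job:

```
# Hypothetical sketch of the JSON body posted to NOTIFY_START_URL / NOTIFY_STOP_URL.
# Field names follow the list above; the URL and values are placeholders.
import requests

payload = {
    "algoDID": "did:op:1234",               # algorithm DID (if any)
    "jobId": "0xabc123",                    # job ID
    "secret": "s3cr3t",                     # exported to the algo pod as a secret env var
    "DID": ["did:op:aaaa", "did:op:bbbb"],  # array of input DIDs
}
requests.post("http://yourserver/", json=payload, timeout=10)
```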

## Usage of NODE_SELECTOR
If defined, all pods will contain the following node affinity in their specs:
```
spec:
template:
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: scope
operator: In
values:
- $NODE_SELECTOR
```

This allows you to run C2D pods on specific nodes (those labeled `scope=<NODE_SELECTOR>`).
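
For the affinity above to be satisfiable, the target nodes must carry a `scope=<NODE_SELECTOR>` label. A minimal sketch of labeling a node with the `kubernetes` Python client follows; the node name `worker-node-1` and the label value `c2d` are assumptions for illustration:

```
# Sketch: label a node so it matches the generated nodeAffinity (scope=c2d here).
# The node name and label value are assumptions for illustration.
from kubernetes import client, config

config.load_kube_config()  # or load_incluster_config() when running inside the cluster
core = client.CoreV1Api()
core.patch_node("worker-node-1", {"metadata": {"labels": {"scope": "c2d"}}})
```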

## Storage class

4 changes: 3 additions & 1 deletion kubernetes/operator.yml
@@ -55,6 +55,8 @@ spec:
value: http://yourserver/
- name: NOTIFY_STOP_URL
value: http://yourserver/
- name: NODE_SELECTOR
value: c2d
- name: POD_PUBLISH_CONTAINER
value: oceanprotocol/pod-publishing:v1.0.0
- name: POSTGRES_DB
@@ -82,7 +84,7 @@ spec:
configMapKeyRef:
key: POSTGRES_PORT
name: postgres-config
image: oceanprotocol/operator-engine:v1.0.0
image: oceanprotocol/operator-engine:v1.0.9
imagePullPolicy: Always
name: ocean-compute-operator
resources: {}
1 change: 1 addition & 0 deletions operator_engine/constants.py
@@ -86,6 +86,7 @@ class OperatorConfig:
NOTIFY_START_URL = getenv('NOTIFY_START_URL',None)
NOTIFY_STOP_URL = getenv('NOTIFY_STOP_URL',None)
OPERATOR_PRIVATE_KEY = getenv('OPERATOR_PRIVATE_KEY',None)
NODE_SELECTOR = getenv('NODE_SELECTOR',None)
SERVICE_ACCOUNT = getenv('SERVICE_ACCOUNT','db-operator')

class VolumeConfig:
106 changes: 46 additions & 60 deletions operator_engine/resources.py
@@ -16,43 +16,23 @@


def create_all_pvc(body, logger, resources):
create_pvc_input(body, logger, resources['inputVolumesize'])
create_pvc_output(body, logger, resources['outputVolumesize'])
create_pvc_data(body, logger, resources['inputVolumesize'])
create_pvc_adminlogs(body, logger, resources['adminlogsVolumesize'])


def create_pvc_output(body, logger, size):
def create_pvc_data(body, logger, size):
storage_class_name = VolumeConfig.STORAGE_CLASS
with open("templates/volume-template.yaml", 'r') as stream:
try:
volume = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
volume['metadata']['name'] = body['metadata']['name']+"-output"
volume['metadata']['name'] = body['metadata']['name']+"-data"
volume['metadata']['namespace'] = body['metadata']['namespace']
volume['spec']['resources']['requests']['storage'] = size
volume['spec']['storageClassName'] = storage_class_name
create_pvc(body,logger,volume)

#api = kubernetes.client.CoreV1Api()
#obj = api.create_namespaced_persistent_volume_claim(body['metadata']['namespace'], volume)
#logger.info(f"{obj.kind} {obj.metadata.name} created")


def create_pvc_input(body, logger, size):
storage_class_name = VolumeConfig.STORAGE_CLASS
with open("templates/volume-template.yaml", 'r') as stream:
try:
volume = yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
volume['metadata']['name'] = body['metadata']['name']+"-input"
volume['metadata']['namespace'] = body['metadata']['namespace']
volume['spec']['resources']['requests']['storage'] = size
volume['spec']['storageClassName'] = storage_class_name
create_pvc(body,logger,volume)


def create_pvc_adminlogs(body, logger, size):
storage_class_name = VolumeConfig.STORAGE_CLASS
with open("templates/volume-template.yaml", 'r') as stream:
@@ -162,17 +142,10 @@ def create_configure_job(body, logger):
# Volumes
job['spec']['template']['spec']['volumes'] = []
job['spec']['template']['spec']['containers'][0]['volumeMounts'] = []
# Input volume
job['spec']['template']['spec']['volumes'].append(
{'name': 'input', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-input"}})
volume_mount = {'mountPath': '/data/inputs',
'name': 'input', 'readOnly': False}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
# Output volume
# Data volume
job['spec']['template']['spec']['volumes'].append(
{'name': 'output', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-output"}})
volume_mount = {'mountPath': '/data/', 'name': 'output', 'readOnly': False}
{'name': 'data', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-data"}})
volume_mount = {'mountPath': '/data/', 'name': 'data', 'readOnly': False}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
# Admin logs volume
@@ -196,6 +169,7 @@ def create_configure_job(body, logger):
'name': 'workflow', 'subPath': 'workflow.json'}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
job = create_node_selector(job,logger)
create_job(logger,body,job)

def create_algorithm_job(body, logger, resources):
@@ -267,15 +241,8 @@ def create_algorithm_job(body, logger, resources):

    # Data volume
job['spec']['template']['spec']['volumes'].append(
{'name': 'output', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-output"}})
volume_mount = {'mountPath': '/data/', 'name': 'output', 'readOnly': False}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
# Input volume
job['spec']['template']['spec']['volumes'].append(
{'name': 'input', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-input"}})
volume_mount = {'mountPath': '/data/inputs',
'name': 'input', 'readOnly': True}
{'name': 'data', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-data"}})
volume_mount = {'mountPath': '/data/', 'name': 'data', 'readOnly': False}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
# Admin logs volume - Do not mount it here
@@ -290,6 +257,7 @@ def create_algorithm_job(body, logger, resources):
'name': 'workflow', 'subPath': 'workflow.yaml'}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
job = create_node_selector(job,logger)
create_job(logger,body,job)


@@ -363,15 +331,8 @@ def create_publish_job(body, logger):

    # Data volume
job['spec']['template']['spec']['volumes'].append(
{'name': 'output', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-output"}})
volume_mount = {'mountPath': '/data/', 'name': 'output', 'readOnly': False}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
# Input volume
job['spec']['template']['spec']['volumes'].append(
{'name': 'input', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-input"}})
volume_mount = {'mountPath': '/data/inputs',
'name': 'input', 'readOnly': True}
{'name': 'data', 'persistentVolumeClaim': {'claimName': body['metadata']['name']+"-data"}})
volume_mount = {'mountPath': '/data/', 'name': 'data', 'readOnly': False}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
# Admin logs volume
@@ -397,6 +358,7 @@ def create_publish_job(body, logger):
'name': 'workflow', 'subPath': 'workflow.json'}
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
volume_mount)
job = create_node_selector(job,logger)
create_job(logger,body,job)


@@ -422,7 +384,9 @@ def wait_finish_job(namespace, pod_name,logger):
return False
except ApiException as e:
logger.debug(f"Exception when calling BatchV1Api->read_namespaced_job: {e}\n")
return False
if e.reason=='Not Found':
return True
return False

def cleanup_job(namespace, jobId, logger):
if OperatorConfig.DEBUG_NO_CLEANUP is None:
@@ -457,17 +421,11 @@ def cleanup_job(namespace, jobId, logger):
except ApiException as e:
logger.warning(f"Failed to remove admin logs pvc\n")
try:
name=jobId+"-input"
name=jobId+"-data"
logger.debug(f"Removing pvc {name}")
api.delete_namespaced_persistent_volume_claim(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1)
except ApiException as e:
logger.warning(f"Failed to remove input pvc\n")
try:
name=jobId+"-output"
logger.debug(f"Removing pvc {name}")
api.delete_namespaced_persistent_volume_claim(namespace=namespace, name=name, propagation_policy='Foreground',grace_period_seconds=1)
except ApiException as e:
logger.warning(f"Failed to remove output pvc\n")
logger.warning(f"Failed to remove data pvc\n")

#config map
try:
@@ -501,6 +459,34 @@ def update_sql_job_datefinished(jobId, logger):
cursor.close()
connection.close()

def create_node_selector(job, logger):
if OperatorConfig.NODE_SELECTOR is None:
return job
try:
job['spec']['template']['spec']['affinity'] = dict()
job['spec']['template']['spec']['affinity']['nodeAffinity']= dict()
affinity='''{
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "scope",
"operator": "In",
"values": [
"%s"
]
}
]
}
]
}
}''' % OperatorConfig.NODE_SELECTOR
job['spec']['template']['spec']['affinity']['nodeAffinity']=json.loads(affinity)
logger.error(job['spec']['template']['spec']['affinity'])
except Exception as e:
logger.error(e)
return job

def update_sql_job_istimeout(jobId, logger):
logger.error(f"Start update_sql_job_istimeout for {jobId}")
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.0
current_version = 1.0.1

[bdist_wheel]
universal = 1
