generated from interTwin-eu/repository-template
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
90 changed files
with
3,611 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# Minimal utility Pod: an Alpine container kept alive with `sleep infinity`
# so it can be exec'd into for manual testing inside the cluster.
apiVersion: v1
kind: Pod
metadata:
  name: client
spec:
  containers:
    - name: client
      image: alpine:latest
      # Keep the container running indefinitely (Alpine has no long-lived
      # default entrypoint).
      command:
        - /bin/sh
        - "-c"
        - "sleep infinity"
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
import airflow.utils.dates | ||
from airflow.utils.log.logging_mixin import LoggingMixin | ||
from airflow import DAG | ||
from airflow.operators.dummy import DummyOperator | ||
from airflow.providers.http.sensors.http import HttpSensor | ||
from airflow.providers.http.operators.http import SimpleHttpOperator | ||
from airflow.operators.python import BranchPythonOperator | ||
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator | ||
from kubernetes.client import models as k8s | ||
|
||
import json | ||
from datetime import timedelta | ||
|
||
# Protocol constants shared with the GlitchFlow API (the HTTP endpoints
# themselves come from the "testapp" Airflow connection).
WAITMSG = "WAITING"  # API reply while the data buffer is still filling
FRZMSG = "FROZEN"    # API reply once the dataset has been frozen
MAXBUFSZ = 2000      # buffer-size threshold that ends the sensor wait
||
# DAG driving the AI-training data pipeline. Triggered manually
# (schedule_interval=None); start_date is backdated three days.
dag = DAG(
    dag_id="Trainpipe",
    description="DAG implementing the AI training data pipeline.",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)
## DAG CALLABLES
def pick_branch(**context):
    """Choose the branch to follow after the freeze request.

    Pulls the JSON reply that the ``send_frz_sign`` task pushed to XCom
    and returns the task_id of the next task: keep polling the buffer
    ("next_sensor") while the API still answers WAITING, otherwise jump
    to the metrics branch ("next_metrics").
    """
    raw_reply = context["task_instance"].xcom_pull(
        task_ids="send_frz_sign", key="return_value"
    )
    reply = json.loads(raw_reply)
    # NOTE(review): assumes the API reply always carries a "resp" field —
    # confirm against the gflapi /train endpoint.
    return "next_sensor" if reply["resp"] == WAITMSG else "next_metrics"
||
def check_response(response, max_buf=None):
    """Sensor predicate for the buffer-stats endpoint.

    Parameters
    ----------
    response : a requests.Response-like object whose ``.json()`` yields
        the stats payload (expected to contain a ``buff_size`` field).
    max_buf : int or None
        Buffer-size threshold. Defaults to the module-level MAXBUFSZ so
        existing callers keep the original behavior.

    Returns
    -------
    bool
        True once the reported buffer size reaches the threshold,
        False otherwise — including when the payload is empty/falsy
        (the original code raised on ``output['buff_size']`` there).
    """
    js = response.json()
    if not js:
        # Empty payload: keep poking instead of crashing on a KeyError.
        return False
    LoggingMixin().log.info("Read json object")
    # Resolve the default lazily so the empty-payload fast path does not
    # depend on module state.
    if max_buf is None:
        max_buf = MAXBUFSZ
    return js["buff_size"] >= max_buf
### DAG DEFS
# No-op marker for the start of the training run.
IniTrain = DummyOperator(task_id="start_training", dag=dag)

# Ask the API to start freezing the dataset; the JSON reply is pushed to
# XCom and consumed by the branching step below.
sign_train = SimpleHttpOperator(
    task_id="send_frz_sign",
    method="POST",
    http_conn_id="testapp",
    endpoint="train",
    headers={"Content-Type": "application/json"},
    data=json.dumps({"user": "airflow", "token": "airflow"}),
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag,
)

# Branch on the API reply: WAITING -> polling branch, else metrics branch.
chech_train_resp = BranchPythonOperator(
    task_id="check_frz_sign",
    python_callable=pick_branch,
    dag=dag,
)
|
||
|
||
### SENSOR BRANCH
# Poll the stats endpoint until check_response reports the buffer is
# full; "reschedule" frees the worker slot between pokes.
next_sens = HttpSensor(
    task_id="next_sensor",
    http_conn_id="testapp",
    endpoint="stats",
    response_check=check_response,
    poke_interval=10,
    timeout=3600,
    mode="reschedule",
    dag=dag,
)

# Once the buffer is full, ask the API to freeze the dataset.
freeze = SimpleHttpOperator(
    task_id="freeze_ds",
    method="POST",
    http_conn_id="testapp",
    endpoint="train",
    headers={"Content-Type": "application/json"},
    data=json.dumps({"user": "airflow", "token": "airflow"}),
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag,
)
#######################################

### BEST CASE BRANCH
# Placeholder task for the already-frozen fast path.
next_metrics = DummyOperator(task_id="next_metrics", dag=dag)
##########################################################

# Re-join the two branches. "none_failed" lets the join run even though
# the BranchPythonOperator always skips one of the upstream branches.
join_branch = DummyOperator(
    task_id="join_brc",
    trigger_rule="none_failed",
    dag=dag,
)
|
||
|
||
# Launch the preprocessing container on the cluster once both branches
# have joined.
preproc = KubernetesPodOperator(
    # unique id of the task within the DAG
    task_id="preQ",
    # the Docker image to launch
    image="romanoa77/preq:0.4.air",
    # launch the Pod on the same cluster as Airflow is running on
    in_cluster=True,
    # launch the Pod in the same namespace as Airflow is running in
    namespace="glitchflow",
    # Pod name must be a valid DNS-1123 subdomain (lowercase
    # alphanumerics and '-'). The original "airflow_preprocessor"
    # contained '_' and would be rejected by the Kubernetes API server.
    name="airflow-preprocessor",
    # attach labels to the Pod, can be used for grouping
    labels={"app": "preq", "backend": "airflow"},
    # reattach to worker instead of creating a new Pod on worker failure
    reattach_on_restart=True,
    # delete Pod after the task is finished
    is_delete_operator_pod=True,
    # get log stdout of the container as task logs
    get_logs=True,
    # log events in case of Pod failure
    log_events_on_failure=True,
    # enable xcom
    do_xcom_push=True,
    # mount the shared gravitational-wave data PVC where the image
    # expects its data directory
    volumes=[
        k8s.V1Volume(
            name="gwdataln",
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                claim_name="gwdataln"
            ),
        )
    ],
    volume_mounts=[
        k8s.V1VolumeMount(
            mount_path="/app/data",
            name="gwdataln",
            sub_path=None,
            read_only=False,
        )
    ],
    dag=dag,
)
|
||
|
||
|
||
|
||
|
||
# Graph: start -> freeze request -> branch decision.
IniTrain >> sign_train >> chech_train_resp
# Waiting branch: poll the buffer, then freeze the dataset.
chech_train_resp >> next_sens >> freeze >> join_branch
# Fast branch: dataset already frozen, go straight to metrics.
chech_train_resp >> next_metrics >> join_branch
# After the join, launch the preprocessing Pod.
join_branch >> preproc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
|
||
# Local environment bootstrap script — machine-specific, keep untracked.
envsetup.sh
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# syntax=docker/dockerfile:1

# --- Build stage: resolve dependencies into an isolated virtualenv ------
FROM python:3.8.10-alpine AS comp

RUN python -m venv /opt/venv
ENV PATH=/opt/venv/bin:$PATH

COPY requirements.txt .
# Cache pip downloads across builds.
RUN --mount=type=cache,target=/root/.cache/pip pip install -r requirements.txt

# --- Runtime stage ------------------------------------------------------
FROM python:3.8.10-alpine

### Env var and cmd line arguments ####
# Serving directory (override at build time: --build-arg srvd=...).
ARG srvd=/var/www

# Endpoint names and peer-service configuration consumed by the app.
ENV ENDP_STAT=stat
ENV ENDP_DUMP=dumpLogF
ENV ENDP_DUMPF=dumpF
ENV ENDP_SEND=sendF
ENV DB_BURL=http://databuff-0.databuff.glitchflow.svc.cluster.local:8080/
ENV ENDP_DESC=dstat
ENV ENDP_UPD_DESC=upddsc
ENV ENDP_CLEAN=cleanall
ENV MAX_SIZE=2000

ENV GREET="Hello from Gunicorn server!"
# Put the virtualenv first on PATH so its gunicorn/python are used.
ENV PATH=/opt/venv/bin:$PATH
######################################

RUN mkdir ${srvd}

# Bring in the pre-built virtualenv, server config, and application code.
COPY --from=comp /opt/venv /opt/venv
COPY gunicorn.conf.py /etc
COPY app ${srvd}/app

WORKDIR ${srvd}

CMD [ "gunicorn", "-c","/etc/gunicorn.conf.py", "app:create_app('default')" ]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import requests | ||
import json | ||
|
||
|
||
# Upstream Gin routes this client talks to:
#   GET  /stat      getStat(&StatDesc)
#   GET  /dumpLogF  getLogF()
#   POST /sendF     postFile(&StatDesc)
#
# Response shape returned by both helpers below:
#   {'code': <HTTP status>, 'resp_msg': <decoded JSON or raw text>}


class RqCalls:
    """Thin wrapper around a persistent ``requests.Session``.

    Parameters
    ----------
    url : str
        Base URL; endpoint names are appended verbatim, so it should end
        with '/'.
    timeout : float or None
        Per-request timeout in seconds. ``None`` preserves the original
        behavior of waiting indefinitely (``requests`` has no default
        timeout), but a finite value is strongly recommended.
    """

    def __init__(self, url, timeout=None):
        self.baseurl = url
        self.timeout = timeout
        # Session reuses TCP connections across successive calls.
        self.RqHnd = requests.Session()

    def getReq(self, dest):
        """GET ``baseurl + dest`` and return the decoded reply.

        Raises the usual ``requests`` exceptions on transport errors and
        a JSON decode error if the body is not valid JSON.
        """
        req = self.RqHnd.get(self.baseurl + dest, timeout=self.timeout)
        response = {'code': req.status_code, 'resp_msg': req.json()}
        return response

    def postJson(self, dest, data):
        """POST ``data`` to ``baseurl + dest`` and return the raw reply.

        NOTE(review): despite the name, a dict passed positionally is
        sent form-encoded by ``requests`` (the ``data=`` parameter), not
        as a JSON body — confirm the server expects this before
        switching to ``json=``.
        """
        payload = data  # renamed from `input`, which shadowed the builtin
        req = self.RqHnd.post(self.baseurl + dest, payload, timeout=self.timeout)
        response = {'code': req.status_code, 'resp_msg': req.text}
        return response
|
||
|
||
|
||
|
||
|
Oops, something went wrong.