diff --git a/buildup.sh b/buildup.sh index 04e064a..0747c2a 100755 --- a/buildup.sh +++ b/buildup.sh @@ -1,5 +1,6 @@ #!/bin/bash docker rmi johnson684/manager:golang-socket -f +docker rmi dasbd72/data-serve:latest -f docker rmi johnson684/image-scale:python-socket -f docker rmi johnson684/image-recognition:python-socket -f docker rmi johnson684/cache-deleter:python -f @@ -9,6 +10,11 @@ docker build -t johnson684/manager:golang-socket . docker push johnson684/manager:golang-socket cd .. +cd data-serve +docker build -t dasbd72/data-serve:latest . +docker push dasbd72/data-serve:latest +cd .. + cd cache-deleter docker build -t johnson684/cache-deleter:python . docker push johnson684/cache-deleter:python @@ -42,21 +48,25 @@ kubectl apply -f yamls/pvc-disk.yaml kubectl delete -f yamls/apps.yaml kubectl delete -f yamls/new-apps.yaml kubectl delete -f yamls/manager.yaml +kubectl delete -f yamls/data-serve.yaml kubectl delete -f yamls/cache-deleter.yaml kubectl apply -f yamls/apps.yaml kubectl apply -f yamls/new-apps.yaml kubectl apply -f yamls/manager.yaml +kubectl apply -f yamls/data-serve.yaml kubectl apply -f yamls/cache-deleter.yaml # disk storage kubectl delete -f yamls/apps-disk.yaml kubectl delete -f yamls/manager-disk.yaml +kubectl delete -f yamls/data-serve-disk.yaml kubectl delete -f yamls/cache-deleter-disk.yaml kubectl apply -f yamls/apps-disk.yaml kubectl apply -f yamls/manager-disk.yaml +kubectl apply -f yamls/data-serve-disk.yaml kubectl apply -f yamls/cache-deleter-disk.yaml diff --git a/data-serve/cmd/serve/serve.go b/data-serve/cmd/serve/serve.go new file mode 100644 index 0000000..f753332 --- /dev/null +++ b/data-serve/cmd/serve/serve.go @@ -0,0 +1,98 @@ +package main + +import ( + "bufio" + "encoding/json" + "io" + "log" + "net" + "os" +) + +var ( + storagePath string + hostIP string + dataServePort string +) + +func init() { + // read storage path from environment variable + storagePath = os.Getenv("STORAGE_PATH") + + // read host ip from environment 
variable
+	hostIP = os.Getenv("HOST_IP")
+	dataServePort = os.Getenv("DATA_SERVE_PORT")
+	log.Printf("IP:PORT %s:%s\n", hostIP, dataServePort)
+
+	// Write Data Serve IP:PORT to file
+	f, err := os.Create(storagePath + "/DATA_SERVE_IP")
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer f.Close()
+
+	_, err = f.WriteString(hostIP + ":" + dataServePort)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+func main() {
+	handleConnections()
+}
+
+func handleConnections() {
+	listener, err := net.Listen("tcp", ":"+dataServePort)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer listener.Close()
+
+	log.Println("Data Server is running on " + hostIP + ":" + dataServePort)
+
+	for {
+		conn, err := listener.Accept()
+		if err != nil {
+			log.Println("[Error] listener.Accept(): ", err)
+			continue
+		}
+
+		go handleConnection(conn)
+	}
+}
+
+func handleConnection(conn net.Conn) {
+	// close connection on every return path
+	defer conn.Close()
+
+	var req Request
+	reader := bufio.NewReader(conn)
+
+	// read request from client
+	err := json.NewDecoder(reader).Decode(&req)
+	if err != nil {
+		log.Println(err)
+		return
+	}
+
+	// Handle the request
+	switch req.Type {
+	case "download":
+		// open file
+		file, err := os.Open(req.Body)
+		if err != nil {
+			log.Println(err)
+			return
+		}
+		defer file.Close()
+
+		// send file
+		_, err = io.Copy(conn, file)
+		if err != nil {
+			log.Println(err)
+			return
+		}
+
+		log.Printf("File %s sent to %s", req.Body, conn.RemoteAddr())
+	}
+}
diff --git a/data-serve/cmd/serve/types.go b/data-serve/cmd/serve/types.go
new file mode 100644
index 0000000..02ffb4f
--- /dev/null
+++ b/data-serve/cmd/serve/types.go
@@ -0,0 +1,6 @@
+package main
+
+type Request struct {
+	Type string `json:"type"`
+	Body string `json:"body"`
+}
diff --git a/data-serve/go.sum b/data-serve/go.sum
new file mode 100644
index 0000000..e69de29
diff --git a/pkg/wrapper.py b/pkg/wrapper.py
index 13bed56..de17921 100644
--- a/pkg/wrapper.py
+++ b/pkg/wrapper.py
@@ -16,62 +16,6 @@
 )
 
 
-# class Manager:
-#     def __init__(self, 
endpoint) -> None: -# self.exist = False -# self.endpoint: str = endpoint -# self.connection: socket.socket = None - -# if "STORAGE_PATH" in os.environ.keys(): -# self.storage_path = os.environ["STORAGE_PATH"] -# else: -# self.storage_path = None -# logging.info(f"STORAGE_PATH: {self.storage_path}") - -# if os.path.exists(os.path.join(self.storage_path, "MANAGER_IP")): -# with open(os.path.join(self.storage_path, "MANAGER_IP"), "r") as f: -# self.manager_ip = f.read() -# else: -# self.manager_ip = None - -# self.manager_port = os.environ.get("MANAGER_PORT") - -# if self.storage_path is not None and self.manager_ip is not None and self.manager_port is not None: -# self.exist = True -# logging.info(f"MANAGER: {self.manager_ip}:{self.manager_port}") - -# try: -# self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -# self.connection.connect((self.manager_ip, self.manager_port)) -# except: -# logging.error("connection failed") -# self.exist = False - -# def close(self) -> None: -# if self.connection is not None: -# self.connection.close() - -# def send_recv(self, data: str, bufsize: int = 128) -> str: -# if self.connection is None: -# logging.error("connection is None") -# return "{}" -# try: -# # Send data -# self.connection.send(data.encode("utf-8")) - -# # Receive data -# result = self.connection.recv(bufsize).decode("utf-8") -# except: -# logging.error("send_recv failed") -# return "{}" -# else: -# result = json.JSONDecoder().decode(result) -# if "success" in result.keys() and result["success"]: -# return result["body"] -# else: -# return "{}" - - class MinioWrapper(Minio): """Inherited Wrapper""" @@ -122,10 +66,10 @@ def __init__( # Read data serve ip:port from storage if os.path.exists(os.path.join(self.storage_path, "DATA_SERVE_IP")): with open(os.path.join(self.storage_path, "DATA_SERVE_IP"), "r") as f: - self.data_serve_ip = f.read() + self.data_serve_ip_port = f.read() else: - self.data_serve_ip = None - logging.info(f"DATA_SERVE_IP: 
{self.data_serve_ip}")
+            self.data_serve_ip_port = None
+        logging.info(f"DATA_SERVE_IP: {self.data_serve_ip_port}")
 
     def fput_object(
         self,
@@ -167,7 +111,7 @@ def copy_to_local():
                     save_hash_to_file(calculate_hash(local_dst), self.get_hash_file_path(local_dst))
 
-                    self.etcd_client.put(file_path, self.data_serve_ip)
+                    self.etcd_client.put(file_path, self.data_serve_ip_port)
                     logging.info(
                         "read value from etcd:{}".format(
                             self.etcd_client.get(file_path)
@@ -239,7 +183,7 @@ def download_from_cluster():
             if self.force_remote or remote_download:
                 return False
 
-            return False
+            return downloadFromDataServe(self.data_serve_ip_port, local_src, file_path)
 
         if copy_from_local():
             return
@@ -310,3 +254,30 @@ def verify_hash(file_path, hash_file_path, hash_algorithm="sha256"):
             hash_value = file.read()
         verifySuccess = hash_value == calculated_hash
     return verifySuccess
+
+
+def downloadFromDataServe(data_serve_ip_port: str, remote_path: str, file_path: str, chunk_size: int = 4096):
+    """Download file from data serve."""
+    # TCP Connection
+    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    try:
+        ip, port = data_serve_ip_port.split(":")
+        conn.connect((ip, int(port)))
+    except Exception as e:
+        logging.error("Data Serve download {} failed {}".format(remote_path, e))
+        return False
+
+    # Send request
+    req = {"type": "download", "body": remote_path}
+    conn.sendall(json.dumps(req).encode('utf-8'))
+
+    # Receive file
+    with open(file_path, 'wb') as fi:
+        while True:
+            data = conn.recv(chunk_size)
+            if not data:
+                break
+            fi.write(data)
+
+    conn.close()
+    return True
diff --git a/yamls/data-serve-disk.yaml b/yamls/data-serve-disk.yaml
new file mode 100644
index 0000000..4ab4ced
--- /dev/null
+++ b/yamls/data-serve-disk.yaml
@@ -0,0 +1,47 @@
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: data-serve-daemonset-disk
+  labels:
+    app.kubernetes.io/name: data-serve-disk
+spec:
+  selector:
+    matchLabels:
+      app: data-serve-disk
+  template:
+    metadata:
+      labels:
+        app: data-serve-disk
+    spec:
+      tolerations:
+        # 
these tolerations are to have the daemonset runnable on control plane nodes + # remove them if your control plane nodes should not run pods + - key: node-role.kubernetes.io/control-plane + operator: Exists + effect: NoSchedule + - key: node-role.kubernetes.io/master + operator: Exists + effect: NoSchedule + containers: + - name: data-serve-container + image: dasbd72/data-serve:latest + imagePullPolicy: Always + ports: + - containerPort: 12348 + hostPort: 12348 + volumeMounts: + - name: shared-volume + mountPath: /shared + env: + - name: STORAGE_PATH + value: /shared + - name: HOST_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: DATA_SERVE_PORT + value: "12348" + volumes: + - name: shared-volume + hostPath: + path: /storage-disk diff --git a/yamls/data-serve.yaml b/yamls/data-serve.yaml new file mode 100644 index 0000000..146e05b --- /dev/null +++ b/yamls/data-serve.yaml @@ -0,0 +1,47 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: data-serve-daemonset + labels: + app.kubernetes.io/name: data-serve +spec: + selector: + matchLabels: + app: data-serve + template: + metadata: + labels: + app: data-serve + spec: + tolerations: + # these tolerations are to have the daemonset runnable on control plane nodes + # remove them if your control plane nodes should not run pods + - key: node-role.kubernetes.io/control-plane + operator: Exists + effect: NoSchedule + - key: node-role.kubernetes.io/master + operator: Exists + effect: NoSchedule + containers: + - name: data-serve-container + image: dasbd72/data-serve:latest + imagePullPolicy: Always + ports: + - containerPort: 12347 + hostPort: 12347 + volumeMounts: + - name: shared-volume + mountPath: /shared + env: + - name: STORAGE_PATH + value: /shared + - name: HOST_IP + valueFrom: + fieldRef: + fieldPath: status.hostIP + - name: DATA_SERVE_PORT + value: "12347" + volumes: + - name: shared-volume + hostPath: + path: "/dev/shm"