data serve wrapper & server [beta]
dasbd72 committed Sep 24, 2023
1 parent 3a33da1 commit b224731
Showing 7 changed files with 239 additions and 61 deletions.
10 changes: 10 additions & 0 deletions buildup.sh
@@ -1,5 +1,6 @@
#!/bin/bash
docker rmi johnson684/manager:golang-socket -f
docker rmi dasbd72/data-serve:latest -f
docker rmi johnson684/image-scale:python-socket -f
docker rmi johnson684/image-recognition:python-socket -f
docker rmi johnson684/cache-deleter:python -f
@@ -9,6 +10,11 @@ docker build -t johnson684/manager:golang-socket .
docker push johnson684/manager:golang-socket
cd ..

cd data-serve
docker build -t dasbd72/data-serve:latest .
docker push dasbd72/data-serve:latest
cd ..

cd cache-deleter
docker build -t johnson684/cache-deleter:python .
docker push johnson684/cache-deleter:python
@@ -42,21 +48,25 @@ kubectl apply -f yamls/pvc-disk.yaml
kubectl delete -f yamls/apps.yaml
kubectl delete -f yamls/new-apps.yaml
kubectl delete -f yamls/manager.yaml
kubectl delete -f yamls/data-serve.yaml
kubectl delete -f yamls/cache-deleter.yaml


kubectl apply -f yamls/apps.yaml
kubectl apply -f yamls/new-apps.yaml
kubectl apply -f yamls/manager.yaml
kubectl apply -f yamls/data-serve.yaml
kubectl apply -f yamls/cache-deleter.yaml

# disk storage
kubectl delete -f yamls/apps-disk.yaml
kubectl delete -f yamls/manager-disk.yaml
kubectl delete -f yamls/data-serve-disk.yaml
kubectl delete -f yamls/cache-deleter-disk.yaml

kubectl apply -f yamls/apps-disk.yaml
kubectl apply -f yamls/manager-disk.yaml
kubectl apply -f yamls/data-serve-disk.yaml
kubectl apply -f yamls/cache-deleter-disk.yaml


98 changes: 98 additions & 0 deletions data-serve/cmd/serve/serve.go
@@ -0,0 +1,98 @@
package main

import (
"bufio"
"encoding/json"
"io"
"log"
"net"
"os"
)

var (
storagePath string
hostIP string
dataServePort string
)

func init() {
// read storage path from environment variable
storagePath = os.Getenv("STORAGE_PATH")

// read host ip from environment variable
hostIP = os.Getenv("HOST_IP")
dataServePort = os.Getenv("DATA_SERVE_PORT")
log.Printf("IP:PORT %s:%s\n", hostIP, dataServePort)

// Write Data Serve IP:PORT to file
f, err := os.Create(storagePath + "/DATA_SERVE_IP")
if err != nil {
log.Fatal(err)
}
defer f.Close()

_, err = f.WriteString(hostIP + ":" + dataServePort)
if err != nil {
log.Fatal(err)
}
}

func main() {
handleConnections()
}

func handleConnections() {
listener, err := net.Listen("tcp", ":"+dataServePort)
if err != nil {
log.Fatal(err)
}
defer listener.Close()

log.Println("Data Server is running on " + hostIP + ":" + dataServePort)

for {
conn, err := listener.Accept()
if err != nil {
log.Println("[Error] listener.Accept(): ", err)
continue
}

go handleConnection(conn)
}
}

func handleConnection(conn net.Conn) {
	// Close the connection on every return path, including early error returns.
	defer conn.Close()

	var req Request
	reader := bufio.NewReader(conn)

// read request from client
err := json.NewDecoder(reader).Decode(&req)
if err != nil {
log.Println(err)
return
}

// Handle the request
switch req.Type {
case "download":
// open file
file, err := os.Open(req.Body)
if err != nil {
log.Println(err)
return
}
defer file.Close()

// send file
_, err = io.Copy(conn, file)
if err != nil {
log.Println(err)
return
}

log.Printf("File %s sent to %d", req.Body, conn.RemoteAddr())
}

// close connection
conn.Close()
}
6 changes: 6 additions & 0 deletions data-serve/cmd/serve/types.go
@@ -0,0 +1,6 @@
package main

type Request struct {
Type string `json:"type"`
Body string `json:"body"`
}
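For reference, the wire protocol here is a single JSON-encoded Request (e.g. {"type": "download", "body": "/shared/example.bin"}) sent by the client, answered by the raw file bytes until the server closes the connection. Below is a minimal Go client sketch against that protocol; the address and file paths are assumptions for illustration, not part of this commit.

package main

import (
	"encoding/json"
	"io"
	"log"
	"net"
	"os"
)

// Request mirrors data-serve/cmd/serve/types.go.
type Request struct {
	Type string `json:"type"`
	Body string `json:"body"`
}

func main() {
	// Assumed address: a data-serve DaemonSet pod reachable on hostPort 12347.
	conn, err := net.Dial("tcp", "127.0.0.1:12347")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Ask the server to stream the file at the given path.
	req := Request{Type: "download", Body: "/shared/example.bin"}
	if err := json.NewEncoder(conn).Encode(&req); err != nil {
		log.Fatal(err)
	}

	// The server replies with the raw file bytes and then closes the
	// connection, so copy until EOF.
	out, err := os.Create("example.bin")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	n, err := io.Copy(out, conn)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("received %d bytes", n)
}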
Empty file added data-serve/go.sum
Empty file.
92 changes: 31 additions & 61 deletions pkg/wrapper.py
@@ -16,62 +16,6 @@
)


# class Manager:
# def __init__(self, endpoint) -> None:
# self.exist = False
# self.endpoint: str = endpoint
# self.connection: socket.socket = None

# if "STORAGE_PATH" in os.environ.keys():
# self.storage_path = os.environ["STORAGE_PATH"]
# else:
# self.storage_path = None
# logging.info(f"STORAGE_PATH: {self.storage_path}")

# if os.path.exists(os.path.join(self.storage_path, "MANAGER_IP")):
# with open(os.path.join(self.storage_path, "MANAGER_IP"), "r") as f:
# self.manager_ip = f.read()
# else:
# self.manager_ip = None

# self.manager_port = os.environ.get("MANAGER_PORT")

# if self.storage_path is not None and self.manager_ip is not None and self.manager_port is not None:
# self.exist = True
# logging.info(f"MANAGER: {self.manager_ip}:{self.manager_port}")

# try:
# self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.connection.connect((self.manager_ip, self.manager_port))
# except:
# logging.error("connection failed")
# self.exist = False

# def close(self) -> None:
# if self.connection is not None:
# self.connection.close()

# def send_recv(self, data: str, bufsize: int = 128) -> str:
# if self.connection is None:
# logging.error("connection is None")
# return "{}"
# try:
# # Send data
# self.connection.send(data.encode("utf-8"))

# # Receive data
# result = self.connection.recv(bufsize).decode("utf-8")
# except:
# logging.error("send_recv failed")
# return "{}"
# else:
# result = json.JSONDecoder().decode(result)
# if "success" in result.keys() and result["success"]:
# return result["body"]
# else:
# return "{}"


class MinioWrapper(Minio):
"""Inherited Wrapper"""

@@ -122,10 +66,10 @@ def __init__(
# Read data serve ip:port from storage
if os.path.exists(os.path.join(self.storage_path, "DATA_SERVE_IP")):
with open(os.path.join(self.storage_path, "DATA_SERVE_IP"), "r") as f:
- self.data_serve_ip = f.read()
+ self.data_serve_ip_port = f.read()
else:
- self.data_serve_ip = None
- logging.info(f"DATA_SERVE_IP: {self.data_serve_ip}")
+ self.data_serve_ip_port = None
+ logging.info(f"DATA_SERVE_IP: {self.data_serve_ip_port}")

def fput_object(
self,
@@ -167,7 +111,7 @@ def copy_to_local():

save_hash_to_file(calculate_hash(local_dst), self.get_hash_file_path(local_dst))

- self.etcd_client.put(file_path, self.data_serve_ip)
+ self.etcd_client.put(file_path, self.data_serve_ip_port)
logging.info(
"read value from etcd:{}".format(
self.etcd_client.get(file_path)
@@ -239,7 +183,7 @@ def download_from_cluster():
if self.force_remote or remote_download:
return False

- return False
+ return downloadFromDataServe(self.data_serve_ip_port, local_src, file_path)

if copy_from_local():
return
@@ -310,3 +254,29 @@ def verify_hash(file_path, hash_file_path, hash_algorithm="sha256"):
hash_value = file.read()
verifySuccess = hash_value == calculated_hash
return verifySuccess


def downloadFromDataServe(data_serve_ip_port: str, remote_path: str, file_path: str, chunk_size: int = 4096):
    """Download a file from the data serve server over TCP."""
    # TCP connection to the data serve server
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        ip, port = data_serve_ip_port.split(":")
        # connect() expects an integer port; the file stores "ip:port" as text
        conn.connect((ip, int(port)))
    except Exception as e:
        logging.error("Data Serve download {} failed {}".format(remote_path, e))
        conn.close()
        return False

    # Send request; sendall ensures the whole JSON payload is written
    req = {"type": "download", "body": remote_path}
    conn.sendall(json.dumps(req).encode('utf-8'))

    # Receive the file until the server closes the connection
    with open(file_path, 'wb') as f:
        while True:
            data = conn.recv(chunk_size)
            if not data:
                break
            f.write(data)
    conn.close()

    return True
47 changes: 47 additions & 0 deletions yamls/data-serve-disk.yaml
@@ -0,0 +1,47 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: data-serve-daemonset-disk
labels:
app.kubernetes.io/name: data-serve-disk
spec:
selector:
matchLabels:
app: data-serve
template:
metadata:
labels:
app: data-serve
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: data-serve-container
image: dasbd72/data-serve:latest
imagePullPolicy: Always
ports:
- containerPort: 12348
hostPort: 12348
volumeMounts:
- name: shared-volume
mountPath: /shared
env:
- name: STORAGE_PATH
value: /shared
- name: HOST_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: DATA_SERVE_PORT
value: "12348"
volumes:
- name: shared-volume
hostPath:
path: /storage-disk
47 changes: 47 additions & 0 deletions yamls/data-serve.yaml
@@ -0,0 +1,47 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: data-serve-daemonset
labels:
app.kubernetes.io/name: data-serve
spec:
selector:
matchLabels:
app: data-serve
template:
metadata:
labels:
app: data-serve
spec:
tolerations:
# these tolerations are to have the daemonset runnable on control plane nodes
# remove them if your control plane nodes should not run pods
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
- key: node-role.kubernetes.io/master
operator: Exists
effect: NoSchedule
containers:
- name: data-serve-container
image: dasbd72/data-serve:latest
imagePullPolicy: Always
ports:
- containerPort: 12347
hostPort: 12347
volumeMounts:
- name: shared-volume
mountPath: /shared
env:
- name: STORAGE_PATH
value: /shared
- name: HOST_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
- name: DATA_SERVE_PORT
value: "12347"
volumes:
- name: shared-volume
hostPath:
path: "/dev/shm"
