diff --git a/clossyne-py/README.md b/clossyne-py/README.md index 72e21bc..b9cd63d 100644 --- a/clossyne-py/README.md +++ b/clossyne-py/README.md @@ -40,8 +40,9 @@ False ``` ## Commands -SET: Inserts a key-value pair into the Clossyne store. Returns True if the insert was successful, False otherwise. -GET: Retrieves the value associated with a given key. Returns False if the key does not exist. -DEL: Deletes a key-value pair from the Clossyne store. Returns True if the deletion was successful, False otherwise. -RNG: Retrieves a range of key-value pairs from the Clossyne store. -EXT: Closes the connection to the Clossyne server. Sent automatically when the with context ends. + +- SET: Inserts a key-value pair into the Clossyne store. Returns True if the insert was successful, False otherwise. +- GET: Retrieves the value associated with a given key. Returns False if the key does not exist. +- DEL: Deletes a key-value pair from the Clossyne store. Returns True if the deletion was successful, False otherwise. +- RNG: Retrieves a range of key-value pairs from the Clossyne store. +- EXT: Closes the connection to the Clossyne server. Sent automatically when the with context ends. diff --git a/clossyne-py/clossyne/core.py b/clossyne-py/clossyne/core.py index a1888fc..e46d1ef 100644 --- a/clossyne-py/clossyne/core.py +++ b/clossyne-py/clossyne/core.py @@ -35,10 +35,12 @@ def disconnect(self): self.sock = None def handle_response(self, command, response): - if(command == "SET"): + if (command == "GET" or command == "RNG"): + # OperationResult event + return None if (response == "NS" or response == "NIL") else response + else: + # OperationFinished event return response == "OK" - elif (command == "GET"): - return response def send(self, command: str, *args): command = command.upper() @@ -48,7 +50,7 @@ def send(self, command: str, *args): try: # sendall guarantees that all the data is sent before returning - self.sock.sendall(str.encode(command + "\n")) + self.sock.sendall(str.encode(command)) response = self.sock.recv(1024).decode() return self.handle_response(command[:3], response) diff --git a/clossyne-py/test.py b/clossyne-py/test.py index bce8297..a1c2589 100644 --- a/clossyne-py/test.py +++ b/clossyne-py/test.py @@ -27,12 +27,53 @@ def get_next_pair(self): value = str({k:v for (k, v) in zip(self.reader.headers, row)}) return key, value - def perform(self): - with clossyne.Clossyne('localhost', 4297) as c: + def perform_delete_leaf(self): + with clossyne.Clossyne('0.0.0.0', 4297) as c: key, value = self.get_next_pair() if (c.set(key, value)): + print("Value has been saved") saved_value = c.get(key) - print(saved_value) + print("GET ", key, ": ", saved_value) + print("Delete ", key) + print(c.delete(key)) + print("GET ", key, ": ", c.get(key)) + + def perform_delete_child(self): + with clossyne.Clossyne('localhost', 4297) as c: + print(c.set("a", "val")) + print(c.set("b", "value")) + print(c.delete("a")) + print(c.get("a")) + print(c.get("b")) + + def perform_delete_children(self): + with clossyne.Clossyne('localhost', 4297) as c: + print(c.set("d", "val")) + print(c.set("b", "value")) + print(c.set("a", "value")) + print(c.set("c", "value")) + print(c.set("f", "value")) + print(c.set("e", "value")) + print(c.set("g", "value")) + print(c.delete("d")) + print(c.get("d")) + print(c.get("b")) + + with clossyne.Clossyne('localhost', 4297) as c: + print(c.get("d")) + print(c.get("b")) + + def perform_dataset(self, num): + with clossyne.Clossyne('0.0.0.0', 4297) as c: + for i in range(0, num): + key, value = self.get_next_pair() + c.set(key, value) + print("Delete Standing: ", c.delete("Standing")) + print("Delete Mike: ", c.delete("Mike")) + print("Get Ogren: ", c.get("Ogren")) + print("Get Bergman: ", c.get("Bergman")) + print("Get Nadia: ", c.get("Nadia")) + def clean(self): self.reader.close() @@ -40,6 +81,6 @@ def clean(self): if __name__ == '__main__': test_class = TestClass('test/data/names.csv') - test_class.perform() + test_class.perform_dataset(400) test_class.clean() diff --git a/clossyne/.idea/modules/clossyne-build.iml b/clossyne/.idea/modules/clossyne-build.iml index 1eedb48..fe54b4a 100644 --- a/clossyne/.idea/modules/clossyne-build.iml +++ b/clossyne/.idea/modules/clossyne-build.iml @@ -1,5 +1,5 @@ - + diff --git a/clossyne/README.md b/clossyne/README.md index 2881ca8..1ae20cd 100644 --- a/clossyne/README.md +++ b/clossyne/README.md @@ -39,7 +39,7 @@ java -jar target/scala-2.12/clossyne.jar Alternatively, you can also run the Docker image of Clossyne by using the following command: ``` -docker run -e CLOSSYNE_HOST=localhost -e CLOSSYNE_PORT=4297 -p 4297:4297 clossyne:latest +docker run -e CLOSSYNE_HOST="0.0.0.0" -p 4297:4297 christophsonntag/clossyne:1.0 ``` This will start the Clossyne service on the specified host and port, and make it accessible via the specified port on the Docker host. \ No newline at end of file diff --git a/clossyne/build.sbt b/clossyne/build.sbt index 15d555a..cf182b0 100644 --- a/clossyne/build.sbt +++ b/clossyne/build.sbt @@ -1,6 +1,6 @@ name := "clossyne" -version := "0.1" +version := "1.0" scalaVersion := "2.13.10" diff --git a/clossyne/src/main/resources/logback.xml b/clossyne/src/main/resources/logback.xml index 3f7b669..b6ea1fe 100644 --- a/clossyne/src/main/resources/logback.xml +++ b/clossyne/src/main/resources/logback.xml @@ -3,17 +3,21 @@ - DEBUG + INFO - [%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n + + [%date{ISO8601}] [%level] [%logger] %n%msg%n%n - target/myapp-dev.log + target/clossyne-test.log + + INFO + - [%date{ISO8601}] [%level] [%logger] [%marker] [%thread] - %msg MDC: {%mdc}%n + [%date{ISO8601}] [%level] [%logger]:%n%msg%n%n diff --git a/clossyne/src/main/scala/handler/MessageHandler.scala b/clossyne/src/main/scala/handler/MessageHandler.scala index ff4f886..ef1bbac 100644 --- a/clossyne/src/main/scala/handler/MessageHandler.scala +++ b/clossyne/src/main/scala/handler/MessageHandler.scala @@ -5,7 +5,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Props} import akka.util.ByteString import akka.io.{IO, Tcp} import operations._ -import operations.{OperationReply, OperationFinished, GetResult} +import operations.{OperationReply, OperationFinished, OperationResult} import searchtree.BinaryTree @@ -48,7 +48,7 @@ class MessageHandler(val client: ActorRef, val binaryTree: ActorRef) extends Act case OperationFinished(succeeded: Boolean, _: Option[String]) => val response: String = if(succeeded) "OK" else "NS" this.client ! Write(ByteString.fromString(response)) - case GetResult(succeeded: Boolean, value: Option[String]) => + case OperationResult(succeeded: Boolean, value: Option[String]) => val response: String = if (succeeded) value.getOrElse("NIL") else "NS" this.client ! Write(ByteString.fromString(response)) } diff --git a/clossyne/src/main/scala/operations/DeleteForward.scala b/clossyne/src/main/scala/operations/DeleteForward.scala new file mode 100644 index 0000000..0eb7fc4 --- /dev/null +++ b/clossyne/src/main/scala/operations/DeleteForward.scala @@ -0,0 +1,6 @@ +package com.christophsonntag.clossyne +package operations + +import akka.actor.ActorRef + +case class DeleteForward(requester: ActorRef, middleware: ActorRef, key: String) extends OperationForward diff --git a/clossyne/src/main/scala/operations/OperationForward.scala b/clossyne/src/main/scala/operations/OperationForward.scala new file mode 100644 index 0000000..ff115ce --- /dev/null +++ b/clossyne/src/main/scala/operations/OperationForward.scala @@ -0,0 +1,10 @@ +package com.christophsonntag.clossyne +package operations + +import akka.actor.ActorRef + +trait OperationForward { + def requester: ActorRef + def middleware: ActorRef + def key: String +} diff --git a/clossyne/src/main/scala/operations/OperationForwardReply.scala b/clossyne/src/main/scala/operations/OperationForwardReply.scala new file mode 100644 index 0000000..3910fae --- /dev/null +++ b/clossyne/src/main/scala/operations/OperationForwardReply.scala @@ -0,0 +1,10 @@ +package com.christophsonntag.clossyne +package operations + +import akka.actor.ActorRef + +sealed trait OperationForwardReply { + def succeeded: Boolean + def destination: ActorRef +} +case class OperationForwardFinished(succeeded: Boolean, destination: ActorRef) extends OperationForwardReply diff --git a/clossyne/src/main/scala/operations/OperationReply.scala b/clossyne/src/main/scala/operations/OperationReply.scala index 44abf51..6e65afc 100644 --- a/clossyne/src/main/scala/operations/OperationReply.scala +++ b/clossyne/src/main/scala/operations/OperationReply.scala @@ -6,4 +6,4 @@ sealed trait OperationReply { def value: Option[String] } case class OperationFinished(succeeded: Boolean, value: Option[String]) extends OperationReply -case class GetResult(succeeded: Boolean, value: Option[String]) extends OperationReply +case class OperationResult(succeeded: Boolean, value: Option[String]) extends OperationReply diff --git a/clossyne/src/main/scala/searchtree/BinaryTree.scala b/clossyne/src/main/scala/searchtree/BinaryTree.scala index de94703..a47638e 100644 --- a/clossyne/src/main/scala/searchtree/BinaryTree.scala +++ b/clossyne/src/main/scala/searchtree/BinaryTree.scala @@ -1,17 +1,40 @@ package com.christophsonntag.clossyne package searchtree -import akka.actor.{Actor, ActorRef, Props} -import operations.{Operation, OperationReply} +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import operations.{Delete, DeleteForward, Operation, OperationFinished, OperationForwardFinished, OperationForwardReply} +import scala.collection.immutable.Queue -class BinaryTree extends Actor { + +class BinaryTree extends Actor with ActorLogging { // immutable root - val root: ActorRef = context.actorOf(Props(classOf[BinaryTreeNode], "root", "root", true), "clossyneRootNode") + val root: ActorRef = context.actorOf(BinaryTreeNode.props("root", "root", ActorRef.noSender), "clossyneRootNode") + var pendingOperations: Queue[Operation] = Queue.empty[Operation] def receive: Receive = { - case op: Operation => root ! op - case opReply: OperationReply => context.parent ! opReply + case op: Operation => + op match { + case Delete(requester, key) => + log.debug("Delete received: Changing context now") + context.become(nodeDeletion) + root ! DeleteForward(requester, self, key) + case _ => root ! op + } + case opReply: OperationForwardFinished => opReply.destination ! OperationFinished(opReply.succeeded, None) + } + + def nodeDeletion: Receive = { + case op: Operation => + log.debug(s"Enqueuing operation ${op}") + pendingOperations.enqueue(op) + case opReply: OperationForwardFinished => + log.debug(s"Delete operation finished. Change context and send enqueued operations.") + context.become(receive) + + opReply.destination ! OperationFinished(opReply.succeeded, None) + pendingOperations.map(self ! _) + pendingOperations = Queue.empty } } diff --git a/clossyne/src/main/scala/searchtree/BinaryTreeNode.scala b/clossyne/src/main/scala/searchtree/BinaryTreeNode.scala index 55c8319..cf65400 100644 --- a/clossyne/src/main/scala/searchtree/BinaryTreeNode.scala +++ b/clossyne/src/main/scala/searchtree/BinaryTreeNode.scala @@ -1,7 +1,7 @@ package com.christophsonntag.clossyne package searchtree -import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props} import operations._ @@ -10,14 +10,23 @@ object BinaryTreeNode { case object Left extends Position case object Right extends Position - def props(key: String, value: String, initiallyRemoved: Boolean) = Props(classOf[BinaryTreeNode], key, value, initiallyRemoved) + // Events that are only sent between BinaryTreeNodes + case class DeleteChild(node: ActorRef, key: String, replyRoute: (ActorRef, ActorRef)) + case class ReplaceChild(original: ActorRef, replacement: ActorRef, key: String, replyRoute: (ActorRef, ActorRef)) + case class ReplaceParent(newParent: ActorRef) + case class MovePredecessor(newParent: ActorRef, newSubtrees: Map[Position, ActorRef], originalActor: ActorRef, originalKey: String, replyRoute: (ActorRef, ActorRef)) + + def props(k: String, v: String, parentActor: ActorRef) = Props(classOf[BinaryTreeNode], k, v, parentActor) } -class BinaryTreeNode(val key: String, val value: String, initiallyRemoved: Boolean) extends Actor with ActorLogging { +class BinaryTreeNode(val k: String, val v: String, parentActor: ActorRef) extends Actor with ActorLogging { import searchtree.BinaryTreeNode._ - var subtrees = Map[Position, ActorRef]() - var removed = initiallyRemoved + var subtrees: Map[Position, ActorRef] = Map[Position, ActorRef]() + var parent: ActorRef = parentActor + + val key: String = k + val value: String = v def getChildFor(key: String): Position = { /** @@ -28,14 +37,14 @@ class BinaryTreeNode(val key: String, val value: String, initiallyRemoved: Boole def receive: Receive = { case Set(requester, key, value) => - log.debug(s"Set ${key}:${value} as requested by ${requester}") - if (key != this.key || (key == this.key && this.removed)) { + log.debug(s"Set ${key}:${value}") + if (key != this.key) { val child = getChildFor(key) subtrees.get(child) match { - // Forward insert to child node if it exist, create one otherwise. + // Forward insert to child node if it exists, create one otherwise. case Some(actor) => actor ! Set(requester, key, value) case None => - subtrees += (child -> context.actorOf(Props(classOf[BinaryTreeNode], key, value, false))) + subtrees += (child -> context.actorOf(BinaryTreeNode.props(key, value, self), s"${key}")) requester ! OperationFinished(succeeded = true, None) } } else { @@ -43,14 +52,86 @@ class BinaryTreeNode(val key: String, val value: String, initiallyRemoved: Boole } case Get(requester, key) => - if (key != this.key || (key == this.key && this.removed)) { + log.debug(s"Get ${key}. Here is ${this.key}") + if (key != this.key) { val child = getChildFor(key) subtrees.get(child) match { case Some(actor) => actor ! Get(requester, key) - case None => requester ! GetResult(succeeded = false, None) + case None => requester ! OperationResult(succeeded = false, None) + } + } else { + log.info(s"Accessed pair on node ${self}:\n${key}: ${value}") + requester ! OperationResult(succeeded = true, Some(this.value)) + } + + case DeleteChild(node, key, replyRoute) => + if (subtrees.values.exists(_ == node)) { + // Check if requester is in subtree => Remove it + val child = getChildFor(key) + log.debug(s"Deleting the $child child") + subtrees = subtrees.-(child) + + if (replyRoute._1 != ActorRef.noSender) + replyRoute._1 ! OperationForwardFinished(succeeded = true, destination = replyRoute._2) + } + + case ReplaceChild(original, replacement, key, replyRoute) => + if (subtrees.values.exists(_ == original)) { + val position = getChildFor(key) + log.debug(s"Replacing the $position child with ${replacement}") + subtrees = subtrees.updated(position, replacement) + + replacement ! ReplaceParent(parent) + + replyRoute._1 ! OperationForwardFinished(succeeded = true, destination = replyRoute._2) + } + + case ReplaceParent(newParent) => + this.parent = newParent + + case MovePredecessor(newParent, newSubtrees, originalActor, originalKey, replyRoute) => + subtrees.get(Right) match { + case Some(actor) => actor ! MovePredecessor(newParent, newSubtrees, originalActor, originalKey, replyRoute) + case None => + this.parent ! DeleteChild(self, this.key, (ActorRef.noSender, ActorRef.noSender)) + this.parent = newParent + this.subtrees = newSubtrees + newParent ! ReplaceChild(originalActor, self, originalKey, replyRoute) + } + + case DeleteForward(requester, middleware, key) => + if (key != this.key) { + // Forward deletion message to respective child + log.debug(s"Search for ${key}, this is ${this.key}") + val child = getChildFor(key) + subtrees.get(child) match { + case Some(actor) => actor ! DeleteForward(requester, middleware, key); log.debug("Forward to next node. ") + case None => middleware ! OperationForwardFinished(succeeded = false, destination = requester) } } else { - requester ! GetResult(succeeded = true, Some(this.value)) + // This is the actor that should be deleted. + log.debug("Node for deletion found.") + log.info(s"Deleted pair on node ${self}:\n${key}: ${value}") + subtrees.size match { + case 0 => + // Leaf node: Send deletion message to parent + log.debug("Leaf node: Send deletion to parent") + if (parent != ActorRef.noSender) { + parent ! DeleteChild(self, key, replyRoute = (middleware, requester)) + } + context.stop(self) + case 1 => + // One child: Replace position in the parent node with this actor and its whole subtree + val (_, actor) = subtrees.head + log.debug(s"One child: Replace this position in parent with ${actor}") + parent ! ReplaceChild(self, actor, key, replyRoute = (middleware, requester)) + case 2 => + // Replace with maximum value in left subtree, then delete this leaf. + subtrees.get(Left) match { + case Some(actor) => actor ! MovePredecessor(parent, subtrees, self, key, replyRoute = (middleware, requester)) + case None => middleware ! OperationForwardFinished(succeeded = false, destination = requester) + } + } } } }