From fb32f8d9945c4042b1dbd3d5636c884074a5d5fb Mon Sep 17 00:00:00 2001 From: Robert Kenny Date: Tue, 12 Nov 2024 10:00:02 +0000 Subject: [PATCH] Add tests for extracting paths from message in lambda batcher (#2757) * add sqs event ops for batcher lambda * Apply auto-formatting rules * oops missing file * Apply auto-formatting rules --------- Co-authored-by: Github on behalf of Wellcome Collection --- .../weco/pipeline/batcher/LambdaMain.scala | 22 +++------------- .../pipeline/batcher/lib/SQSEventOps.scala | 26 +++++++++++++++++++ .../pipeline/batcher/SQSEventOpsTest.scala | 24 +++++++++++++++++ 3 files changed, 53 insertions(+), 19 deletions(-) create mode 100644 pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/SQSEventOps.scala create mode 100644 pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/SQSEventOpsTest.scala diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/LambdaMain.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/LambdaMain.scala index 075710a206..b588e2afc0 100644 --- a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/LambdaMain.scala +++ b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/LambdaMain.scala @@ -3,18 +3,18 @@ package weco.pipeline.batcher import com.amazonaws.services.lambda.runtime.{Context, RequestHandler} import grizzled.slf4j.Logging import com.amazonaws.services.lambda.runtime.events.SQSEvent -import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage import org.apache.pekko.actor.ActorSystem import weco.messaging.typesafe.SNSBuilder import weco.json.JsonUtil._ import com.typesafe.config.ConfigFactory import weco.typesafe.config.builders.EnrichConfig.RichConfig -import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext import scala.util.Try object LambdaMain extends RequestHandler[SQSEvent, String] with Logging { + import weco.pipeline.batcher.lib.SQSEventOps._ + private val config = ConfigFactory.load("application") private val downstream = config.getString("batcher.use_downstream") match { @@ -35,29 +35,13 @@ object LambdaMain extends RequestHandler[SQSEvent, String] with Logging { PathsProcessor( config.requireInt("batcher.max_batch_size"), - extractPathsFromEvent(event), + event.extractPaths, downstream ) "Done" } - /** Messages consumed by this Lambda are taken from a queue populated by an - * SNS topic. The actual message we are interested in is a String containing - * the path. However, the matryoshka-like nature of these things means the - * lambda receives - * - an event containing - * - a `Records` list, each Record containing - * - an SQS Message with a JSON body containing - * - an SNS notification containing - * - a `Message`, which is the actual content we want - */ - private def extractPathsFromEvent(event: SQSEvent): List[String] = - event.getRecords.asScala.toList.flatMap(extractPathFromMessage) - - private def extractPathFromMessage(message: SQSMessage): Option[String] = - ujson.read(message.getBody).obj.get("Message").map(_.toString) - private object SNSDownstream extends Downstream { private val msgSender = SNSBuilder .buildSNSMessageSender(config, subject = "Sent from batcher") diff --git a/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/SQSEventOps.scala b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/SQSEventOps.scala new file mode 100644 index 0000000000..e27e19128a --- /dev/null +++ b/pipeline/relation_embedder/batcher/src/main/scala/weco/pipeline/batcher/lib/SQSEventOps.scala @@ -0,0 +1,26 @@ +package weco.pipeline.batcher.lib + +import com.amazonaws.services.lambda.runtime.events.SQSEvent +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage +import scala.collection.JavaConverters._ + +object SQSEventOps { + + /** Messages consumed by Lambda from SQS are taken from a queue populated by + * an SNS topic. The actual message we are interested in is a String + * containing the path. However, the matryoshka-like nature of these things + * means the lambda receives + * - an event containing + * - a `Records` list, each Record containing + * - an SQS Message with a JSON body containing + * - an SNS notification containing + * - a `Message`, which is the actual content we want + */ + implicit class ExtractPathFromSqsEvent(event: SQSEvent) { + def extractPaths: List[String] = + event.getRecords.asScala.toList.flatMap(extractPathFromMessage) + + private def extractPathFromMessage(message: SQSMessage): Option[String] = + ujson.read(message.getBody).obj.get("Message").flatMap(_.strOpt) + } +} diff --git a/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/SQSEventOpsTest.scala b/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/SQSEventOpsTest.scala new file mode 100644 index 0000000000..01b45f7838 --- /dev/null +++ b/pipeline/relation_embedder/batcher/src/test/scala/weco/pipeline/batcher/SQSEventOpsTest.scala @@ -0,0 +1,24 @@ +package weco.pipeline.batcher + +import com.amazonaws.services.lambda.runtime.events.SQSEvent +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage +import org.scalatest.funspec.AnyFunSpec +import org.scalatest.matchers.should.Matchers +import scala.collection.JavaConverters._ + +class SQSEventOpsTest extends AnyFunSpec with Matchers { + import lib.SQSEventOps._ + + describe("Using the implicit class SQSEventOps") { + it("extracts paths from an SQSEvent") { + val fakeMessage = new SQSMessage() + fakeMessage.setBody("{\"Message\":\"A/C\"}") + val fakeSQSEvent = new SQSEvent() + fakeSQSEvent.setRecords(List(fakeMessage).asJava) + + val paths = fakeSQSEvent.extractPaths + + paths shouldBe List("A/C") + } + } +}