Skip to content

Commit

Permalink
Add tests for extracting paths from message in lambda batcher (#2757)
Browse files Browse the repository at this point in the history
* add sqs event ops for batcher lambda

* Apply auto-formatting rules

* oops missing file

* Apply auto-formatting rules

---------

Co-authored-by: Github on behalf of Wellcome Collection <wellcomedigitalplatform@wellcome.ac.uk>
  • Loading branch information
kenoir and weco-bot authored Nov 12, 2024
1 parent 8f72b5b commit fb32f8d
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package weco.pipeline.batcher
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import grizzled.slf4j.Logging
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage
import org.apache.pekko.actor.ActorSystem
import weco.messaging.typesafe.SNSBuilder
import weco.json.JsonUtil._
import com.typesafe.config.ConfigFactory
import weco.typesafe.config.builders.EnrichConfig.RichConfig

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.util.Try

object LambdaMain extends RequestHandler[SQSEvent, String] with Logging {
import weco.pipeline.batcher.lib.SQSEventOps._

private val config = ConfigFactory.load("application")

private val downstream = config.getString("batcher.use_downstream") match {
Expand All @@ -35,29 +35,13 @@ object LambdaMain extends RequestHandler[SQSEvent, String] with Logging {

PathsProcessor(
config.requireInt("batcher.max_batch_size"),
extractPathsFromEvent(event),
event.extractPaths,
downstream
)

"Done"
}

/** Messages consumed by this Lambda are taken from a queue populated by an
* SNS topic. The actual message we are interested in is a String containing
* the path. However, the matryoshka-like nature of these things means the
* lambda receives
* - an event containing
* - a `Records` list, each Record containing
* - an SQS Message with a JSON body containing
* - an SNS notification containing
* - a `Message`, which is the actual content we want
*/
private def extractPathsFromEvent(event: SQSEvent): List[String] =
event.getRecords.asScala.toList.flatMap(extractPathFromMessage)

private def extractPathFromMessage(message: SQSMessage): Option[String] =
ujson.read(message.getBody).obj.get("Message").map(_.toString)

private object SNSDownstream extends Downstream {
private val msgSender = SNSBuilder
.buildSNSMessageSender(config, subject = "Sent from batcher")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package weco.pipeline.batcher.lib

import com.amazonaws.services.lambda.runtime.events.SQSEvent
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage
import scala.collection.JavaConverters._

object SQSEventOps {

/** Messages consumed by Lambda from SQS are taken from a queue populated by
* an SNS topic. The actual message we are interested in is a String
* containing the path. However, the matryoshka-like nature of these things
* means the lambda receives
* - an event containing
* - a `Records` list, each Record containing
* - an SQS Message with a JSON body containing
* - an SNS notification containing
* - a `Message`, which is the actual content we want
*/
implicit class ExtractPathFromSqsEvent(event: SQSEvent) {
def extractPaths: List[String] =
event.getRecords.asScala.toList.flatMap(extractPathFromMessage)

private def extractPathFromMessage(message: SQSMessage): Option[String] =
ujson.read(message.getBody).obj.get("Message").flatMap(_.strOpt)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package weco.pipeline.batcher

import com.amazonaws.services.lambda.runtime.events.SQSEvent
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import scala.collection.JavaConverters._

class SQSEventOpsTest extends AnyFunSpec with Matchers {
import lib.SQSEventOps._

describe("Using the implicit class SQSEventOps") {
it("extracts paths from an SQSEvent") {
val fakeMessage = new SQSMessage()
fakeMessage.setBody("{\"Message\":\"A/C\"}")
val fakeSQSEvent = new SQSEvent()
fakeSQSEvent.setRecords(List(fakeMessage).asJava)

val paths = fakeSQSEvent.extractPaths

paths shouldBe List("A/C")
}
}
}

0 comments on commit fb32f8d

Please sign in to comment.