Skip to content

Commit

Permalink
Upgraded AWS, added EC2 dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
orbang committed Jul 24, 2023
1 parent 115523a commit 5931563
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 6 deletions.
9 changes: 6 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
name := "scase"

ThisBuild / organization := "io.jobial"
ThisBuild / scalaVersion := "2.13.8"
ThisBuild / scalaVersion := "2.11.12"
ThisBuild / crossScalaVersions := Seq("2.11.12", "2.12.15", "2.13.8")
ThisBuild / version := "1.3.0"
ThisBuild / scalacOptions += "-target:jvm-1.8"
Expand Down Expand Up @@ -46,7 +46,7 @@ lazy val CatsTestkitScalatestVersion = "1.0.0-RC1"
lazy val ScalaLoggingVersion = "3.9.2"
lazy val ScalatestVersion = "3.2.3"
lazy val SourcecodeVersion = "0.2.3"
lazy val AwsVersion = "1.11.557"
lazy val AwsVersion = "1.12.471"
lazy val AmazonSqsJavaExtendedClientLibVersion = "1.2.2"
lazy val AwsLambdaJavaCoreVersion = "1.2.1"
lazy val CommonsIoVersion = "2.8.0"
Expand Down Expand Up @@ -105,6 +105,7 @@ lazy val `scase-aws` = project
"com.amazonaws" % "aws-java-sdk-cloudformation" % AwsVersion excludeAll ("commons-logging"),
"com.amazonaws" % "aws-lambda-java-core" % AwsLambdaJavaCoreVersion excludeAll ("commons-logging"),
"com.amazonaws" % "aws-java-sdk-sts" % AwsVersion excludeAll ("commons-logging"),
"com.amazonaws" % "aws-java-sdk-ec2" % AwsVersion excludeAll ("commons-logging"),
"org.typelevel" %% "cats-core" % CatsVersion,
"org.typelevel" %% "cats-effect" % CatsVersion,
"com.typesafe.scala-logging" %% "scala-logging" % ScalaLoggingVersion
Expand Down Expand Up @@ -218,7 +219,9 @@ lazy val `scase-tools` = project
"io.lemonlabs" %% "scala-uri" % ScalaUriVersion,
"org.apache.pulsar" % "pulsar-client-admin" % PulsarVersion
),
Compile / packageBin / mappings ~= { _.filter(!_._1.getName.endsWith("logback.xml")) }
Compile / packageBin / mappings ~= {
_.filter(!_._1.getName.endsWith("logback.xml"))
}
)
.dependsOn(`scase-core` % "compile->compile;test->test")
.dependsOn(`scase-pulsar`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
*/
package io.jobial.scase.aws.client

import cats.Parallel
import cats.effect.Concurrent
import cats.effect.Timer

import java.util.concurrent.{ExecutionException, ExecutorService, Executors}
import com.amazonaws.ClientConfiguration
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.client.builder.{AwsAsyncClientBuilder, AwsSyncClientBuilder, ExecutorFactory}
import com.amazonaws.endpointdiscovery.DaemonThreadFactory
import io.jobial.scase.core.impl.CatsUtils
import io.jobial.scase.logging.Logging

import scala.concurrent.Future.failed
import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -32,7 +35,6 @@ trait AwsClient[F[_]] extends CatsUtils with Logging {

protected implicit def timer: Timer[F]


/**
* Clients are supposed to be thread safe: https://forums.aws.amazon.com/message.jspa?messageID=191621
*/
Expand Down
129 changes: 129 additions & 0 deletions scase-aws/src/main/scala/io/jobial/scase/aws/client/EC2Client.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package io.jobial.scase.aws.client

import cats.Parallel
import cats.effect.Concurrent
import cats.effect.Timer
import cats.implicits._
import com.amazonaws.client.builder.ExecutorFactory
import com.amazonaws.endpointdiscovery.DaemonThreadFactory
import com.amazonaws.services.ec2.AmazonEC2Async
import com.amazonaws.services.ec2.AmazonEC2AsyncClientBuilder
import com.amazonaws.services.ec2.model.DescribeFleetInstancesRequest
import com.amazonaws.services.ec2.model.DescribeFleetsRequest
import com.amazonaws.services.ec2.model.DescribeInstancesRequest
import com.amazonaws.services.ec2.model.DescribeSpotFleetInstancesRequest
import com.amazonaws.services.ec2.model.DescribeSpotFleetRequestsRequest
import com.amazonaws.services.ec2.model.FleetData
import com.amazonaws.services.ec2.model.ModifyFleetRequest
import com.amazonaws.services.ec2.model.ModifySpotFleetRequestRequest
import com.amazonaws.services.ec2.model.SpotFleetRequestConfig
import com.amazonaws.services.ec2.model.StartInstancesRequest
import com.amazonaws.services.ec2.model.StopInstancesRequest
import com.amazonaws.services.ec2.model.TargetCapacitySpecificationRequest
import io.jobial.scase.core.impl.CatsUtils

import scala.collection.JavaConverters._
import java.util.concurrent.Executors

object EC2Client {

val client = AmazonEC2AsyncClientBuilder.standard().withExecutorFactory(new ExecutorFactory {
def newExecutor = Executors.newCachedThreadPool(new DaemonThreadFactory)
}).build

def apply[F[_] : Concurrent : Timer : Parallel](implicit context: AwsContext) =
new S3Client[F] {
def awsContext = context

val concurrent = Concurrent[F]

val timer = Timer[F]

val parallel = Parallel[F]
}
}

trait EC2Client[F[_]] extends AwsClient[F] with CatsUtils {

def ec2Client = buildAwsAsyncClient[AmazonEC2AsyncClientBuilder, AmazonEC2Async](AmazonEC2AsyncClientBuilder.standard)

def getInstanceState(id: String) =
for {
r <- fromJavaFuture(ec2Client.describeInstancesAsync(
new DescribeInstancesRequest().withInstanceIds(id)
))
} yield r.getReservations.asScala.flatMap(_.getInstances.asScala).headOption.map(_.getState)

def startInstance(id: String) =
fromJavaFuture(ec2Client.startInstancesAsync(
new StartInstancesRequest().withInstanceIds(id)
))

def stopInstance(id: String) =
fromJavaFuture(ec2Client.stopInstancesAsync(
new StopInstancesRequest().withInstanceIds(id)
))

def describeSpotFleetRequests =
fromJavaFuture(ec2Client.describeSpotFleetRequestsAsync(
new DescribeSpotFleetRequestsRequest()
))

def describeFleets =
fromJavaFuture(ec2Client.describeFleetsAsync(
new DescribeFleetsRequest()
))

def modifySpotFleetRequests(request: ModifySpotFleetRequestRequest) =
fromJavaFuture(ec2Client
.modifySpotFleetRequestAsync(request))

def setSpotFleetTargetCapacity(id: String, capacity: Int) =
modifySpotFleetRequests(new ModifySpotFleetRequestRequest()
.withSpotFleetRequestId(id).withTargetCapacity(capacity))

def describeSpotFleetInstances(spotFleetRequestId: String) =
fromJavaFuture(ec2Client.describeSpotFleetInstancesAsync(
new DescribeSpotFleetInstancesRequest().withSpotFleetRequestId(spotFleetRequestId)
))

def describeFleetInstances(fleetId: String) =
fromJavaFuture(ec2Client.describeFleetInstancesAsync(
new DescribeFleetInstancesRequest().withFleetId(fleetId)
))

def targetCapacity(config: SpotFleetRequestConfig) =
config.getSpotFleetRequestConfig.getTargetCapacity.toInt

def spotTargetCapacity(config: FleetData) =
config.getTargetCapacitySpecification.getSpotTargetCapacity.toInt

def getSpotRequestSpotInstanceState(spotFleetRequestId: String) =
for {
r <- describeSpotFleetInstances(spotFleetRequestId)
state <- r.getActiveInstances.asScala.headOption.map(_.getInstanceId).map(getInstanceState)
.parSequence.map(_.flatten)
} yield state.map(_.getName)

def getFleetInstanceState(fleetId: String) =
for {
r <- describeFleetInstances(fleetId)
state <- r.getActiveInstances.asScala.headOption.map(_.getInstanceId).map(getInstanceState)
.parSequence.map(_.flatten)
} yield state.map(_.getName)

def setFleetSpotTargetCapacity(id: String, capacity: Int) =
for {
r <- fromJavaFuture(ec2Client.modifyFleetAsync(
new ModifyFleetRequest()
.withFleetId(id).withTargetCapacitySpecification(
new TargetCapacitySpecificationRequest()
.withSpotTargetCapacity(capacity)
.withTotalTargetCapacity(capacity)
))
)
} yield r

protected implicit def parallel: Parallel[F]
}

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package io.jobial.scase.aws.client

import cats.Parallel
import cats.effect.Concurrent
import cats.effect.Timer
import com.amazonaws.services.lambda.AWSLambdaAsync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,4 @@ class EndpointTest extends ServiceTestSupport {
test("tibrv://///subject", TibrvContext(), "subject")
test("tibrv://", TibrvContext(), "")
}


}

0 comments on commit 5931563

Please sign in to comment.