Skip to content

Commit

Permalink
Merge pull request #142 from davenverse/aggressivelyCloseSockets
Browse files Browse the repository at this point in the history
Aggressively Close Sockets
  • Loading branch information
ChristopherDavenport authored Apr 15, 2024
2 parents 30df9ac + b71d7b3 commit c04f08d
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ object RedisConnection{
) extends RedisConnection[F]{
def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = {
val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk, Defaults.maxBytes, redisRequestTimeout)
pool.take(()).use{
m => withSocket(m.value).timeout(redisRequestTimeout).attempt.flatTap{
m => explicitPipelineRequest[F](m.value, chunk, Defaults.maxBytes, redisRequestTimeout, m.canBeReused.set(Reusable.DontReuse)).timeout(redisRequestTimeout).attempt.flatTap{
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
Expand All @@ -60,7 +59,7 @@ object RedisConnection{
private[rediculous] case class DirectConnection[F[_]: Temporal](socket: Socket[F], commandTimeout: Duration, redisRequestTimeout: Duration) extends RedisConnection[F]{
def runRequest(inputs: Chunk[NonEmptyList[ByteVector]], key: Option[ByteVector]): F[Chunk[Resp]] = {
val chunk = Chunk.from(inputs.toList.map(Resp.renderRequest))
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk, Defaults.maxBytes, redisRequestTimeout)
def withSocket(socket: Socket[F]): F[Chunk[Resp]] = explicitPipelineRequest[F](socket, chunk, Defaults.maxBytes, redisRequestTimeout, socket.endOfOutput)
withSocket(socket)
}.timeoutTo(commandTimeout, Defer[F].defer(Temporal[F].raiseError[Chunk[Resp]](RedisError.CommandTimeoutException(commandTimeout))))
}
Expand All @@ -78,14 +77,18 @@ object RedisConnection{

// Guarantees With Socket That Each Call Receives a Response
// Chunk must be non-empty but to do so incurs a penalty
private[rediculous] def explicitPipelineRequest[F[_]: Temporal](socket: Socket[F], calls: Chunk[Resp], maxBytes: Int, redisRequestTimeout: Duration): F[Chunk[Resp]] = {
private[rediculous] def explicitPipelineRequest[F[_]: Temporal](socket: Socket[F], calls: Chunk[Resp], maxBytes: Int, redisRequestTimeout: Duration, removeConnection: F[Unit]): F[Chunk[Resp]] = {
val out = calls.flatMap(resp =>
Resp.CodecUtils.codec.encode(resp).toEither.traverse(bits => Chunk.byteVector(bits.bytes))
).sequence.leftMap(err => new Throwable(s"Failed To Encode Response $err")).liftTo[F]
out.flatMap{bytes =>

val request = socket.write(bytes) >>
Stream.eval(socket.read(maxBytes))
.evalTap{
case None => removeConnection
case _ => Applicative[F].unit
}
.repeat
.unNoneTerminate
.unchunks
Expand All @@ -95,6 +98,7 @@ object RedisConnection{
.to(Chunk)

request.timeoutTo(redisRequestTimeout, Defer[F].defer(Temporal[F].raiseError[Chunk[Resp]](RedisError.RedisRequestTimeoutException(redisRequestTimeout))))
.onError{ case _ => removeConnection }
}
}

Expand Down Expand Up @@ -497,7 +501,7 @@ object RedisConnection{
keypool.take(()).attempt.use{
case Right(m) =>
val out = chunk.map(_._2)
explicitPipelineRequest(m.value, out, Defaults.maxBytes, redisRequestTimeout)
explicitPipelineRequest(m.value, out, Defaults.maxBytes, redisRequestTimeout, m.canBeReused.set(Reusable.DontReuse))
.attempt
.timeout(redisRequestTimeout) // Apply Timeout To Call to Redis, this is independent of the timeout on individual calls
.flatTap{// Currently Guarantee Chunk.size === returnSize
Expand Down Expand Up @@ -718,7 +722,7 @@ object RedisConnection{
keypool.take(server).attempt.use{
case Right(m) =>
val out = Chunk.from(rest.map(_._5))
explicitPipelineRequest(m.value, out, Defaults.maxBytes, redisRequestTimeout).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
explicitPipelineRequest(m.value, out, Defaults.maxBytes, redisRequestTimeout, m.canBeReused.set(Reusable.DontReuse)).attempt.flatTap{// Currently Guarantee Chunk.size === returnSize
case Left(_) => m.canBeReused.set(Reusable.DontReuse)
case _ => Applicative[F].unit
}
Expand Down

0 comments on commit c04f08d

Please sign in to comment.