forked from codeborne/klite
-
Notifications
You must be signed in to change notification settings - Fork 0
/
PostgresNotifier.kt
56 lines (48 loc) · 2.09 KB
/
PostgresNotifier.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package klite.jdbc
import klite.Extension
import klite.Server
import klite.register
import klite.require
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import org.postgresql.PGConnection
import org.postgresql.PGNotification
import java.sql.Connection
import javax.sql.DataSource
import kotlin.concurrent.thread
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
/**
 * Server [Extension] that bridges Postgres notifications to in-process coroutine channels.
 *
 * One unbounded [Channel] is created per element of [events], keyed by the element's `toString()`,
 * which is also used as the Postgres channel name. Once [install] has run, a background thread
 * forwards the payload of every received notification to the channel whose key equals the
 * notification name; notifications with no matching channel are dropped.
 *
 * @param K event type; `toString()` of each event is the channel name
 * @property events events that this notifier listens for
 */
@Deprecated("Experimental", level = DeprecationLevel.WARNING)
class PostgresNotifier<K: Any>(val events: Iterable<K>): Extension {
    private val channels = events.associate { it.toString() to Channel<String>(UNLIMITED) }
    private lateinit var db: DataSource

    /**
     * Sends [payload] as a notification for [event].
     * Requires [install] to have been called first, because that is where the [DataSource] is obtained.
     */
    fun send(event: K, payload: String = "") = db.notify(event.toString(), payload)

    /**
     * Suspends until a payload for [event] is available and returns it.
     *
     * @throws IllegalArgumentException if [event] was not among the [events] passed to the constructor
     */
    suspend fun receive(event: K): String =
        requireNotNull(channels[event.toString()]) { "Not listening for event: $event" }.receive()

    /**
     * Obtains the [DataSource] from [server], starts the listener thread, registers this notifier
     * in the server's registry and interrupts the listener when the server stops.
     */
    override fun install(server: Server) {
        db = server.require<DataSource>()
        // `this` is deliberately the notifier here (no receiver-scoped lambda), so the thread is named after this class
        val listener = thread(name = this::class.simpleName) {
            db.consumeNotifications(events.map { it.toString() }) {
                channels[it.name]?.trySend(it.parameter)
            }
        }
        // register the notifier itself so it can be looked up from the server's registry
        server.register(this)
        server.onStop { listener.interrupt() }
    }
}
/**
 * Publishes [payload] on the Postgres notification [channel] by calling `pg_notify`.
 * Postgres delivers the notification once the sending transaction commits.
 * The statement block evaluates to the result of advancing to the first row of the query result.
 */
fun DataSource.notify(channel: String, payload: String = "") = withStatement("select pg_notify(?, ?)") {
    setAll(sequenceOf(channel, payload))
    executeQuery().next()
}
/**
 * Subscribes to [events] on a single connection and hands every received notification to [consumer].
 *
 * This call blocks the calling thread: it keeps polling (waiting up to [timeout] per poll) until the
 * thread's interrupted flag is set, so callers are expected to run it on a dedicated thread.
 */
fun DataSource.consumeNotifications(events: Iterable<String>, timeout: Duration = 10.seconds, consumer: (notification: PGNotification) -> Unit) = withConnection {
    listen(events)
    while (!Thread.interrupted()) {
        for (notification in pgNotifications(timeout)) consumer(notification)
    }
}
/**
 * Subscribes this connection to every Postgres notification channel named in [events].
 *
 * Each name is sent as a double-quoted identifier with embedded double quotes doubled, so it is used
 * verbatim. An unquoted identifier would be case-folded by Postgres and would then not match a name
 * passed as a string to `pg_notify`; quoting also prevents a name from injecting additional SQL.
 */
fun Connection.listen(events: Iterable<String>) = createStatement().use { s ->
    events.forEach { event ->
        val identifier = event.replace("\"", "\"\"")
        s.execute("listen \"$identifier\"")
    }
}
/**
 * Fetches the notifications pending on this connection, which must unwrap to a [PGConnection].
 *
 * [timeout] is converted to whole milliseconds and clamped to the `Int` range before being passed to
 * the driver, so very long durations (e.g. [Duration.INFINITE]) no longer wrap around to an unrelated value.
 * NOTE(review): per the pgjdbc documentation a value of 0 blocks until a notification arrives — confirm
 * against the driver version in use before passing sub-millisecond timeouts.
 *
 * @return received notifications; an empty array when the driver reports none (including a `null` result)
 */
fun Connection.pgNotifications(timeout: Duration): Array<PGNotification> =
    unwrap(PGConnection::class.java)
        .getNotifications(timeout.inWholeMilliseconds.coerceIn(Int.MIN_VALUE.toLong(), Int.MAX_VALUE.toLong()).toInt())
        ?: emptyArray()