Skip to content

Commit

Permalink
Merge pull request #21 from nebula-contrib/upgrade-client
Browse files Browse the repository at this point in the history
upgrade client to 3.6.1 and use testcontainers-neubula
  • Loading branch information
jxnu-liguobin authored Oct 10, 2023
2 parents 8bb5a91 + c485509 commit 89b6362
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 80 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ jobs:
- name: Checking Code style
run: sbt check

- name: Run NebulaGraph
run: docker-compose -f examples/src/main/resources/docker-compose.yaml up -d

- name: Run tests
run: sbt test

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ There are the version correspondence between zio-nebula and nebula-java:
|:-----:|:----------:|:-----------:|
| 2.0.x | 0.0.x | 3.6.0 |
| 2.0.x | 0.1.x | 3.6.0 |
| 2.0.x | 0.1.1 | 3.6.1 |


## Example
Expand All @@ -57,7 +58,7 @@ object NebulaSessionClientExample {
object NebulaSessionClientMain extends ZIOAppDefault {

override def run = (for {
_ <- ZIO.serviceWithZIO[NebulaSessionClient](_.init())
_ <- ZIO.serviceWithZIO[NebulaSessionClient](_.init()) // since 0.1.1, no need to call it manually.
_ <- ZIO.serviceWithZIO[NebulaSessionClientExample](
_.execute("""
|INSERT VERTEX person(name, age) VALUES
Expand Down
30 changes: 16 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
val zioVersion = "2.0.13"
val scala3_Version = "3.3.1"
val scala2_13Version = "2.13.12"
val scala2_12Version = "2.12.18"
val zioConfigVersion = "4.0.0-RC16"
val nebulaClientVersion = "3.6.0"
val logbackVersion = "1.4.11"
val silencerVersion = "1.4.2"
val zioVersion = "2.0.13"
val scala3_Version = "3.3.1"
val scala2_13Version = "2.13.12"
val scala2_12Version = "2.12.18"
val zioConfigVersion = "4.0.0-RC16"
val nebulaClientVersion = "3.6.1"
val logbackVersion = "1.4.11"
val silencerVersion = "1.4.2"
val testcontainersNebulaVersion = "0.1.0"

val supportCrossVersionList = Seq(scala3_Version, scala2_13Version, scala2_12Version)

Expand Down Expand Up @@ -42,13 +43,14 @@ lazy val core = project
name := "zio-nebula",
crossScalaVersions := supportCrossVersionList,
libraryDependencies ++= Seq(
"com.vesoft" % "client" % nebulaClientVersion,
"dev.zio" %% "zio-config-typesafe" % zioConfigVersion,
"dev.zio" %% "zio-config-magnolia" % zioConfigVersion,
"dev.zio" %% "zio" % zioVersion,
"com.vesoft" % "client" % nebulaClientVersion,
"dev.zio" %% "zio-config-typesafe" % zioConfigVersion,
"dev.zio" %% "zio-config-magnolia" % zioConfigVersion,
"dev.zio" %% "zio" % zioVersion,
// see https://github.com/zio/zio-config/issues/1245
"com.github.ghik" %% "silencer-lib" % silencerVersion % Provided cross CrossVersion.for3Use2_13,
"ch.qos.logback" % "logback-classic" % logbackVersion % Test
"com.github.ghik" %% "silencer-lib" % silencerVersion % Provided cross CrossVersion.for3Use2_13,
"ch.qos.logback" % "logback-classic" % logbackVersion % Test,
"io.github.jxnu-liguobin" %% "testcontainers-nebula" % testcontainersNebulaVersion % Test
) ++ _zioTests.map(_ % Test),
Test / parallelExecution := false,
testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/zio/nebula/NebulaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ final case class NebulaSessionPoolConfig(
intervalTimeMills: Int = 0,
healthCheckTimeSeconds: Int = 600,
cleanTimeSeconds: Int = 3600,
reconnect: Boolean = false
reconnect: Boolean = false,
useHttp2: Boolean = false
)

object NebulaConfig {
Expand Down
56 changes: 25 additions & 31 deletions core/src/main/scala/zio/nebula/NebulaSessionClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,36 +55,30 @@ trait NebulaSessionClient {

object NebulaSessionClient {

private def sessionLayer: ZLayer[NebulaSessionPoolConfig & Scope, Throwable, SessionPool] =
ZLayer.fromZIO {
ZIO.serviceWithZIO[NebulaSessionPoolConfig](nebulaConfig =>
ZIO.acquireRelease(
ZIO.attempt(
new SessionPool(
new SessionPoolConfig(
nebulaConfig.address.map(d => new HostAddress(d.host, d.port)).asJava,
nebulaConfig.spaceName,
nebulaConfig.auth.username,
nebulaConfig.auth.password
).setMaxSessionSize(nebulaConfig.maxSessionSize)
.setMinSessionSize(nebulaConfig.minSessionSize)
.setRetryTimes(nebulaConfig.retryTimes)
.setWaitTime(nebulaConfig.waitTimeMills)
.setIntervalTime(nebulaConfig.intervalTimeMills)
.setTimeout(nebulaConfig.timeoutMills)
.setCleanTime(nebulaConfig.cleanTimeSeconds)
.setReconnect(nebulaConfig.reconnect)
.setHealthCheckTime(nebulaConfig.healthCheckTimeSeconds)
)
)
)(release => ZIO.attempt(release.close()).onError(e => ZIO.logErrorCause(e)).ignoreLogged)
)
}

lazy val layer: ZLayer[NebulaSessionPoolConfig & Scope, Nothing, NebulaSessionClient] = {
val pool = ZLayer.fromZIO(
ZIO.serviceWith[SessionPool](new NebulaSessionClientLive(_))
)
(sessionLayer >>> pool).orDie
lazy val layer: ZLayer[Scope with NebulaSessionPoolConfig, Throwable, NebulaSessionClient] = ZLayer.fromZIO {
for {
nebulaConfig <- ZIO.service[NebulaSessionPoolConfig]
sessionPool <- ZIO.acquireRelease(
ZIO.attempt(
new SessionPool(
new SessionPoolConfig(
nebulaConfig.address.map(d => new HostAddress(d.host, d.port)).asJava,
nebulaConfig.spaceName,
nebulaConfig.auth.username,
nebulaConfig.auth.password
).setMaxSessionSize(nebulaConfig.maxSessionSize)
.setMinSessionSize(nebulaConfig.minSessionSize)
.setRetryTimes(nebulaConfig.retryTimes)
.setWaitTime(nebulaConfig.waitTimeMills)
.setIntervalTime(nebulaConfig.intervalTimeMills)
.setTimeout(nebulaConfig.timeoutMills)
.setCleanTime(nebulaConfig.cleanTimeSeconds)
.setReconnect(nebulaConfig.reconnect)
.setHealthCheckTime(nebulaConfig.healthCheckTimeSeconds)
.setUseHttp2(nebulaConfig.useHttp2)
)
)
)(release => ZIO.attempt(release.close()).ignoreLogged)
} yield new NebulaSessionClientLive(sessionPool)
}
}
3 changes: 3 additions & 0 deletions core/src/main/scala/zio/nebula/NebulaSessionClientLive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ private[nebula] final class NebulaSessionClientLive(underlying: SessionPool) ext

override def close(): Task[Unit] = ZIO.attempt(underlying.close())

@deprecated(
"init the SessionPool this function is moved into SessionPool's constructor, no need to call it manually."
)
override def init(): Task[Boolean] = ZIO.attempt(underlying.init())

}
2 changes: 1 addition & 1 deletion core/src/main/scala/zio/nebula/net/NebulaClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ object NebulaClient {
ZIO.attempt(d.close()).onError(e => ZIO.logErrorCause(e)).ignoreLogged
)

lazy val layer: ZLayer[Scope, Nothing, NebulaClient] =
lazy val layer: ZLayer[Scope, Throwable, NebulaClient] =
ZLayer.fromZIO(makePool.map(pool => new NebulaClientLive(pool)))
}
8 changes: 4 additions & 4 deletions core/src/main/scala/zio/nebula/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,24 @@ package object nebula {
type Storage = NebulaStorageClient
type Meta = NebulaMetaClient

lazy val SessionClientEnv: ZLayer[Scope, Nothing, SessionClient] = ZLayer.makeSome[Scope, SessionClient](
lazy val SessionClientEnv: ZLayer[Scope, Throwable, SessionClient] = ZLayer.makeSome[Scope, SessionClient](
NebulaSessionClient.layer,
NebulaConfig.sessionConfigLayer
)

lazy val ClientEnv: ZLayer[Scope, Nothing, Client] =
lazy val ClientEnv: ZLayer[Scope, Throwable, Client] =
ZLayer.makeSome[Scope, Client](
NebulaClient.layer,
NebulaConfig.poolConfigLayer,
NebulaConfig.sessionConfigLayer
)

lazy val StorageEnv: ZLayer[Scope, Nothing, Storage] = ZLayer.makeSome[Scope, Storage](
lazy val StorageEnv: ZLayer[Scope, Throwable, Storage] = ZLayer.makeSome[Scope, Storage](
NebulaStorageClient.layer,
NebulaConfig.storageConfigLayer
)

lazy val MetaEnv: ZLayer[Scope, Nothing, Meta] = ZLayer.makeSome[Scope, Meta](
lazy val MetaEnv: ZLayer[Scope, Throwable, Meta] = ZLayer.makeSome[Scope, Meta](
NebulaMetaClient.layer,
NebulaConfig.metaConfigLayer
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ trait NebulaStorageClient {

object NebulaStorageClient {

lazy val layer: ZLayer[NebulaStorageConfig & Scope, Nothing, NebulaStorageClient] =
lazy val layer: ZLayer[NebulaStorageConfig & Scope, Throwable, NebulaStorageClient] =
ZLayer.fromZIO {
for {
config <- ZIO.serviceWith[NebulaStorageConfig](_.underlying)
Expand All @@ -44,5 +44,5 @@ object NebulaStorageClient {
)(_.close().onError(e => ZIO.logErrorCause(e)).ignoreLogged)
} yield manger

}.orDie
}
}
49 changes: 33 additions & 16 deletions core/src/test/scala/zio/nebula/NebulaClientSpec.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package zio.nebula

import zio.ZIO
import zio.{ Scope, ZIO }
import zio.nebula.meta.NebulaMetaClient
import zio.nebula.net.{ NebulaClient, Stmt }
import zio.nebula.storage.{ NebulaStorageClient, ScanEdge }
import zio.test._

Expand Down Expand Up @@ -35,22 +34,10 @@ object NebulaClientSpec extends NebulaSpec {
|MATCH (p:person) RETURN p LIMIT 4;
|""".stripMargin

lazy val session = ZioNebulaEnvironment.defaultSession(container.graphdHostList.head, container.graphdPortList.head)

def specLayered: Spec[Nebula, Throwable] =
suite("nebula suite")(
suite("nebula session pool")(
test("create and query") {
for {
init <- ZIO.serviceWithZIO[NebulaSessionClient](_.init())
_ <- ZIO.logInfo(s"init session: $init")
res1 <- ZIO.serviceWithZIO[NebulaSessionClient](_.execute(insertVertexes))
_ <- ZIO.logInfo(s"exec insert vertex: ${res1.errorMessage}")
res2 <- ZIO.serviceWithZIO[NebulaSessionClient](_.execute(insertEdges))
_ <- ZIO.logInfo(s"exec insert edge: ${res2.errorMessage}")
res3 <- ZIO.serviceWithZIO[NebulaSessionClient](_.execute(query))
_ <- ZIO.logInfo(s"exec query ${res3.errorMessage}")
} yield assertTrue(res3.rows.size == 4)
}
),
suite("nebula meta manager")(
test("query") {
for {
Expand All @@ -72,6 +59,36 @@ object NebulaClientSpec extends NebulaSpec {
_ <- ZIO.logInfo(s"scan result: $scanResult")
} yield assertTrue(scanResult.hasNext)
}
),
suite("nebula session pool")(
test("create and query") {
for {
res1 <-
ZIO
.serviceWithZIO[NebulaSessionClient](_.execute(insertVertexes))
.provide(
Scope.default,
session
)
_ <- ZIO.logInfo(s"exec insert vertex: ${res1.errorMessage}")
res2 <-
ZIO
.serviceWithZIO[NebulaSessionClient](_.execute(insertEdges))
.provide(
Scope.default,
session
)
_ <- ZIO.logInfo(s"exec insert edge: ${res2.errorMessage}")
res3 <-
ZIO
.serviceWithZIO[NebulaSessionClient](_.execute(query))
.provide(
Scope.default,
session
)
_ <- ZIO.logInfo(s"exec query ${res3.errorMessage}")
} yield assertTrue(res3.rows.size == 4)
}
)
)

Expand Down
15 changes: 10 additions & 5 deletions core/src/test/scala/zio/nebula/NebulaSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ import zio.nebula.net.{ NebulaClient, Stmt }
import zio.test._
import zio.test.TestAspect._

import testcontainers.containers.NebulaSimpleClusterContainer

trait NebulaSpec extends ZIOSpecDefault {

type Nebula = Client with SessionClient with Storage with Meta with Scope
type Nebula = Client with Storage with Meta with Scope

val container: NebulaSimpleClusterContainer = new NebulaSimpleClusterContainer(subnetIp = "172.30.0.0/16")

container.start()

override def aspects: Chunk[TestAspectAtLeastR[TestEnvironment]] =
Chunk(TestAspect.fibers, TestAspect.timeout(180.seconds))
Expand All @@ -26,10 +32,9 @@ trait NebulaSpec extends ZIOSpecDefault {
) @@ sequential @@ eventually)
.provideShared(
Scope.default,
MetaEnv,
StorageEnv,
SessionClientEnv,
ClientEnv
ZioNebulaEnvironment.defaultMeta(container.metadHostList.head, container.metadPortList.head),
ZioNebulaEnvironment.defaultStorage(container.metadHostList.head, container.metadPortList.head),
ZioNebulaEnvironment.defaultClient(container.graphdHostList.head, container.graphdPortList.head)
)

def specLayered: Spec[Nebula, Throwable]
Expand Down
84 changes: 84 additions & 0 deletions core/src/test/scala/zio/nebula/ZioNebulaEnvironment.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package zio.nebula

import zio._
import zio.nebula.meta.NebulaMetaClient
import zio.nebula.net.NebulaClient
import zio.nebula.storage.NebulaStorageClient

/**
* This is the default configuration dedicated to testing.
*
* @author
* 梦境迷离
* @version 1.0,2023/9/19
*/
object ZioNebulaEnvironment {

var defaultUser = "root"
var defaultPwd = "nebula"
var defaultSpace = "test"

def defaultSession(host: String, port: Int): ZLayer[Scope, Throwable, SessionClient] =
ZLayer.makeSome[Scope, SessionClient](
NebulaSessionClient.layer,
ZLayer.succeed(
NebulaSessionPoolConfig(
List(NebulaHostAddress(host, port)),
NebulaAuth(defaultUser, defaultPwd),
defaultSpace
)
)
)

def defaultClient(host: String, port: Int): ZLayer[Scope, Throwable, Client] =
NebulaClient.layer ++ ZLayer.succeed(
NebulaPoolConfig(
timeoutMills = 60000,
enableSsl = false,
minConnsSize = 10,
maxConnsSize = 10,
intervalIdleMills = 100,
waitTimeMills = 100,
sslParam = None
)
) ++ ZLayer.fromZIO(
ZIO.attempt(
NebulaSessionPoolConfig(
List(NebulaHostAddress(host, port)),
NebulaAuth(defaultUser, defaultPwd),
defaultSpace
)
)
)

def defaultStorage(host: String, port: Int): ZLayer[Scope, Throwable, Storage] =
ZLayer.makeSome[Scope, Storage](
NebulaStorageClient.layer,
ZLayer.succeed(
NebulaStorageConfig(
NebulaConfig(
List(NebulaHostAddress(host, port)),
30000,
3,
3
)
)
)
)

def defaultMeta(host: String, port: Int): ZLayer[Scope, Throwable, Meta] =
ZLayer.makeSome[Scope, Meta](
NebulaMetaClient.layer,
ZLayer.succeed(
NebulaMetaConfig(
NebulaConfig(
List(NebulaHostAddress(host, port)),
30000,
3,
3
)
)
)
)

}
Loading

0 comments on commit 89b6362

Please sign in to comment.