Skip to content

Commit

Permalink
feat: implement interface-level discovery of dubbo registry-zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Nov 10, 2023
1 parent 6e1b3bd commit 5130b53
Show file tree
Hide file tree
Showing 14 changed files with 3,233 additions and 6 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/cloudwego/kitex v0.6.2-0.20230815060351-88ea60530d40
github.com/cloudwego/thriftgo v0.3.0
github.com/dubbogo/tools v1.0.9
github.com/go-zookeeper/zk v1.0.3
github.com/stretchr/testify v1.8.2
golang.org/x/net v0.17.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,8 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
Expand Down
11 changes: 5 additions & 6 deletions pkg/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package dubbo
import (
"context"
"fmt"
"github.com/kitex-contrib/codec-dubbo/registries"

"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/codec"
Expand Down Expand Up @@ -101,12 +102,10 @@ func (m *DubboCodec) encodeRequestPayload(ctx context.Context, message remote.Me
service := &dubbo_spec.Service{
ProtocolVersion: dubbo_spec.DEFAULT_DUBBO_PROTOCOL_VERSION,
Path: m.opt.JavaClassName,
// todo: kitex mapping
Version: "",
Method: message.RPCInfo().Invocation().MethodName(),
Timeout: message.RPCInfo().Config().RPCTimeout(),
// todo: kitex mapping
Group: "",
Version: message.RPCInfo().To().DefaultTag(registries.DubboServiceVersionKey, ""),
Method: message.RPCInfo().Invocation().MethodName(),
Timeout: message.RPCInfo().Config().RPCTimeout(),
Group: message.RPCInfo().To().DefaultTag(registries.DubboServiceGroupKey, ""),
}
if err = m.messageServiceInfo(ctx, service, encoder); err != nil {
return nil, err
Expand Down
67 changes: 67 additions & 0 deletions registries/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2023 CloudWeGo Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package registries

import (

Check failure on line 22 in registries/common.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
"github.com/cloudwego/kitex/pkg/discovery"
"net/url"

Check failure on line 24 in registries/common.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
)

const (
DubboServiceProtocolKey = "dubbo-service-protocol"
DubboServiceGroupKey = "dubbo-service-group"
DubboServiceVersionKey = "dubbo-service-version"

DefaultDubboServiceWeight = 100
)

var RegistryServicesKey = "/%s/%s/providers"

type URL struct {
protocol string
host string
interfacePath string

Check failure on line 40 in registries/common.go

View workflow job for this annotation

GitHub Actions / lint

field `interfacePath` is unused (unused)

Check failure on line 40 in registries/common.go

View workflow job for this annotation

GitHub Actions / staticcheck

[staticcheck] reported by reviewdog 🐶 field interfacePath is unused Raw Output: {"source":{"name":"staticcheck","url":"https://staticcheck.io"},"message":"field interfacePath is unused","code":{"value":"U1000","url":"https://staticcheck.io/docs/checks#U1000"},"location":{"path":"/data00/runner/kitex-contrib-sg-runner-05/codec-dubbo/codec-dubbo/registries/common.go","range":{"start":{"line":40,"column":2}}},"severity":"ERROR"}
params url.Values
}

func (u *URL) FromString(raw string) error {
decodedRaw, err := url.PathUnescape(raw)
if err != nil {
return err
}
rawURL, err := url.Parse(decodedRaw)
if err != nil {
return err
}
u.protocol = rawURL.Scheme
u.host = rawURL.Host
u.params = rawURL.Query()
return nil
}

func (u *URL) ToInstance() discovery.Instance {
params := map[string]string{
DubboServiceProtocolKey: u.protocol,
DubboServiceGroupKey: u.params.Get("group"),
DubboServiceVersionKey: u.params.Get("version"),
}
// todo(DMwangnima): figure out dubbo weight mechanism and set default weight here temporarily.
return discovery.NewInstance("tcp", u.host, DefaultDubboServiceWeight, params)
}
111 changes: 111 additions & 0 deletions registries/zookeeper/resolver/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2023 CloudWeGo Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package resolver

import (
"time"
)

type Options struct {
Servers []string
InterfaceName string
RegistryGroup string
ServiceGroup string
ServiceVersion string
SessionTimeout time.Duration
}

func (o *Options) Apply(opts []Option) {
for _, opt := range opts {
opt.F(o)
}
}

func newOptions(opts []Option) *Options {
o := &Options{}

o.Apply(opts)

if len(o.Servers) <= 0 {
panic("Please specify at least one zookeeper server address. e.g. WithServers(\"127.0.0.1:2181\")")
}
if o.InterfaceName == "" {
panic("Please specify target InterfaceName. e.g. WithInterfaceName(\"org.cloudwego.kitex.samples.api.GreetProvider\")")
}
if o.SessionTimeout == 0 {
o.SessionTimeout = defaultSessionTimeout
}
return o
}

type Option struct {
F func(o *Options)
}

// WithServers configures target zookeeper servers that zookeeperResolver would connect to.
// Please specify at least one server address, e.g. WithServers("127.0.0.1:2181")
func WithServers(servers ...string) Option {
return Option{F: func(o *Options) {
o.Servers = servers
}}
}

// WithInterfaceName configures the Interface of the target dubbo Service.
// This configuration must be set, e.g. WithInterfaceName("org.cloudwego.kitex.samples.api.GreetProvider")
func WithInterfaceName(name string) Option {
return Option{F: func(o *Options) {
o.InterfaceName = name
}}
}

// WithRegistryGroup configures the group of the zookeepers serving the target dubbo Service.
// In dubbo side, this group is referred to RegistryConfig.version.
func WithRegistryGroup(group string) Option {
return Option{F: func(o *Options) {
o.RegistryGroup = group
}}
}

// WithServiceGroup configures the group of the target dubbo Service.
// In dubbo side, this group is referred to ServiceConfig.group.
func WithServiceGroup(group string) Option {
return Option{F: func(o *Options) {
o.ServiceGroup = group
}}
}

// WithServiceVersion configures the version of the target dubbo Service.
// In dubbo side, this version is referred to ServiceConfig.version.
func WithServiceVersion(version string) Option {
return Option{F: func(o *Options) {
o.ServiceVersion = version
}}
}

// WithSessionTimeout configures the amount of time for which a session
// is considered valid after losing connection to a server.
// Within the session timeout it's possible to reestablish a connection
// to a different server and keep the same session.
// The default SessionTimeout would be 3 * time.Second
func WithSessionTimeout(timeout time.Duration) Option {
return Option{F: func(o *Options) {
o.SessionTimeout = timeout
}}
}
95 changes: 95 additions & 0 deletions registries/zookeeper/resolver/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2023 CloudWeGo Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package resolver

import (
"context"
"fmt"

Check failure on line 24 in registries/zookeeper/resolver/resolver.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
"github.com/cloudwego/kitex/pkg/discovery"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/go-zookeeper/zk"
"github.com/kitex-contrib/codec-dubbo/registries"
"strings"
"time"
)

const (
defaultSessionTimeout = 30 * time.Second
)

type zookeeperResolver struct {
conn *zk.Conn
opt *Options
}

func NewZookeeperResolver(opts ...Option) (discovery.Resolver, error) {
o := newOptions(opts)
conn, _, err := zk.Connect(o.Servers, o.SessionTimeout)
if err != nil {
return nil, err
}
return &zookeeperResolver{
conn: conn,
opt: o,
}, nil
}

func (z *zookeeperResolver) Target(ctx context.Context, target rpcinfo.EndpointInfo) (description string) {
return fmt.Sprintf(registries.RegistryServicesKey, z.opt.RegistryGroup, z.opt.InterfaceName)
}

func (z *zookeeperResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) {
fmt.Printf("opt.Group: %s, opt.Version: %s\n", z.opt.ServiceGroup, z.opt.ServiceVersion)
rawURLs, _, err := z.conn.Children(desc)
if err != nil {
return discovery.Result{}, err
}
instances := make([]discovery.Instance, 0, len(rawURLs))
for _, rawURL := range rawURLs {
u := new(registries.URL)
if err := u.FromString(rawURL); err != nil {
klog.Errorf("invalid dubbo URL from zookeeper: %s, err :%s", rawURL, err)
continue
}
tmpInstance := u.ToInstance()
if group, _ := tmpInstance.Tag(registries.DubboServiceGroupKey); group != z.opt.ServiceGroup {
continue
}
if ver, _ := tmpInstance.Tag(registries.DubboServiceVersionKey); ver != z.opt.ServiceVersion {
continue
}
instances = append(instances, tmpInstance)
}
return discovery.Result{
Cacheable: true,
CacheKey: desc,
Instances: instances,
}, nil
}

func (z *zookeeperResolver) Diff(cacheKey string, prev, next discovery.Result) (discovery.Change, bool) {
return discovery.DefaultDiff(cacheKey, prev, next)
}

func (z *zookeeperResolver) Name() string {
// todo(DMwangnima): consider this Name since we do not want share a common Resolver
return strings.Join([]string{"dubbo-zookeeper", z.opt.RegistryGroup, z.opt.InterfaceName, z.opt.ServiceGroup, z.opt.ServiceVersion}, ":")
}
6 changes: 6 additions & 0 deletions tests/dubbo-go/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ func (u *UserProviderImpl) EchoByte(ctx context.Context, req int8) (int8, error)
// }
//

type UserProviderImplV1 struct{}

func (u *UserProviderImplV1) EchoBool(ctx context.Context, req bool) (bool, error) {
return !req, nil
}

func init() {
hessian.RegisterPOJO(&User{}) // Register all transmission struct to hessian lib
// these POJOs would override POJOs registered by Kitex generation with same JavaClassName.
Expand Down
7 changes: 7 additions & 0 deletions tests/dubbo-java/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,10 @@ mvn -Djava.net.preferIPv4Stack=true -Dexec.mainClass=org.apache.dubbo.tests.clie
mvn clean package
mvn -Djava.net.preferIPv4Stack=true -Dexec.mainClass=org.apache.dubbo.tests.provider.Application exec:java
```

## Start the service provider

```bash
mvn clean package
mvn -Djava.net.preferIPv4Stack=true -Dexec.mainClass=org.apache.dubbo.tests.provider.RegistryApplication exec:java
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.apache.dubbo.tests.provider;

import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.ServiceConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.tests.api.UserProvider;

import java.util.ArrayList;
import java.util.List;

public class RegistryApplication {
public static void main(String[] args) {
ServiceConfig<UserProvider> service = new ServiceConfig<>();
service.setInterface(UserProvider.class);
service.setRef(new UserProviderImpl());

ServiceConfig<UserProvider> service1 = new ServiceConfig<>();
service1.setInterface(UserProvider.class);
service1.setRef(new UserProviderImplV1());
service1.setGroup("g1");
service1.setVersion("v1");

List<ServiceConfig> list = new ArrayList<>();
list.add(service);
list.add(service1);

String zookeeperAddress = "zookeeper://127.0.0.1:2181";
RegistryConfig zookeeper = new RegistryConfig(zookeeperAddress);
zookeeper.setGroup("myGroup");
zookeeper.setRegisterMode("interface");

DubboBootstrap.getInstance()
.application("first-dubbo-provider")
.registry(zookeeper)
.protocol(new ProtocolConfig("dubbo", 20001))
.services(list)
.start()
.await();
}
}
Loading

0 comments on commit 5130b53

Please sign in to comment.