diff --git a/go.mod b/go.mod index f4e5154f..2d7761cd 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module github.com/onosproject/onos-topo go 1.12 require ( - github.com/atomix/atomix-go-client v0.0.0-20190819235619-b3cabfacacd0 - github.com/atomix/atomix-go-local v0.0.0-20190820000011-d55305ea0d8c - github.com/atomix/atomix-go-node v0.0.0-20190819235918-362f3143b084 + github.com/atomix/atomix-go-client v0.0.0-20190830184106-6ca178a89ccc + github.com/atomix/atomix-go-local v0.0.0-20190830183800-73f964b0f75a + github.com/atomix/atomix-go-node v0.0.0-20190830183818-a5b5157566f6 github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d github.com/mitchellh/go-homedir v1.1.0 github.com/onosproject/onos-config v0.0.0-20190715180819-079d3a8dc433 diff --git a/go.sum b/go.sum index a94cdec3..9c134975 100644 --- a/go.sum +++ b/go.sum @@ -16,20 +16,34 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/atomix/atomix-api v0.0.0-20190819202500-d202db6bbedb/go.mod h1:joWKUd0zIeYbAQ0vmYHGsnV03ZgRalhceHgnJ3EN0mI= github.com/atomix/atomix-api v0.0.0-20190819230829-366ccc994adb h1:5BfPSZekPTwr8SoHWZVYEj8bHtZJavTXWGRbsEs/t/4= github.com/atomix/atomix-api v0.0.0-20190819230829-366ccc994adb/go.mod h1:joWKUd0zIeYbAQ0vmYHGsnV03ZgRalhceHgnJ3EN0mI= +github.com/atomix/atomix-api v0.0.0-20190826211343-dd8f4db3bf77 h1:+PUuY9wDRp+VAg/JbEguzdOMJj6ruUw6Kw/y+QYHB6s= +github.com/atomix/atomix-api v0.0.0-20190826211343-dd8f4db3bf77/go.mod h1:joWKUd0zIeYbAQ0vmYHGsnV03ZgRalhceHgnJ3EN0mI= github.com/atomix/atomix-go-client v0.0.0-20190807011524-58b4352273d7 h1:XE+KsvclmfZjbqYDhBqeK8wHDDHYB/FiwGeLcBurYoM= github.com/atomix/atomix-go-client v0.0.0-20190807011524-58b4352273d7/go.mod h1:P/m0xcEzXviZLULk78gYlJr0mHjXCZAcem3kOkvwzhA= github.com/atomix/atomix-go-client v0.0.0-20190814013624-2b7049842ee1 h1:SW6b1TIATnNmsKrnGaoweY+uLezgUEKT61z3IGLKOts= github.com/atomix/atomix-go-client v0.0.0-20190814013624-2b7049842ee1/go.mod h1:P/m0xcEzXviZLULk78gYlJr0mHjXCZAcem3kOkvwzhA= github.com/atomix/atomix-go-client v0.0.0-20190819235619-b3cabfacacd0 h1:9FvHppD8AFmgQGG2K+OsFJGbYxGvha5tkHu5O85o6sU= github.com/atomix/atomix-go-client v0.0.0-20190819235619-b3cabfacacd0/go.mod h1:isfbdQyPZ93GKZ8KQ85dy42D8GYoUB/kN8JmT0hq/1E= +github.com/atomix/atomix-go-client v0.0.0-20190827234201-188602d4e780/go.mod h1:/UAIApUE5+Ghzu8oBVcYUoz6nCosrRPa0eUlluBtKz0= +github.com/atomix/atomix-go-client v0.0.0-20190830184106-6ca178a89ccc h1:yBus/VAiZxQhr3AavYW0YaMdhhUUp9GuHsdJUSMnnvg= +github.com/atomix/atomix-go-client v0.0.0-20190830184106-6ca178a89ccc/go.mod h1:Ap8Wz+lg4gojTXvMcKRKoYYe6ig6/FeaSLuZv7Ghlf0= github.com/atomix/atomix-go-local v0.0.0-20190819172907-cb2548c995e9 h1:EP+aAidkERynalsgvKOMU+nJwYbf0rS5GfYmlHmNDxo= github.com/atomix/atomix-go-local v0.0.0-20190819172907-cb2548c995e9/go.mod h1:iBfSnHJMJMuCEKBTWPbuTl1Y6rY+WGt+a9vYsSLTHmE= github.com/atomix/atomix-go-local v0.0.0-20190820000011-d55305ea0d8c h1:qVu0vfAkkeTFn4LgGTgCKlFp5JlPOKDFQGc7FuWtjw0= github.com/atomix/atomix-go-local v0.0.0-20190820000011-d55305ea0d8c/go.mod h1:YMZzUByozlrFK9BPkHN10WQirdPFRAkz3kEIn6c58uA= +github.com/atomix/atomix-go-local v0.0.0-20190827233944-938e35b06834/go.mod h1:qLBTOiVKoEqzYOjgxIgWFa+Hfa3SR+VexA6jGBcv0HA= +github.com/atomix/atomix-go-local v0.0.0-20190828183508-3db728c0fc3b/go.mod h1:VnwyXJvHzUHuVzzTmPhZ6/ktbBnz3CZk3aKMX7VlTmY= +github.com/atomix/atomix-go-local v0.0.0-20190830183800-73f964b0f75a h1:O/17kCIR6b+QeSkNpP12g/xTiDg976KSIS/8SSJ6Z/8= +github.com/atomix/atomix-go-local v0.0.0-20190830183800-73f964b0f75a/go.mod h1:lt/qUsFF29yT2ofmxOXfFzIz0poN22/Qa5SPdalgTKw= github.com/atomix/atomix-go-node v0.0.0-20190819174806-3d4536bf032d h1:/IgJxOQRnf+0FVlsudlPyOxh2hIRWhPOA5dSA8SpQZI= github.com/atomix/atomix-go-node v0.0.0-20190819174806-3d4536bf032d/go.mod h1:AX6dqVU12HBfSTPp0s7qqUjGD6fusaO1K0zkTukwH20= github.com/atomix/atomix-go-node v0.0.0-20190819235918-362f3143b084 h1:pp555c1uxKC8UqiifN/azV8CgNAsD3J/FQyrzwblbmA= github.com/atomix/atomix-go-node v0.0.0-20190819235918-362f3143b084/go.mod h1:rWp3DgtUt4qdHDm//ewTptIaEbFde298JSWYN90zeNo= +github.com/atomix/atomix-go-node v0.0.0-20190827191929-2d3dc9c550d9/go.mod h1:PL1T5R78itch1QC1CN4JmbRL/2XQlg4R95R14822C6Q= +github.com/atomix/atomix-go-node v0.0.0-20190828183436-fc30340cd8db/go.mod h1:dyh8Bb50qKfMlpqDE6X+dQ1tZ399WKEABa3ntDYImnA= +github.com/atomix/atomix-go-node v0.0.0-20190830183721-649263a17223/go.mod h1:KJxB/MAgndAbyCOqTV2hatw7lExiZZs7QCOr45IfC9U= +github.com/atomix/atomix-go-node v0.0.0-20190830183818-a5b5157566f6 h1:rxmMkW6vJOGYSRiqdsim5szVT9N4deSTCvlclPJwHN4= +github.com/atomix/atomix-go-node v0.0.0-20190830183818-a5b5157566f6/go.mod h1:398EUMrz8gNaqdsNDMXW2OlSp6nNPZLcC7b/QsXzl80= github.com/atomix/atomix-k8s-controller v0.0.0-20190620084759-d5e65f7fbf68/go.mod h1:vdmRfGKhgD28STeLKeKoq3tMZOyTR0l3WSG9QCrZWs0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/pkg/northbound/device/store.go b/pkg/northbound/device/store.go index 96f4ffb4..53911220 100644 --- a/pkg/northbound/device/store.go +++ b/pkg/northbound/device/store.go @@ -21,6 +21,7 @@ import ( "github.com/atomix/atomix-go-client/pkg/client/session" "github.com/atomix/atomix-go-local/pkg/atomix/local" "github.com/atomix/atomix-go-node/pkg/atomix" + "github.com/atomix/atomix-go-node/pkg/atomix/registry" "github.com/gogo/protobuf/proto" "github.com/onosproject/onos-topo/pkg/util" "google.golang.org/grpc" @@ -55,23 +56,11 @@ func NewAtomixStore() (Store, error) { // NewLocalStore returns a new local device store func NewLocalStore() (Store, error) { - lis := bufconn.Listen(1024 * 1024) - node := local.NewLocalNode(lis) - go func() { - _ = node.Start() - }() + node, conn := startLocalNode() name := primitive.Name{ Namespace: "local", Name: "devices", } - dialer := func(ctx context.Context, address string) (net.Conn, error) { - return lis.Dial() - } - - conn, err := grpc.DialContext(context.Background(), "devices", grpc.WithContextDialer(dialer), grpc.WithInsecure()) - if err != nil { - panic("Failed to dial devices") - } devices, err := _map.New(context.Background(), name, []*grpc.ClientConn{conn}) if err != nil { @@ -84,6 +73,23 @@ func NewLocalStore() (Store, error) { }, nil } +// startLocalNode starts a single local node +func startLocalNode() (*atomix.Node, *grpc.ClientConn) { + lis := bufconn.Listen(1024 * 1024) + node := local.NewNode(lis, registry.Registry) + _ = node.Start() + + dialer := func(ctx context.Context, address string) (net.Conn, error) { + return lis.Dial() + } + + conn, err := grpc.DialContext(context.Background(), "devices", grpc.WithContextDialer(dialer), grpc.WithInsecure()) + if err != nil { + panic("Failed to dial devices") + } + return node, conn +} + type nodeCloser struct { node *atomix.Node } @@ -145,7 +151,7 @@ func (s *atomixStore) Store(device *Device) error { if device.Revision == 0 { kv, err = s.devices.Put(ctx, string(device.ID), bytes) } else { - kv, err = s.devices.Put(ctx, string(device.ID), bytes, _map.WithVersion(int64(device.Revision))) + kv, err = s.devices.Put(ctx, string(device.ID), bytes, _map.IfVersion(int64(device.Revision))) } if err != nil { @@ -162,7 +168,7 @@ func (s *atomixStore) Delete(device *Device) error { defer cancel() if device.Revision > 0 { - _, err := s.devices.Remove(ctx, string(device.ID), _map.WithVersion(int64(device.Revision))) + _, err := s.devices.Remove(ctx, string(device.ID), _map.IfVersion(int64(device.Revision))) return err } _, err := s.devices.Remove(ctx, string(device.ID)) @@ -187,7 +193,7 @@ func (s *atomixStore) List(ch chan<- *Device) error { } func (s *atomixStore) Watch(ch chan<- *Event) error { - mapCh := make(chan *_map.MapEvent) + mapCh := make(chan *_map.Event) if err := s.devices.Watch(context.Background(), mapCh, _map.WithReplay()); err != nil { return err } @@ -207,6 +213,7 @@ func (s *atomixStore) Watch(ch chan<- *Event) error { } func (s *atomixStore) Close() error { + _ = s.devices.Close() return s.closer.Close() } diff --git a/pkg/util/atomix.go b/pkg/util/atomix.go index 64dd1123..032b3961 100644 --- a/pkg/util/atomix.go +++ b/pkg/util/atomix.go @@ -45,7 +45,7 @@ func GetAtomixRaftGroup() string { // GetAtomixClient returns the Atomix client func GetAtomixClient() (*client.Client, error) { - opts := []client.ClientOption{ + opts := []client.Option{ client.WithNamespace(getAtomixNamespace()), client.WithApplication(getAtomixApp()), }