Skip to content

Commit

Permalink
Update Atomix client (#36)
Browse files Browse the repository at this point in the history
* Update Atomix client to use cleaned up API.

* Update tests to ensure test server is closed following test.
  • Loading branch information
kuujo authored Aug 30, 2019
1 parent 8214f17 commit 54c5f0e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module github.com/onosproject/onos-topo
go 1.12

require (
github.com/atomix/atomix-go-client v0.0.0-20190819235619-b3cabfacacd0
github.com/atomix/atomix-go-local v0.0.0-20190820000011-d55305ea0d8c
github.com/atomix/atomix-go-node v0.0.0-20190819235918-362f3143b084
github.com/atomix/atomix-go-client v0.0.0-20190830184106-6ca178a89ccc
github.com/atomix/atomix-go-local v0.0.0-20190830183800-73f964b0f75a
github.com/atomix/atomix-go-node v0.0.0-20190830183818-a5b5157566f6
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d
github.com/mitchellh/go-homedir v1.1.0
github.com/onosproject/onos-config v0.0.0-20190715180819-079d3a8dc433
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,34 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/atomix/atomix-api v0.0.0-20190819202500-d202db6bbedb/go.mod h1:joWKUd0zIeYbAQ0vmYHGsnV03ZgRalhceHgnJ3EN0mI=
github.com/atomix/atomix-api v0.0.0-20190819230829-366ccc994adb h1:5BfPSZekPTwr8SoHWZVYEj8bHtZJavTXWGRbsEs/t/4=
github.com/atomix/atomix-api v0.0.0-20190819230829-366ccc994adb/go.mod h1:joWKUd0zIeYbAQ0vmYHGsnV03ZgRalhceHgnJ3EN0mI=
github.com/atomix/atomix-api v0.0.0-20190826211343-dd8f4db3bf77 h1:+PUuY9wDRp+VAg/JbEguzdOMJj6ruUw6Kw/y+QYHB6s=
github.com/atomix/atomix-api v0.0.0-20190826211343-dd8f4db3bf77/go.mod h1:joWKUd0zIeYbAQ0vmYHGsnV03ZgRalhceHgnJ3EN0mI=
github.com/atomix/atomix-go-client v0.0.0-20190807011524-58b4352273d7 h1:XE+KsvclmfZjbqYDhBqeK8wHDDHYB/FiwGeLcBurYoM=
github.com/atomix/atomix-go-client v0.0.0-20190807011524-58b4352273d7/go.mod h1:P/m0xcEzXviZLULk78gYlJr0mHjXCZAcem3kOkvwzhA=
github.com/atomix/atomix-go-client v0.0.0-20190814013624-2b7049842ee1 h1:SW6b1TIATnNmsKrnGaoweY+uLezgUEKT61z3IGLKOts=
github.com/atomix/atomix-go-client v0.0.0-20190814013624-2b7049842ee1/go.mod h1:P/m0xcEzXviZLULk78gYlJr0mHjXCZAcem3kOkvwzhA=
github.com/atomix/atomix-go-client v0.0.0-20190819235619-b3cabfacacd0 h1:9FvHppD8AFmgQGG2K+OsFJGbYxGvha5tkHu5O85o6sU=
github.com/atomix/atomix-go-client v0.0.0-20190819235619-b3cabfacacd0/go.mod h1:isfbdQyPZ93GKZ8KQ85dy42D8GYoUB/kN8JmT0hq/1E=
github.com/atomix/atomix-go-client v0.0.0-20190827234201-188602d4e780/go.mod h1:/UAIApUE5+Ghzu8oBVcYUoz6nCosrRPa0eUlluBtKz0=
github.com/atomix/atomix-go-client v0.0.0-20190830184106-6ca178a89ccc h1:yBus/VAiZxQhr3AavYW0YaMdhhUUp9GuHsdJUSMnnvg=
github.com/atomix/atomix-go-client v0.0.0-20190830184106-6ca178a89ccc/go.mod h1:Ap8Wz+lg4gojTXvMcKRKoYYe6ig6/FeaSLuZv7Ghlf0=
github.com/atomix/atomix-go-local v0.0.0-20190819172907-cb2548c995e9 h1:EP+aAidkERynalsgvKOMU+nJwYbf0rS5GfYmlHmNDxo=
github.com/atomix/atomix-go-local v0.0.0-20190819172907-cb2548c995e9/go.mod h1:iBfSnHJMJMuCEKBTWPbuTl1Y6rY+WGt+a9vYsSLTHmE=
github.com/atomix/atomix-go-local v0.0.0-20190820000011-d55305ea0d8c h1:qVu0vfAkkeTFn4LgGTgCKlFp5JlPOKDFQGc7FuWtjw0=
github.com/atomix/atomix-go-local v0.0.0-20190820000011-d55305ea0d8c/go.mod h1:YMZzUByozlrFK9BPkHN10WQirdPFRAkz3kEIn6c58uA=
github.com/atomix/atomix-go-local v0.0.0-20190827233944-938e35b06834/go.mod h1:qLBTOiVKoEqzYOjgxIgWFa+Hfa3SR+VexA6jGBcv0HA=
github.com/atomix/atomix-go-local v0.0.0-20190828183508-3db728c0fc3b/go.mod h1:VnwyXJvHzUHuVzzTmPhZ6/ktbBnz3CZk3aKMX7VlTmY=
github.com/atomix/atomix-go-local v0.0.0-20190830183800-73f964b0f75a h1:O/17kCIR6b+QeSkNpP12g/xTiDg976KSIS/8SSJ6Z/8=
github.com/atomix/atomix-go-local v0.0.0-20190830183800-73f964b0f75a/go.mod h1:lt/qUsFF29yT2ofmxOXfFzIz0poN22/Qa5SPdalgTKw=
github.com/atomix/atomix-go-node v0.0.0-20190819174806-3d4536bf032d h1:/IgJxOQRnf+0FVlsudlPyOxh2hIRWhPOA5dSA8SpQZI=
github.com/atomix/atomix-go-node v0.0.0-20190819174806-3d4536bf032d/go.mod h1:AX6dqVU12HBfSTPp0s7qqUjGD6fusaO1K0zkTukwH20=
github.com/atomix/atomix-go-node v0.0.0-20190819235918-362f3143b084 h1:pp555c1uxKC8UqiifN/azV8CgNAsD3J/FQyrzwblbmA=
github.com/atomix/atomix-go-node v0.0.0-20190819235918-362f3143b084/go.mod h1:rWp3DgtUt4qdHDm//ewTptIaEbFde298JSWYN90zeNo=
github.com/atomix/atomix-go-node v0.0.0-20190827191929-2d3dc9c550d9/go.mod h1:PL1T5R78itch1QC1CN4JmbRL/2XQlg4R95R14822C6Q=
github.com/atomix/atomix-go-node v0.0.0-20190828183436-fc30340cd8db/go.mod h1:dyh8Bb50qKfMlpqDE6X+dQ1tZ399WKEABa3ntDYImnA=
github.com/atomix/atomix-go-node v0.0.0-20190830183721-649263a17223/go.mod h1:KJxB/MAgndAbyCOqTV2hatw7lExiZZs7QCOr45IfC9U=
github.com/atomix/atomix-go-node v0.0.0-20190830183818-a5b5157566f6 h1:rxmMkW6vJOGYSRiqdsim5szVT9N4deSTCvlclPJwHN4=
github.com/atomix/atomix-go-node v0.0.0-20190830183818-a5b5157566f6/go.mod h1:398EUMrz8gNaqdsNDMXW2OlSp6nNPZLcC7b/QsXzl80=
github.com/atomix/atomix-k8s-controller v0.0.0-20190620084759-d5e65f7fbf68/go.mod h1:vdmRfGKhgD28STeLKeKoq3tMZOyTR0l3WSG9QCrZWs0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
39 changes: 23 additions & 16 deletions pkg/northbound/device/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/atomix/atomix-go-client/pkg/client/session"
"github.com/atomix/atomix-go-local/pkg/atomix/local"
"github.com/atomix/atomix-go-node/pkg/atomix"
"github.com/atomix/atomix-go-node/pkg/atomix/registry"
"github.com/gogo/protobuf/proto"
"github.com/onosproject/onos-topo/pkg/util"
"google.golang.org/grpc"
Expand Down Expand Up @@ -55,23 +56,11 @@ func NewAtomixStore() (Store, error) {

// NewLocalStore returns a new local device store
func NewLocalStore() (Store, error) {
lis := bufconn.Listen(1024 * 1024)
node := local.NewLocalNode(lis)
go func() {
_ = node.Start()
}()
node, conn := startLocalNode()
name := primitive.Name{
Namespace: "local",
Name: "devices",
}
dialer := func(ctx context.Context, address string) (net.Conn, error) {
return lis.Dial()
}

conn, err := grpc.DialContext(context.Background(), "devices", grpc.WithContextDialer(dialer), grpc.WithInsecure())
if err != nil {
panic("Failed to dial devices")
}

devices, err := _map.New(context.Background(), name, []*grpc.ClientConn{conn})
if err != nil {
Expand All @@ -84,6 +73,23 @@ func NewLocalStore() (Store, error) {
}, nil
}

// startLocalNode starts a single local node
func startLocalNode() (*atomix.Node, *grpc.ClientConn) {
lis := bufconn.Listen(1024 * 1024)
node := local.NewNode(lis, registry.Registry)
_ = node.Start()

dialer := func(ctx context.Context, address string) (net.Conn, error) {
return lis.Dial()
}

conn, err := grpc.DialContext(context.Background(), "devices", grpc.WithContextDialer(dialer), grpc.WithInsecure())
if err != nil {
panic("Failed to dial devices")
}
return node, conn
}

type nodeCloser struct {
node *atomix.Node
}
Expand Down Expand Up @@ -145,7 +151,7 @@ func (s *atomixStore) Store(device *Device) error {
if device.Revision == 0 {
kv, err = s.devices.Put(ctx, string(device.ID), bytes)
} else {
kv, err = s.devices.Put(ctx, string(device.ID), bytes, _map.WithVersion(int64(device.Revision)))
kv, err = s.devices.Put(ctx, string(device.ID), bytes, _map.IfVersion(int64(device.Revision)))
}

if err != nil {
Expand All @@ -162,7 +168,7 @@ func (s *atomixStore) Delete(device *Device) error {
defer cancel()

if device.Revision > 0 {
_, err := s.devices.Remove(ctx, string(device.ID), _map.WithVersion(int64(device.Revision)))
_, err := s.devices.Remove(ctx, string(device.ID), _map.IfVersion(int64(device.Revision)))
return err
}
_, err := s.devices.Remove(ctx, string(device.ID))
Expand All @@ -187,7 +193,7 @@ func (s *atomixStore) List(ch chan<- *Device) error {
}

func (s *atomixStore) Watch(ch chan<- *Event) error {
mapCh := make(chan *_map.MapEvent)
mapCh := make(chan *_map.Event)
if err := s.devices.Watch(context.Background(), mapCh, _map.WithReplay()); err != nil {
return err
}
Expand All @@ -207,6 +213,7 @@ func (s *atomixStore) Watch(ch chan<- *Event) error {
}

func (s *atomixStore) Close() error {
_ = s.devices.Close()
return s.closer.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/util/atomix.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func GetAtomixRaftGroup() string {

// GetAtomixClient returns the Atomix client
func GetAtomixClient() (*client.Client, error) {
opts := []client.ClientOption{
opts := []client.Option{
client.WithNamespace(getAtomixNamespace()),
client.WithApplication(getAtomixApp()),
}
Expand Down

0 comments on commit 54c5f0e

Please sign in to comment.