From c121a7a6ff7be2349b86db3af5352888e1995a5b Mon Sep 17 00:00:00 2001 From: marcinromaszewicz Date: Thu, 25 Apr 2024 10:04:39 -0700 Subject: [PATCH] Expose Dialers inside Zk and Region This change allows for overriding the default dialers used by the ZooKeeper client and the Region client to connect to their respective servers. Proxy-aware dialers can be installed by people who need them. --- admin_client.go | 3 ++- client.go | 33 +++++++++++++++++++++++++++++---- debug_state_test.go | 2 ++ mockrc_test.go | 5 +++-- region/client.go | 8 ++++++-- region/new.go | 20 +++++++++++++++++--- rpc.go | 9 +++++---- rpc_test.go | 11 ++++++----- zk/client.go | 15 ++++++++++++--- 9 files changed, 82 insertions(+), 24 deletions(-) diff --git a/admin_client.go b/admin_client.go index 8dbda26d..bbd6a09d 100644 --- a/admin_client.go +++ b/admin_client.go @@ -12,6 +12,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" "github.com/tsuna/gohbase/region" @@ -67,7 +68,7 @@ func newAdminClient(zkquorum string, options ...Option) AdminClient { for _, option := range options { option(c) } - c.zkClient = zk.NewClient(zkquorum, c.zkTimeout) + c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer) return c } diff --git a/client.go b/client.go index e370234e..132b52ea 100644 --- a/client.go +++ b/client.go @@ -13,14 +13,16 @@ import ( "sync" "time" + gzk "github.com/go-zookeeper/zk" log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" + "modernc.org/b/v2" + "github.com/tsuna/gohbase/compression" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" "github.com/tsuna/gohbase/region" "github.com/tsuna/gohbase/zk" - "google.golang.org/protobuf/proto" - "modernc.org/b/v2" ) const ( @@ -97,9 +99,15 @@ type client struct { closeOnce sync.Once newRegionClientFn func(string, region.ClientType, int, time.Duration, - string, time.Duration, compression.Codec) hrpc.RegionClient + string, time.Duration, compression.Codec, region.Dialer) hrpc.RegionClient compressionCodec compression.Codec + + // zkDialer is passed through to Zk Connect() to configure custom connection settings + zkDialer gzk.Dialer + // regionDialer is passed into the region client to connect to hbase in a custom way, + // such as SOCKS proxy. + regionDialer region.Dialer } // NewClient creates a new HBase client. @@ -140,7 +148,7 @@ func newClient(zkquorum string, options ...Option) *client { //Have to create the zkClient after the Options have been set //since the zkTimeout could be changed as an option - c.zkClient = zk.NewClient(zkquorum, c.zkTimeout) + c.zkClient = zk.NewClient(zkquorum, c.zkTimeout, c.zkDialer) return c } @@ -268,6 +276,23 @@ func CompressionCodec(codec string) Option { } } +// ZooKeeperDialer will return an option to pass the given dialer function +// into the ZooKeeper client Connect() call, which allows for customizing +// network connections. +func ZooKeeperDialer(dialer gzk.Dialer) Option { + return func(c *client) { + c.zkDialer = dialer + } +} + +// RegionDialer will return an option that uses the specified Dialer for +// connecting to region servers. This allows for connecting through proxies. +func RegionDialer(dialer region.Dialer) Option { + return func(c *client) { + c.regionDialer = dialer + } +} + // Close closes connections to hbase master and regionservers func (c *client) Close() { c.closeOnce.Do(func() { diff --git a/debug_state_test.go b/debug_state_test.go index fe6bcd23..6860cb00 100644 --- a/debug_state_test.go +++ b/debug_state_test.go @@ -6,6 +6,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/region" ) @@ -25,6 +26,7 @@ func TestDebugStateSanity(t *testing.T) { defaultEffectiveUser, region.DefaultReadTimeout, client.compressionCodec, + nil, ) newClientFn := func() hrpc.RegionClient { return regClient diff --git a/mockrc_test.go b/mockrc_test.go index f6dce11c..02c228d5 100644 --- a/mockrc_test.go +++ b/mockrc_test.go @@ -13,11 +13,12 @@ import ( "sync/atomic" "time" + "google.golang.org/protobuf/proto" + "github.com/tsuna/gohbase/compression" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" "github.com/tsuna/gohbase/region" - "google.golang.org/protobuf/proto" ) type testClient struct { @@ -177,7 +178,7 @@ func init() { func newMockRegionClient(addr string, ctype region.ClientType, queueSize int, flushInterval time.Duration, effectiveUser string, - readTimeout time.Duration, codec compression.Codec) hrpc.RegionClient { + readTimeout time.Duration, codec compression.Codec, dialer region.Dialer) hrpc.RegionClient { m.Lock() clients[addr]++ m.Unlock() diff --git a/region/client.go b/region/client.go index 686aa00e..b4482688 100644 --- a/region/client.go +++ b/region/client.go @@ -22,10 +22,11 @@ import ( "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" - "github.com/tsuna/gohbase/hrpc" - "github.com/tsuna/gohbase/pb" "google.golang.org/protobuf/encoding/protowire" "google.golang.org/protobuf/proto" + + "github.com/tsuna/gohbase/hrpc" + "github.com/tsuna/gohbase/pb" ) // ClientType is a type alias to represent the type of this region client @@ -192,6 +193,9 @@ type client struct { // compressor for cellblocks. if nil, then no compression compressor *compressor + + // dialer is used to connect to region servers in non-standard ways + dialer Dialer } // QueueRPC will add an rpc call to the queue for processing by the writer goroutine diff --git a/region/new.go b/region/new.go index 4156396c..ea34b0a4 100644 --- a/region/new.go +++ b/region/new.go @@ -18,9 +18,16 @@ import ( "github.com/tsuna/gohbase/hrpc" ) +// Dialer is used to connect to region servers. net.Dialer conforms to this +// interface, which is just the subset of it that we use. +type Dialer interface { + DialContext(ctx context.Context, net, addr string) (net.Conn, error) +} + // NewClient creates a new RegionClient. func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time.Duration, - effectiveUser string, readTimeout time.Duration, codec compression.Codec) hrpc.RegionClient { + effectiveUser string, readTimeout time.Duration, codec compression.Codec, + dialer Dialer) hrpc.RegionClient { c := &client{ addr: addr, ctype: ctype, @@ -36,14 +43,21 @@ func NewClient(addr string, ctype ClientType, queueSize int, flushInterval time. if codec != nil { c.compressor = &compressor{Codec: codec} } + + if dialer != nil { + c.dialer = dialer + } else { + var d net.Dialer + c.dialer = &d + } + return c } func (c *client) Dial(ctx context.Context) error { c.dialOnce.Do(func() { - var d net.Dialer var err error - c.conn, err = d.DialContext(ctx, "tcp", c.addr) + c.conn, err = c.dialer.DialContext(ctx, "tcp", c.addr) if err != nil { c.fail(fmt.Errorf("failed to dial RegionServer: %s", err)) return diff --git a/rpc.go b/rpc.go index d86ec48c..e22d543d 100644 --- a/rpc.go +++ b/rpc.go @@ -16,12 +16,13 @@ import ( "time" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/codes" + "google.golang.org/protobuf/proto" + "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/internal/observability" "github.com/tsuna/gohbase/region" "github.com/tsuna/gohbase/zk" - "go.opentelemetry.io/otel/codes" - "google.golang.org/protobuf/proto" ) // Constants @@ -828,11 +829,11 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) { // master that we don't add to the cache // TODO: consider combining this case with the regular regionserver path client = c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval, - c.effectiveUser, c.regionReadTimeout, nil) + c.effectiveUser, c.regionReadTimeout, nil, c.regionDialer) } else { client = c.clients.put(addr, reg, func() hrpc.RegionClient { return c.newRegionClientFn(addr, c.clientType, c.rpcQueueSize, c.flushInterval, - c.effectiveUser, c.regionReadTimeout, c.compressionCodec) + c.effectiveUser, c.regionReadTimeout, c.compressionCodec, c.regionDialer) }) } diff --git a/rpc_test.go b/rpc_test.go index 12c88837..8c7dd485 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -20,6 +20,10 @@ import ( "github.com/golang/mock/gomock" "github.com/prometheus/client_golang/prometheus" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/wrapperspb" + "modernc.org/b/v2" + "github.com/tsuna/gohbase/compression" "github.com/tsuna/gohbase/hrpc" "github.com/tsuna/gohbase/pb" @@ -29,15 +33,12 @@ import ( mockRegion "github.com/tsuna/gohbase/test/mock/region" mockZk "github.com/tsuna/gohbase/test/mock/zk" "github.com/tsuna/gohbase/zk" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/wrapperspb" - "modernc.org/b/v2" ) func newRegionClientFn(addr string) func() hrpc.RegionClient { return func() hrpc.RegionClient { return newMockRegionClient(addr, region.RegionClient, - 0, 0, "root", region.DefaultReadTimeout, nil) + 0, 0, "root", region.DefaultReadTimeout, nil, nil) } } @@ -301,7 +302,7 @@ func TestEstablishRegionDialFail(t *testing.T) { newRegionClientFnCallCount := 0 c.newRegionClientFn = func(_ string, _ region.ClientType, _ int, _ time.Duration, - _ string, _ time.Duration, _ compression.Codec) hrpc.RegionClient { + _ string, _ time.Duration, _ compression.Codec, _ region.Dialer) hrpc.RegionClient { var rc hrpc.RegionClient if newRegionClientFnCallCount == 0 { rc = rcFailDial diff --git a/zk/client.go b/zk/client.go index 9257b618..51b8c253 100644 --- a/zk/client.go +++ b/zk/client.go @@ -17,8 +17,9 @@ import ( log "github.com/sirupsen/logrus" "github.com/go-zookeeper/zk" - "github.com/tsuna/gohbase/pb" "google.golang.org/protobuf/proto" + + "github.com/tsuna/gohbase/pb" ) type logger struct{} @@ -58,19 +59,27 @@ type Client interface { type client struct { zks []string sessionTimeout time.Duration + dialer zk.Dialer } // NewClient establishes connection to zookeeper and returns the client -func NewClient(zkquorum string, st time.Duration) Client { +func NewClient(zkquorum string, st time.Duration, dialer zk.Dialer) Client { return &client{ zks: strings.Split(zkquorum, ","), sessionTimeout: st, + dialer: dialer, } } // LocateResource returns address of the server for the specified resource. func (c *client) LocateResource(resource ResourceName) (string, error) { - conn, _, err := zk.Connect(c.zks, c.sessionTimeout) + var conn *zk.Conn + var err error + if c.dialer != nil { + conn, _, err = zk.Connect(c.zks, c.sessionTimeout, zk.WithDialer(c.dialer)) + } else { + conn, _, err = zk.Connect(c.zks, c.sessionTimeout) + } if err != nil { return "", fmt.Errorf("error connecting to ZooKeeper at %v: %s", c.zks, err) }