Skip to content

Commit

Permalink
Topo: Add version support to GetTopologyPath (#15933)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored May 31, 2024
1 parent ab22305 commit c5e6e9b
Show file tree
Hide file tree
Showing 20 changed files with 1,575 additions and 1,205 deletions.
2 changes: 1 addition & 1 deletion examples/common/scripts/consul-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ sleep 5
echo "add ${cell} CellInfo"
set +e
# shellcheck disable=SC2086
command vtctldclient --server internal --topo-implementation consul --topo-global-server "${CONSUL_SERVER}:${consul_http_port}" AddCellInfo \
command vtctldclient --server internal --topo-implementation consul --topo-global-server-address "${CONSUL_SERVER}:${consul_http_port}" AddCellInfo \
--root "/vitess/${cell}" \
--server-address "${CONSUL_SERVER}:${consul_http_port}" \
"${cell}"
Expand Down
3 changes: 1 addition & 2 deletions examples/common/scripts/zk-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ echo "Started zk servers."
# Add the CellInfo description for the $CELL cell.
# If the node already exists, it's fine, means we used existing data.
set +e
# shellcheck disable=SC2086
command vtctldclient --server internal --topo-implementation zk2 --topo-global-server "${ZK_SERVER}" AddCellInfo \
command vtctldclient --server internal --topo-implementation zk2 --topo-global-server-address "${ZK_SERVER}" AddCellInfo \
--root "/vitess/${cell}" \
--server-address "${ZK_SERVER}" \
"${cell}"
Expand Down
1 change: 0 additions & 1 deletion go/cmd/vtctldclient/cli/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

"google.golang.org/protobuf/encoding/protojson"

"google.golang.org/protobuf/proto"
)

Expand Down
22 changes: 20 additions & 2 deletions go/cmd/vtctldclient/command/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ var (
Args: cobra.ExactArgs(1),
RunE: commandGetTopologyPath,
}

// The version of the key/path to get. If not specified, the latest/current
// version is returned.
version int64 = 0
// If true, only the data is output and it is in JSON format rather than prototext.
dataAsJSON bool = false
)

func commandGetTopologyPath(cmd *cobra.Command, args []string) error {
Expand All @@ -43,13 +49,23 @@ func commandGetTopologyPath(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

resp, err := client.GetTopologyPath(commandCtx, &vtctldatapb.GetTopologyPathRequest{
Path: path,
Path: path,
Version: version,
AsJson: dataAsJSON,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Cell)
if dataAsJSON {
if resp.GetCell() == nil || resp.GetCell().GetData() == "" {
return fmt.Errorf("no data found for path %s", path)
}
fmt.Println(resp.GetCell().GetData())
return nil
}

data, err := cli.MarshalJSONPretty(resp.GetCell())
if err != nil {
return err
}
Expand All @@ -60,5 +76,7 @@ func commandGetTopologyPath(cmd *cobra.Command, args []string) error {
}

func init() {
GetTopologyPath.Flags().Int64Var(&version, "version", version, "The version of the path's key to get. If not specified, the latest version is returned.")
GetTopologyPath.Flags().BoolVar(&dataAsJSON, "data-as-json", dataAsJSON, "If true, only the data is output and it is in JSON format rather than prototext.")
Root.AddCommand(GetTopologyPath)
}
2,339 changes: 1,184 additions & 1,155 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

98 changes: 94 additions & 4 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions go/vt/topo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ type Conn interface {
// Can return ErrNoNode if the file doesn't exist.
Get(ctx context.Context, filePath string) ([]byte, Version, error)

// GetVersion returns the content of a file at the given version.
// filePath is a path relative to the root directory of the cell.
// Can return ErrNoNode if the file doesn't exist at the given
// version or ErrNoImplementation if the topo server does not
// support storing multiple versions and retrieving a specific one.
GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error)

// List returns KV pairs, along with metadata like the version, for
// entries where the key contains the specified prefix.
// filePathPrefix is a path relative to the root directory of the cell.
Expand Down
5 changes: 5 additions & 0 deletions go/vt/topo/consultopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func (s *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Version
return pair.Value, ConsulVersion(pair.ModifyIndex), nil
}

// GetVersion is part of topo.Conn interface.
func (s *Server) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in consul topo")
}

// List is part of the topo.Conn interface.
func (s *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
nodePathPrefix := path.Join(s.root, filePathPrefix)
Expand Down
14 changes: 13 additions & 1 deletion go/vt/topo/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func DecodeContent(filename string, data []byte, json bool) (string, error) {
p = new(topodatapb.SrvKeyspace)
case RoutingRulesFile:
p = new(vschemapb.RoutingRules)
case CommonRoutingRulesFile:
switch path.Base(dir) {
case "keyspace":
p = new(vschemapb.KeyspaceRoutingRules)
}
default:
switch dir {
case "/" + GetExternalVitessClusterDir():
Expand All @@ -74,7 +79,14 @@ func DecodeContent(filename string, data []byte, json bool) (string, error) {
var marshalled []byte
var err error
if json {
marshalled, err = protojson.Marshal(p)
// Maintain snake_case for the JSON output as this keeps the output consistent across
// vtctldclient commands and it is needed if the returned value is used as input to
// vtctldclient, e.g. for ApplyRoutingRules.
pm := protojson.MarshalOptions{
Indent: " ",
UseProtoNames: true,
}
marshalled, err = pm.Marshal(p)
} else {
marshalled, err = prototext.Marshal(p)
}
Expand Down
15 changes: 15 additions & 0 deletions go/vt/topo/etcd2topo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ func (s *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Version
return resp.Kvs[0].Value, EtcdVersion(resp.Kvs[0].ModRevision), nil
}

// GetVersion is part of the topo.Conn interface.
func (s *Server) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
nodePath := path.Join(s.root, filePath)

resp, err := s.cli.Get(ctx, nodePath, clientv3.WithRev(version))
if err != nil {
return nil, convertError(err, nodePath)
}
if len(resp.Kvs) != 1 {
return nil, topo.NewError(topo.NoNode, nodePath)
}

return resp.Kvs[0].Value, nil
}

// List is part of the topo.Conn interface.
func (s *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
nodePathPrefix := path.Join(s.root, filePathPrefix)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/topo/faketopo/faketopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ func (f *FakeConn) Get(ctx context.Context, filePath string) ([]byte, topo.Versi
return res.contents, memorytopo.NodeVersion(res.version), nil
}

// GetVersion is part of topo.Conn interface.
func (f *FakeConn) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in fake topo")
}

// List is part of the topo.Conn interface.
func (f *FakeConn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
return nil, topo.NewError(topo.NoImplementation, "List not supported in fake topo")
Expand Down
13 changes: 8 additions & 5 deletions go/vt/topo/memorytopo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,19 @@ func (c *Conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version,

// Get the node.
n := c.factory.nodeByPath(c.cell, filePath)
if n == nil {
// This matches the other topo implementations of returning topo.NoNode when calling
// Get() with a key prefix or "directory".
if n == nil || n.contents == nil {
return nil, nil, topo.NewError(topo.NoNode, filePath)
}
if n.contents == nil {
// it's a directory
return nil, nil, fmt.Errorf("cannot Get() directory %v in cell %v", filePath, c.cell)
}
return n.contents, NodeVersion(n.version), nil
}

// GetVersion is part of topo.Conn interface.
func (c *Conn) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in memory topo")
}

// List is part of the topo.Conn interface.
func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
c.factory.callstats.Add([]string{"List"}, 1)
Expand Down
13 changes: 13 additions & 0 deletions go/vt/topo/stats_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version,
return bytes, version, err
}

// GetVersion is part of the Conn interface.
func (st *StatsConn) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
startTime := time.Now()
statsKey := []string{"GetVersion", st.cell}
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, err := st.conn.GetVersion(ctx, filePath, version)
if err != nil {
topoStatsConnErrors.Add(statsKey, int64(1))
return bytes, err
}
return bytes, err
}

// List is part of the Conn interface
func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) {
startTime := time.Now()
Expand Down
9 changes: 9 additions & 0 deletions go/vt/topo/stats_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ func (st *fakeConn) Get(ctx context.Context, filePath string) (bytes []byte, ver
return bytes, ver, err
}

// GetVersion is part of the Conn interface
func (st *fakeConn) GetVersion(ctx context.Context, filePath string, version int64) (bytes []byte, err error) {
if filePath == "error" {
return bytes, fmt.Errorf("Dummy error")

}
return bytes, err
}

// List is part of the Conn interface
func (st *fakeConn) List(ctx context.Context, filePathPrefix string) (bytes []KVInfo, err error) {
if filePathPrefix == "error" {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/topo/zk2topo/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func (zs *Server) Get(ctx context.Context, filePath string) ([]byte, topo.Versio
return contents, ZKVersion(stat.Version), nil
}

// GetVersion is part of topo.Conn interface.
func (zs *Server) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in ZK2 topo")
}

// List is part of the topo.Conn interface.
func (zs *Server) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
return nil, topo.NewError(topo.NoImplementation, "List not supported in ZK2 topo")
Expand Down
Loading

0 comments on commit c5e6e9b

Please sign in to comment.