Skip to content

Commit

Permalink
Add created timestamp to querypb.StreamHealthResponse
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Aug 29, 2024
1 parent f2d5d1c commit 16a2aed
Show file tree
Hide file tree
Showing 9 changed files with 1,027 additions and 904 deletions.
12 changes: 9 additions & 3 deletions go/vt/discovery/tablet_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import (
"bytes"
"encoding/json"
"strings"

"vitess.io/vitess/go/vt/vttablet/queryservice"
"time"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/vttablet/queryservice"
)

// TabletHealth represents simple tablet health data that is returned to users of healthcheck.
Expand All @@ -40,6 +42,7 @@ type TabletHealth struct {
PrimaryTermStartTime int64
LastError error
Serving bool
Timestamp *vttime.Time
}

func (th *TabletHealth) MarshalJSON() ([]byte, error) {
Expand All @@ -50,13 +53,15 @@ func (th *TabletHealth) MarshalJSON() ([]byte, error) {
PrimaryTermStartTime int64
Stats *query.RealtimeStats
LastError error
Timestamp time.Time
}{
Tablet: th.Tablet,
Target: th.Target,
Serving: th.Serving,
PrimaryTermStartTime: th.PrimaryTermStartTime,
Stats: th.Stats,
LastError: th.LastError,
Timestamp: protoutil.TimeFromProto(th.Timestamp),
})
}

Expand All @@ -69,7 +74,8 @@ func (th *TabletHealth) DeepEqual(other *TabletHealth) bool {
th.PrimaryTermStartTime == other.PrimaryTermStartTime &&
proto.Equal(th.Stats, other.Stats) &&
((th.LastError == nil && other.LastError == nil) ||
(th.LastError != nil && other.LastError != nil && th.LastError.Error() == other.LastError.Error()))
(th.LastError != nil && other.LastError != nil && th.LastError.Error() == other.LastError.Error())) &&
proto.Equal(th.Timestamp, other.Timestamp)
}

// GetTabletHostPort formats a tablet host port address.
Expand Down
15 changes: 9 additions & 6 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"google.golang.org/grpc"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options.
Expand Down Expand Up @@ -73,6 +73,8 @@ type tabletHealthCheck struct {
// LastError is the error we last saw when trying to get the
// tablet's healthcheck.
LastError error
// Timestamp represents the time the healthcheck data was produced.
Timestamp *vttime.Time
// possibly delete both these
loggedServingState bool
lastResponseTimestamp time.Time // timestamp of the last healthcheck response
Expand Down Expand Up @@ -217,6 +219,7 @@ func (thc *tabletHealthCheck) processResponse(hc *HealthCheckImpl, shr *query.St
thc.PrimaryTermStartTime = shr.PrimaryTermStartTimestamp
thc.Stats = shr.RealtimeStats
thc.LastError = healthErr
thc.Timestamp = shr.Timestamp
reason := "healthCheck update"
if healthErr != nil {
reason = "healthCheck update error: " + healthErr.Error()
Expand Down
1,793 changes: 904 additions & 889 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,17 @@ import (

"github.com/spf13/pflag"

vtschema "vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"

"vitess.io/vitess/go/vt/servenv"

"vitess.io/vitess/go/history"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
vtschema "vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

Expand Down Expand Up @@ -80,6 +79,8 @@ type healthStreamer struct {
signalWhenSchemaChange bool

viewsEnabled bool

nowTimeFunc func() time.Time
}

func newHealthStreamer(env tabletenv.Env, alias *topodatapb.TabletAlias, engine *schema.Engine) *healthStreamer {
Expand All @@ -100,6 +101,7 @@ func newHealthStreamer(env tabletenv.Env, alias *topodatapb.TabletAlias, engine
signalWhenSchemaChange: env.Config().SignalWhenSchemaChange,
viewsEnabled: env.Config().EnableViews,
se: engine,
nowTimeFunc: func() time.Time { return time.Now() },
}
hs.unhealthyThreshold.Store(env.Config().Healthcheck.UnhealthyThreshold.Nanoseconds())
return hs
Expand Down Expand Up @@ -212,6 +214,7 @@ func (hs *healthStreamer) ChangeState(tabletType topodatapb.TabletType, ptsTimes
}

func (hs *healthStreamer) broadCastToClients(shr *querypb.StreamHealthResponse) {
shr.Timestamp = protoutil.TimeToProto(hs.nowTimeFunc())
for ch := range hs.clients {
select {
case ch <- shr:
Expand Down
8 changes: 7 additions & 1 deletion go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -98,6 +99,8 @@ func TestHealthStreamerBroadcast(t *testing.T) {
}
blpFunc = testBlpFunc
hs := newHealthStreamer(env, alias, &schema.Engine{})
now := time.Now()
hs.nowTimeFunc = func() time.Time { return now }
hs.Open()
defer hs.Close()

Expand Down Expand Up @@ -125,11 +128,11 @@ func TestHealthStreamerBroadcast(t *testing.T) {
FilteredReplicationLagSeconds: 1,
BinlogPlayersCount: 2,
},
Timestamp: protoutil.TimeToProto(now),
}
assert.Truef(t, proto.Equal(want, shr), "want: %v, got: %v", want, shr)

// Test primary and timestamp.
now := time.Now()
hs.ChangeState(topodatapb.TabletType_PRIMARY, now, 0, nil, true)
shr = <-ch
want = &querypb.StreamHealthResponse{
Expand All @@ -143,6 +146,7 @@ func TestHealthStreamerBroadcast(t *testing.T) {
FilteredReplicationLagSeconds: 1,
BinlogPlayersCount: 2,
},
Timestamp: protoutil.TimeToProto(now),
}
assert.Truef(t, proto.Equal(want, shr), "want: %v, got: %v", want, shr)

Expand All @@ -159,6 +163,7 @@ func TestHealthStreamerBroadcast(t *testing.T) {
FilteredReplicationLagSeconds: 1,
BinlogPlayersCount: 2,
},
Timestamp: protoutil.TimeToProto(now),
}
assert.Truef(t, proto.Equal(want, shr), "want: %v, got: %v", want, shr)

Expand All @@ -175,6 +180,7 @@ func TestHealthStreamerBroadcast(t *testing.T) {
FilteredReplicationLagSeconds: 1,
BinlogPlayersCount: 2,
},
Timestamp: protoutil.TimeToProto(now),
}
assert.Truef(t, proto.Equal(want, shr), "want: %v, got: %v", want, shr)
}
Expand Down
4 changes: 4 additions & 0 deletions proto/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ option java_package = "io.vitess.proto";

import "topodata.proto";
import "vtrpc.proto";
import "vttime.proto";

// Target describes what the client expects the tablet is.
// If the tablet does not match, an error is returned.
Expand Down Expand Up @@ -985,6 +986,9 @@ message StreamHealthResponse {
// hasn't changed in the meantime e.g. due to tablet restarts where ports or
// ips have been reused but assigned differently.
topodata.TabletAlias tablet_alias = 5;

// timestamp represents the time the response was generated.
vttime.Time timestamp = 7;
}

// TransactionState represents the state of a distributed transaction.
Expand Down
6 changes: 6 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 16a2aed

Please sign in to comment.