Skip to content

Commit

Permalink
scheduled upgrades (#373)
Browse files Browse the repository at this point in the history
* first stab at scheduled upgrades

* lint

* add service account and cluster role/binding reconciliation

* roles for manager

* skip version check if versions are not populated in spec

* matching labels

* lint

* switch to role

* scheme

* fix scheme, reconcile heights

* use separate status map

* change annotation so that pod is restarted

* use new height map

* height should be commit + 1

* restart pods with version mismatch immediately

* harden rollout

* version check on terminate in sidecar

* ticker for height check

* Make setting halt height configurable

* remove unused func

* Modify reconcile pods to prioritize upgrades

* Add tests for rbac

* wait until after first interval to lock db

* harden pod control and test

* dry up test
  • Loading branch information
agouin authored Oct 24, 2023
1 parent 9d6c29c commit b305a73
Show file tree
Hide file tree
Showing 31 changed files with 1,885 additions and 173 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ RUN go mod download
# Copy the go source
COPY *.go .
COPY api/ api/
COPY cmd/ cmd/
COPY controllers/ controllers/
COPY internal/ internal/

Expand Down
29 changes: 29 additions & 0 deletions api/v1/cosmosfullnode_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ type FullNodeStatus struct {
// Current sync information. Collected every 60s.
// +optional
SyncInfo *SyncInfoStatus `json:"syncInfo,omitempty"`

// Latest Height information. Collected when the node starts up and when RPC is successfully queried.
// +optional
Height map[string]uint64 `json:"height,omitempty"`
}

type SyncInfoStatus struct {
Expand Down Expand Up @@ -524,6 +528,31 @@ type ChainSpec struct {
// +kubebuilder:validation:Minimum:=0
// +optional
PrivvalSleepSeconds *int32 `json:"privvalSleepSeconds"`

// DatabaseBackend must match in order to detect the block height
// of the chain prior to starting in order to pick the correct image version.
// options: goleveldb, rocksdb, pebbledb
// Defaults to goleveldb.
// +optional
DatabaseBackend *string `json:"databaseBackend"`

// Versions of the chain and which height they should be applied.
// When provided, the operator will automatically upgrade the chain as it reaches the specified heights.
// If not provided, the operator will not upgrade the chain, and will use the image specified in the pod spec.
// +optional
Versions []ChainVersion `json:"versions"`
}

// ChainVersion associates a block height with the container image that should
// be used once the chain reaches that height.
type ChainVersion struct {
// The block height when this version should be applied.
UpgradeHeight uint64 `json:"height"`

// The docker image for this version in "repository:tag" format. E.g. busybox:latest.
Image string `json:"image"`

// Determines if the node should forcefully halt at the upgrade height.
// +optional
SetHaltHeight bool `json:"setHaltHeight,omitempty"`
}

// CometConfig configures the config.toml.
Expand Down
32 changes: 32 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions healtcheck_cmd.go → cmd/healtcheck.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"context"
Expand All @@ -14,7 +14,7 @@ import (
"golang.org/x/sync/errgroup"
)

func healthcheckCmd() *cobra.Command {
func HealthCheckCmd() *cobra.Command {
hc := &cobra.Command{
Short: "Start health check probe",
Use: "healthcheck",
Expand Down Expand Up @@ -43,7 +43,7 @@ func startHealthCheckServer(cmd *cobra.Command, args []string) error {
httpClient = &http.Client{Timeout: 30 * time.Second}
cometClient = cosmos.NewCometClient(httpClient)

zlog = zapLogger("info", viper.GetString("log-format"))
zlog = ZapLogger("info", viper.GetString("log-format"))
logger = zapr.NewLogger(zlog)
)
defer func() { _ = zlog.Sync() }()
Expand Down
4 changes: 2 additions & 2 deletions logger.go → cmd/logger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"os"
Expand All @@ -8,7 +8,7 @@ import (
"go.uber.org/zap/zapcore"
)

func zapLogger(level, format string) *zap.Logger {
func ZapLogger(level, format string) *zap.Logger {
config := zap.NewProductionEncoderConfig()
config.EncodeTime = func(ts time.Time, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(ts.UTC().Format(time.RFC3339))
Expand Down
208 changes: 208 additions & 0 deletions cmd/versioncheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
Copyright © 2023 Strangelove Crypto, Inc.
*/
package cmd

import (
"context"
"fmt"
"io"
"os"
"time"

"cosmossdk.io/log"
"cosmossdk.io/store/rootmulti"
dbm "github.com/cosmos/cosmos-db"
"github.com/spf13/cobra"
cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
// namespaceFile is the file from which the namespace of the running pod is read.
namespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

// flagBackend is the name of the flag that selects the database backend.
flagBackend = "backend"
// flagDaemon is the name of the flag that enables the periodic (daemon) mode.
flagDaemon = "daemon"

// tickTime is the interval between version checks when running as a daemon.
tickTime = 30 * time.Second
)

// VersionCheckCmd builds the "versioncheck" cobra command.
//
// When run, the command resolves the pod it is running in (namespace from the
// service account, pod name from HOSTNAME), loads the CosmosFullNode named by
// the pod's "app.kubernetes.io/name" label and, if that resource declares any
// chain versions, delegates to checkVersion to record the node height and
// verify the pod image.
//
// Any failure, including an image mismatch reported by checkVersion, is raised
// as a panic so that the container exits.
//
// Without the daemon flag the check runs exactly once. With it, the check is
// repeated every tickTime (the first run happens only after the first
// interval has elapsed) until the command context is cancelled.
//
// scheme is the runtime scheme used to build the controller-runtime client.
func VersionCheckCmd(scheme *runtime.Scheme) *cobra.Command {
	run := func(cmd *cobra.Command, _ []string) {
		ctx := cmd.Context()
		out := cmd.OutOrStdout()

		dataDir := os.Getenv("DATA_DIR")
		// Flag lookup errors are ignored: both flags are registered below,
		// so the typed getters fall back to the zero value at worst.
		backend, _ := cmd.Flags().GetString(flagBackend)
		daemon, _ := cmd.Flags().GetBool(flagDaemon)

		rawNamespace, err := os.ReadFile(namespaceFile)
		if err != nil {
			panic(fmt.Errorf("failed to read namespace from service account: %w", err))
		}
		ns := string(rawNamespace)

		restConfig, err := rest.InClusterConfig()
		if err != nil {
			panic(fmt.Errorf("failed to get in cluster config: %w", err))
		}

		clientset, err := kubernetes.NewForConfig(restConfig)
		if err != nil {
			panic(fmt.Errorf("failed to create kube clientset: %w", err))
		}

		// The pod is looked up by the HOSTNAME environment variable.
		thisPod, err := clientset.CoreV1().Pods(ns).Get(ctx, os.Getenv("HOSTNAME"), metav1.GetOptions{})
		if err != nil {
			panic(fmt.Errorf("failed to get this pod: %w", err))
		}

		kClient, err := client.New(restConfig, client.Options{Scheme: scheme})
		if err != nil {
			panic(fmt.Errorf("failed to create kube client: %w", err))
		}

		// The owning CosmosFullNode shares the namespace of the pod and is
		// named by the pod's app.kubernetes.io/name label.
		namespacedName := types.NamespacedName{
			Namespace: ns,
			Name:      thisPod.Labels["app.kubernetes.io/name"],
		}

		crd := new(cosmosv1.CosmosFullNode)
		if err = kClient.Get(ctx, namespacedName, crd); err != nil {
			panic(fmt.Errorf("failed to get crd: %w", err))
		}

		if len(crd.Spec.ChainSpec.Versions) == 0 {
			fmt.Fprintln(out, "No versions specified, skipping version check")
			return
		}

		info, err := os.Stat(dataDir)
		if err != nil {
			panic(fmt.Errorf("failed to stat %s: %w", dataDir, err))
		}
		if !info.IsDir() {
			panic(fmt.Errorf("%s is not a directory", dataDir))
		}

		// One-shot mode: check once with the resource fetched above.
		if !daemon {
			if err := checkVersion(ctx, crd, kClient, namespacedName, thisPod, dataDir, backend, out); err != nil {
				panic(err)
			}
			return
		}

		// Daemon mode: re-check on every tick. A nil crd is passed so that
		// checkVersion fetches a fresh copy itself and tolerates a database
		// that cannot be opened.
		ticker := time.NewTicker(tickTime)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := checkVersion(ctx, nil, kClient, namespacedName, thisPod, dataDir, backend, out); err != nil {
					panic(err)
				}
				// Restart the interval after the check has completed.
				ticker.Reset(tickTime)
			}
		}
	}

	versionCheck := &cobra.Command{
		Use:   "versioncheck",
		Short: "Confirm correct image used for current node height",
		Long:  `Open the Cosmos SDK chain database, get the height, update the crd status with the height, then check the image for the height and panic if it is incorrect.`,
		Run:   run,
	}

	flags := versionCheck.Flags()
	flags.StringP(flagBackend, "b", "goleveldb", "Database backend")
	flags.BoolP(flagDaemon, "d", false, "Run as daemon")

	return versionCheck
}

// checkVersion reads the latest committed version from the "application"
// database in dataDir, records the next height (latest + 1) for thisPod in the
// CosmosFullNode status, and verifies that the image of the pod's first
// container is the one configured for that height.
//
// crd is the CosmosFullNode to update. When it is nil, the resource is fetched
// with kClient using namespacedName, and a failure to open the database is
// reported on writer and treated as success (the node is presumed to be
// running and holding the database). When crd is non-nil, a failure to open
// the database is returned as an error.
//
// The configured image is chosen by walking Spec.ChainSpec.Versions in order
// and keeping the last entry whose UpgradeHeight does not exceed the height;
// the walk stops at the first entry above it, so the list is assumed to be
// sorted by ascending height (TODO confirm this is enforced elsewhere).
//
// It returns an error if the database cannot be opened (non-nil crd only), the
// resource cannot be fetched, the status update fails, the pod has no
// containers, or the pod image differs from the configured image.
func checkVersion(
	ctx context.Context,
	crd *cosmosv1.CosmosFullNode,
	kClient client.Client,
	namespacedName types.NamespacedName,
	thisPod *corev1.Pod,
	dataDir string,
	backend string,
	writer io.Writer,
) error {
	db, err := dbm.NewDB("application", getBackend(backend), dataDir)
	if err != nil {
		if crd != nil {
			return fmt.Errorf("failed to open db: %w", err)
		}
		fmt.Fprintf(writer, "Failed to open db: %s. The node is likely running.\n", err)
		// This is okay, we will read it later if the node shuts down.
		return nil
	}
	store := rootmulti.NewStore(db, log.NewNopLogger(), nil)

	// The height the node will process next is the last committed version + 1.
	height := store.LatestVersion() + 1

	// Close the database before any API call so that it is held for as short
	// a time as possible. A close failure is reported but is not fatal: the
	// height has already been read.
	if err := db.Close(); err != nil {
		fmt.Fprintf(writer, "Failed to close db: %s\n", err)
	}

	if crd == nil {
		crd = new(cosmosv1.CosmosFullNode)
		if err := kClient.Get(ctx, namespacedName, crd); err != nil {
			return fmt.Errorf("failed to get crd: %w", err)
		}
	}

	if crd.Status.Height == nil {
		crd.Status.Height = make(map[string]uint64)
	}

	crd.Status.Height[thisPod.Name] = uint64(height)

	if err := kClient.Status().Update(ctx, crd); err != nil {
		return fmt.Errorf("failed to patch status: %w", err)
	}

	var image string
	for _, v := range crd.Spec.ChainSpec.Versions {
		if uint64(height) < v.UpgradeHeight {
			break
		}
		image = v.Image
	}

	// Guard the index below: return an error instead of panicking with an
	// index-out-of-range on a pod that has no containers.
	if len(thisPod.Spec.Containers) == 0 {
		return fmt.Errorf("pod %s has no containers", thisPod.Name)
	}

	thisPodImage := thisPod.Spec.Containers[0].Image
	if thisPodImage != image {
		return fmt.Errorf("image mismatch for height %d: %s != %s", height, thisPodImage, image)
	}

	fmt.Fprintf(writer, "Verified correct image for height %d: %s\n", height, image)

	return nil
}

// getBackend translates a backend name ("goleveldb", "memdb", "rocksdb" or
// "pebbledb") into the corresponding dbm.BackendType.
// It panics when the name is not one of those four.
func getBackend(backend string) dbm.BackendType {
	known := map[string]dbm.BackendType{
		"goleveldb": dbm.GoLevelDBBackend,
		"memdb":     dbm.MemDBBackend,
		"rocksdb":   dbm.RocksDBBackend,
		"pebbledb":  dbm.PebbleDBBackend,
	}

	if backendType, ok := known[backend]; ok {
		return backendType
	}

	panic(fmt.Errorf("unknown backend %s", backend))
}
Loading

0 comments on commit b305a73

Please sign in to comment.