Skip to content

Commit

Permalink
feat: use int versions in active_versions
Browse files Browse the repository at this point in the history
  • Loading branch information
kristina-solovyova committed Feb 2, 2024
1 parent a8a4ae1 commit 84682fa
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 135 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ proxy_url = VALUE
| <a name="input_function_app_storage_account_prefix"></a> [function\_app\_storage\_account\_prefix](#input\_function\_app\_storage\_account\_prefix) | Weka storage account name prefix | `string` | `"weka"` | no |
| <a name="input_function_app_subnet_delegation_cidr"></a> [function\_app\_subnet\_delegation\_cidr](#input\_function\_app\_subnet\_delegation\_cidr) | Subnet delegation enables you to designate a specific subnet for an Azure PaaS service. | `string` | `"10.0.1.0/25"` | no |
| <a name="input_function_app_subnet_delegation_id"></a> [function\_app\_subnet\_delegation\_id](#input\_function\_app\_subnet\_delegation\_id) | Required to specify if subnet\_name were used to specify pre-defined subnets for weka. Function subnet delegation requires an additional subnet, and in the case of pre-defined networking this one also should be pre-defined | `string` | `""` | no |
| <a name="input_function_app_version"></a> [function\_app\_version](#input\_function\_app\_version) | Function app code version (hash) | `string` | `"9611fb1feebc0c4e3f8b34671fa909fe"` | no |
| <a name="input_function_app_version"></a> [function\_app\_version](#input\_function\_app\_version) | Function app code version (hash) | `string` | `"abe83f3f0224c29630262452d616dffd"` | no |
| <a name="input_get_weka_io_token"></a> [get\_weka\_io\_token](#input\_get\_weka\_io\_token) | The token to download the Weka release from get.weka.io. | `string` | `""` | no |
| <a name="input_hotspare"></a> [hotspare](#input\_hotspare) | Number of hotspares to set on weka cluster. Refer to https://docs.weka.io/overview/ssd-capacity-management#hot-spare | `number` | `1` | no |
| <a name="input_install_cluster_dpdk"></a> [install\_cluster\_dpdk](#input\_install\_cluster\_dpdk) | Install weka cluster with DPDK | `bool` | `true` | no |
Expand Down
38 changes: 28 additions & 10 deletions function-app/code/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package common

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -962,7 +964,7 @@ func RetrySetDeletionProtectionAndReport(
counter++
// deletion protection invoked by terminate function
if maxAttempts == 0 {
msg := fmt.Sprintf("Deletion protection set authorization isn't ready, will retry on next scale down workflow")
msg := "Deletion protection set authorization isn't ready, will retry on next scale down workflow"
ReportMsg(ctx, hostName, subscriptionId, resourceGroupName, stateContainerName, stateStorageName, "debug", msg)
return
}
Expand Down Expand Up @@ -994,8 +996,11 @@ func GetWekaClusterPassword(ctx context.Context, keyVaultUri string) (password s
return GetKeyVaultValue(ctx, keyVaultUri, "weka-password")
}

func GetVmScaleSetName(prefix, clusterName string, version string) string {
return fmt.Sprintf("%s-%s-vmss-%s", prefix, clusterName, version)
func GetVmScaleSetName(prefix, clusterName string, version int) string {
if version == 0 {
return fmt.Sprintf("%s-%s-vmss", prefix, clusterName)
}
return fmt.Sprintf("%s-%s-vmss-v%d", prefix, clusterName, version)
}

func GetScaleSetInstanceIds(ctx context.Context, subscriptionId, resourceGroupName, vmScaleSetName string) (instanceIds []string, err error) {
Expand Down Expand Up @@ -1111,12 +1116,19 @@ func ReadVmssConfig(ctx context.Context, storageName, containerName string) (vms
if err != nil {
return
}

// calculate hash of the config (used to identify vmss config changes)
// take first 10 chars of the hash
hash := sha256.Sum256(asByteArray)
hashStr := fmt.Sprintf("%x", hash)

err = json.Unmarshal(asByteArray, &vmssConfig)
if err != nil {
logger.Error().Err(err).Send()
return
}

vmssConfig.ConfigHash = hashStr[:10]
return
}

Expand Down Expand Up @@ -1233,23 +1245,29 @@ func GetScaleSetNameWithLatestConfiguration(ctx context.Context, subscriptionId,
return scaleSetName, nil
}

func GetScaleSetsList(ctx context.Context, subscriptionId, resourceGroupName string, scaleSetNames []string) (scaleSets []*armcompute.VirtualMachineScaleSet, err error) {
func GetScaleSetsByVersion(ctx context.Context, subscriptionId, resourceGroupName string, vmssState *VMSSState) (map[int]*armcompute.VirtualMachineScaleSet, error) {
logger := logging.LoggerFromCtx(ctx)

for _, scaleSetName := range scaleSetNames {
scaleSetNames := make([]string, 0)
scaleSetsByVersion := make(map[int]*armcompute.VirtualMachineScaleSet, len(vmssState.Versions))

for _, version := range vmssState.Versions {
scaleSetName := GetVmScaleSetName(vmssState.Prefix, vmssState.ClusterName, version)
scaleSet, err := getScaleSet(ctx, subscriptionId, resourceGroupName, scaleSetName)
if err != nil {
// if scale set not found, ignore
if getErr, ok := err.(*azcore.ResponseError); ok && getErr.ErrorCode == "ResourceNotFound" {
var responseErr *azcore.ResponseError
if errors.As(err, &responseErr) && responseErr.ErrorCode == "ResourceNotFound" {
continue
}
return nil, err
}
scaleSets = append(scaleSets, scaleSet)
scaleSetsByVersion[version] = scaleSet
scaleSetNames = append(scaleSetNames, scaleSetName)
}

logger.Info().Msgf("Found %d scale sets from list %v", len(scaleSets), scaleSetNames)
return
logger.Info().Msgf("Found %d scale sets from list %v", len(scaleSetsByVersion), scaleSetNames)
return scaleSetsByVersion, nil
}

func GetVmssConfig(ctx context.Context, resourceGroupName string, scaleSet *armcompute.VirtualMachineScaleSet) *VMSSConfig {
Expand Down Expand Up @@ -1388,7 +1406,7 @@ func CreateOrUpdateVmss(ctx context.Context, subscriptionId, resourceGroupName,
return
}

config.Tags["version"] = configHash
config.Tags["config_hash"] = configHash
config.Tags["config_applied_at"] = time.Now().Format(time.RFC3339)
size := int64(vmssSize)
forceDeletion := false
Expand Down
59 changes: 26 additions & 33 deletions function-app/code/common/vmss_config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package common

import (
"crypto/sha256"
"encoding/json"
"fmt"
"strings"

Expand Down Expand Up @@ -84,30 +82,21 @@ type VMSSConfig struct {
DataDisk DataDisk `json:"data_disk"`
PrimaryNIC PrimaryNIC `json:"primary_nic"`
SecondaryNICs SecondaryNICs `json:"secondary_nics"`
}

func GetConfigHash(c VMSSConfig) (string, error) {
// calculate hash of the config (used to identify vmss config changes)
// take first 10 chars of the hash
jsonData, err := json.Marshal(c)
if err != nil {
return "", fmt.Errorf("cannot marshal vmss config: %v", err)
}

hash := sha256.Sum256(jsonData)
hashStr := fmt.Sprintf("%x", hash)
return hashStr[:10], nil
// ignore the following fields when marshaling to json
ConfigHash string `json:"-"`
}

// Compares two vmss configs - works with copies of VMSSConfig structs
// NOTES:
// - does not compare "version" and "config_applied_at" tags, and names which include version
// - does not compare "config_hash" and "config_applied_at" tags, and names which include version
func VmssConfigsDiff(old, new VMSSConfig) string {
old.CustomData, new.CustomData = "", ""
old.Tags["version"], new.Tags["version"] = "", ""
old.Tags["config_hash"], new.Tags["config_hash"] = "", ""
old.Tags["config_applied_at"], new.Tags["config_applied_at"] = "", ""
old.ComputerNamePrefix, new.ComputerNamePrefix = "", ""
old.Name, new.Name = "", ""
old.ConfigHash, new.ConfigHash = "", ""

if len(old.PrimaryNIC.IPConfigurations) == len(new.PrimaryNIC.IPConfigurations) {
for i := range old.PrimaryNIC.IPConfigurations {
Expand All @@ -121,7 +110,7 @@ func VmssConfigsDiff(old, new VMSSConfig) string {
old.OSDisk.SizeGB = nil
}

return cmp.Diff(new, old) // arguments order: (want, got)
return cmp.Diff(old, new) // arguments order: (want, got)
}

func GetRefreshVmssName(outdatedVmssName string, currentVmssVersion uint16) string {
Expand All @@ -133,12 +122,25 @@ func GetRefreshVmssName(outdatedVmssName string, currentVmssVersion uint16) stri
}

type VMSSState struct {
Prefix string `json:"prefix"`
ClusterName string `json:"cluster_name"`
Versions []string `json:"active_versions"`
Prefix string `json:"prefix"`
ClusterName string `json:"cluster_name"`
Versions []int `json:"active_versions"`
}

func (q *VMSSState) AddVersion(item string) {
func (q *VMSSState) DeduceNextVersion() int {
if len(q.Versions) == 0 {
return 0
}
maxVersion := q.Versions[0]
for _, v := range q.Versions {
if v > maxVersion {
maxVersion = v
}
}
return maxVersion + 1
}

func (q *VMSSState) AddVersion(item int) {
// make sure version is added in the end of the queue
// and there are no duplicates
for i, v := range q.Versions {
Expand All @@ -152,24 +154,15 @@ func (q *VMSSState) AddVersion(item string) {
q.Versions = append(q.Versions, item)
}

func (q *VMSSState) GetLatestVersion() string {
func (q *VMSSState) GetLatestVersion() int {
return q.Versions[len(q.Versions)-1]
}

func (q *VMSSState) IsEmpty() bool {
return len(q.Versions) == 0
}

func (q *VMSSState) ReplaceVersion(old, new string) {
for i, v := range q.Versions {
if v == old {
q.Versions[i] = new
return
}
}
}

func (q *VMSSState) RemoveVersion(item string) error {
func (q *VMSSState) RemoveVersion(item int) error {
for i, v := range q.Versions {
// do not allow removing the last element of array
if v == item && i != len(q.Versions)-1 {
Expand All @@ -179,7 +172,7 @@ func (q *VMSSState) RemoveVersion(item string) error {
return fmt.Errorf("cannot remove the latest version from the queue")
}
}
return fmt.Errorf("version %s not found in the queue", item)
return fmt.Errorf("version %d not found in the queue", item)
}

type VMSSStateVerbose struct {
Expand Down
10 changes: 2 additions & 8 deletions function-app/code/functions/clusterize/clusterize.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,8 @@ func Clusterize(ctx context.Context, p ClusterizationParams) (clusterizeScript s
instanceName := strings.Split(p.VmName, ":")[0]
instanceId := common.GetScaleSetVmIndex(instanceName)

vmssState, err := common.ReadVmssState(ctx, p.StateStorageName, p.StateContainerName)
if err != nil {
err = fmt.Errorf("failed to read vmss state: %w", err)
logger.Error().Err(err).Send()
return
}

vmScaleSetName := common.GetVmScaleSetName(p.Prefix, p.Cluster.ClusterName, vmssState.GetLatestVersion())
version := 0 // on cluserization step we are sure that the vmss version is 0 as no refresh was done yet
vmScaleSetName := common.GetVmScaleSetName(p.Prefix, p.Cluster.ClusterName, version)
vmName := p.VmName

ip, err := common.GetPublicIp(ctx, p.SubscriptionId, p.ResourceGroupName, vmScaleSetName, p.Prefix, p.Cluster.ClusterName, instanceId)
Expand Down
Loading

0 comments on commit 84682fa

Please sign in to comment.