This repository has been archived by the owner on Sep 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
host_selection_service.go
188 lines (153 loc) · 5.48 KB
/
host_selection_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package audiusclient
import (
"errors"
"math/rand"
"sort"
"sync"
"time"
)
// HostSelectionService keeps track of the hosts that requests may be sent to
// and remembers which one was selected most recently. It caches the fetched
// host list, the hosts that recently failed a health check, and the selected
// host, each together with the time it was recorded.
type HostSelectionService struct {
// A mutex to use while performing updates on the service.
// GetSelectedHost holds it for the duration of each call.
mu sync.Mutex
// The name of the application issuing requests to Audius.
appName string
// The configuration for the service (TTLs and the health check sample size).
config hostSelectionServiceConfig
// The fetcher to use when updating the host list.
fetcher hostFetcher
// The service to use for health checking hosts.
healthCheckService *HostHealthCheckService
// The list of possible hosts to choose from.
// An update to the host list is only considered successful if one or more hosts was found.
hostList []string
// The time when the host list was last updated successfully (nil if it never was).
hostListUpdatedAt *time.Time
// A mapping of unhealthy hosts (a subset of the host list) to the time they were marked unhealthy.
// It is replaced with an empty map whenever the host list is re-fetched.
unhealthyHostMap map[string]time.Time
// The most recently selected host (among the host list).
// If the host is an empty string, it is unset.
selectedHost string
// The time when the selected host was last updated successfully (nil if it never was).
selectedHostUpdatedAt *time.Time
}
// NewHostSelectionService creates a HostSelectionService for the application
// named appName.
//
// The service starts with the default selection configuration, a discovery
// host fetcher and a health check service backed by a discovery health check
// fetcher. No host list is loaded and no host is selected until the service is
// first asked for one.
func NewHostSelectionService(appName string) *HostSelectionService {
	service := new(HostSelectionService)
	service.appName = appName

	// The collaborators are created in a fixed order: configuration, host
	// fetcher, health check fetcher, health check service.
	service.config = newHostSelectionServiceConfig()
	service.fetcher = NewDiscoveryHostFetcher(appName)
	service.healthCheckService = NewHostHealthCheckService(
		NewDiscoveryHostHealthCheckFetcher(appName),
	)

	return service
}
// getHostList returns the list of candidate hosts, re-fetching it when needed.
//
// The cached list is returned when it is non-empty and was stored less than
// config.HostListTTL ago. Otherwise the list is fetched again; a fetch error or
// an empty result is returned as an error and leaves the cached state
// untouched. A successful fetch replaces the cached list, clears the unhealthy
// host markers and discards the current host selection.
func (s *HostSelectionService) getHostList() ([]string, error) {
	// Fast path: the cached list is still fresh.
	if cached := s.hostList; len(cached) > 0 {
		if stamp := s.hostListUpdatedAt; stamp != nil && time.Since(*stamp) < s.config.HostListTTL {
			return cached, nil
		}
	}

	// Slow path: ask the fetcher for a new list.
	fetched, fetchErr := s.fetcher.FetchHosts()
	switch {
	case fetchErr != nil:
		return nil, fetchErr
	case len(fetched) == 0:
		return nil, errors.New("fetched hosts were empty")
	}

	// Store the new list and drop everything that was derived from the old one.
	now := time.Now()
	s.hostList, s.hostListUpdatedAt = fetched, &now
	s.unhealthyHostMap = make(map[string]time.Time)
	s.selectedHost, s.selectedHostUpdatedAt = "", nil

	return fetched, nil
}
// GetSelectedHost returns the host that requests should currently be sent to.
//
// A previously selected host is returned unchanged while it is younger than
// config.SelectedHostTTL. Otherwise a new host is chosen: the host list is
// loaded through getHostList, hosts that were marked unhealthy less than
// config.UnhealthyHostTTL ago are skipped, a random sample of at most
// config.MaximumConcurrentRequests of the remaining hosts is health checked,
// and the healthy host with the shortest health check duration is selected
// and cached.
//
// An error is returned when the host list cannot be loaded, when every host is
// still marked unhealthy, or when none of the sampled hosts passes its health
// check. The service mutex is held for the whole call, including the health
// checks.
func (s *HostSelectionService) GetSelectedHost() (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Reuse the cached selection while it is still within its TTL.
	if s.selectedHost != "" && s.selectedHostUpdatedAt != nil && time.Since(*s.selectedHostUpdatedAt) < s.config.SelectedHostTTL {
		return s.selectedHost, nil
	}

	// We need to re-evaluate the hosts to select a new host.
	hosts, err := s.getHostList()
	if err != nil {
		return "", err
	}

	// Keep the hosts that are not marked unhealthy or whose mark has expired.
	candidates := make([]string, 0, len(hosts))
	for _, host := range hosts {
		if markedAt, marked := s.unhealthyHostMap[host]; marked && time.Since(markedAt) < s.config.UnhealthyHostTTL {
			continue
		}
		candidates = append(candidates, host)
	}
	if len(candidates) == 0 {
		return "", errors.New("all hosts are currently unhealthy")
	}

	// Shuffle with a private generator seeded at nanosecond resolution.
	// Reseeding the package-level generator on every call (rand.Seed) is
	// deprecated, repeats the same order for calls within one second, and
	// overrides the seed for every other user of math/rand in the process.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	rng.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})

	// Health check at most MaximumConcurrentRequests of the shuffled candidates.
	hostsToTest := candidates
	if len(candidates) >= s.config.MaximumConcurrentRequests {
		hostsToTest = candidates[:s.config.MaximumConcurrentRequests]
	}
	resultMap := s.healthCheckService.HealthCheckHosts(hostsToTest)

	// Writing to a nil map panics; getHostList creates the map on every
	// refresh, this guard only protects against a service that was set up
	// without one.
	if s.unhealthyHostMap == nil {
		s.unhealthyHostMap = map[string]time.Time{}
	}

	// Mark the hosts that failed and collect the ones that passed.
	healthyHosts := make([]hostHealthCheckResult, 0, len(resultMap))
	for host, result := range resultMap {
		if result.Err != nil {
			s.unhealthyHostMap[host] = time.Now()
			continue
		}
		healthyHosts = append(healthyHosts, result)
	}
	if len(healthyHosts) == 0 {
		return "", errors.New("all hosts are currently unhealthy")
	}

	// Order the healthy hosts so that the fastest one comes first. Only the
	// health check duration is compared; the preferences for hosts ending in
	// "audius.co" and "staked.cloud" that existed here as commented-out code
	// are not applied.
	sort.Slice(healthyHosts, func(i, j int) bool {
		return healthyHosts[i].Duration < healthyHosts[j].Duration
	})

	// Select the first host in order of preference and remember when we did.
	selectedHost := healthyHosts[0].Host
	now := time.Now()
	s.selectedHost = selectedHost
	s.selectedHostUpdatedAt = &now
	return selectedHost, nil
}