-
Notifications
You must be signed in to change notification settings - Fork 2
/
reporter.go
114 lines (101 loc) · 3.86 KB
/
reporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package main
import (
"os"
"strconv"
"time"
)
const (
	// REPORTING_FREQUENCY_SECONDS is the period, in seconds, of the ticker
	// that NewReporter creates for the reporter loop.
	REPORTING_FREQUENCY_SECONDS = 5
	// ACTIVE_SENDERS_LIMIT caps activeSenders: ParseChannels only launches a
	// new SendReports goroutine while the count is below this value.
	ACTIVE_SENDERS_LIMIT = 3
	// BEACON_THRESHOLD_SECONDS is the number of seconds since
	// lastServerCommunication after which NeedToBeacon starts returning true.
	BEACON_THRESHOLD_SECONDS = 12
)
// Reporter collects finished TestJobs from a channel and periodically hands
// them to the API client in batches. Create instances with NewReporter so
// that every field is initialized.
type Reporter struct {
	// reportsChannel delivers jobs; each one received is appended to reports.
	reportsChannel chan *TestJob
	logger Logger
	client *APIClient
	// reports holds the jobs accumulated since the last batch was handed to
	// SendReports.
	reports []TestJob
	// lastServerCommunication is set to the current time after a successful
	// Beacon or UpdateTestJobs call; NeedToBeacon reads it.
	lastServerCommunication time.Time
	// activeSenders counts how many goroutines are actively trying to send
	// reports.
	activeSenders int
	// tickerChan fires every REPORTING_FREQUENCY_SECONDS seconds (see
	// NewReporter).
	tickerChan <-chan time.Time
	// activeSenderDone is how senders signal completion: each value received
	// on it reduces activeSenders by one.
	activeSenderDone chan bool
	// cancelledTestRunIdsChan receives the ids that SendReports extracts from
	// the server response.
	cancelledTestRunIdsChan chan []int
}
// NewReporter builds a fully initialized Reporter and is the intended way to
// obtain one.
//
// reportsChannel is the channel the reporter reads finished jobs from, and
// cancelledTestRunIdsChan is the channel it writes cancelled test run ids to.
// The returned Reporter logs to standard output under the name "Reporter",
// owns a ticker firing every REPORTING_FREQUENCY_SECONDS seconds and an
// unbuffered activeSenderDone channel.
func NewReporter(reportsChannel chan *TestJob, cancelledTestRunIdsChan chan []int) *Reporter {
	reporterLogger := Logger{"Reporter", os.Stdout}

	reporter := new(Reporter)
	reporter.reportsChannel = reportsChannel
	reporter.logger = reporterLogger
	reporter.client = NewClient(reporterLogger)
	reporter.tickerChan = time.NewTicker(REPORTING_FREQUENCY_SECONDS * time.Second).C
	reporter.activeSenderDone = make(chan bool)
	reporter.cancelledTestRunIdsChan = cancelledTestRunIdsChan
	return reporter
}
// ParseChannels runs one iteration of the reporter loop: it blocks until one
// of the reporter's channels is ready and handles that single event.
//
//   - A job arriving on reportsChannel is copied into the pending reports.
//   - A value on activeSenderDone decrements the active sender count.
//   - A tick hands the pending reports to a new SendReports goroutine when the
//     sender count is below ACTIVE_SENDERS_LIMIT and there is something to
//     send; otherwise, if NeedToBeacon reports true, it beacons the server in
//     a goroutine. A beacon error panics.
//
// NOTE(review): the beacon goroutine writes lastServerCommunication while
// NeedToBeacon reads it from the loop goroutine, and no synchronization is
// visible in this file — confirm whether that needs guarding.
func (r *Reporter) ParseChannels() {
	select {
	case job := <-r.reportsChannel:
		r.reports = append(r.reports, *job)

	case <-r.activeSenderDone:
		r.activeSenders--

	case <-r.tickerChan:
		hasCapacity := r.activeSenders < ACTIVE_SENDERS_LIMIT
		switch {
		case hasCapacity && len(r.reports) > 0:
			// Detach the current batch so the loop starts a fresh one while
			// the sender works on this one.
			batch := r.reports
			r.reports = []TestJob{}
			r.activeSenders++
			go r.SendReports(batch)

		case r.NeedToBeacon():
			go func() {
				_, err := r.client.Beacon()
				if err != nil {
					panic("Tried to beacon but there was an error: " + err.Error())
				}
				r.lastServerCommunication = time.Now()
			}()
		}
	}
}
// Start logs that the loop is being entered and then calls ParseChannels
// repeatedly. The loop has no exit condition, so Start never returns.
func (r *Reporter) Start() {
	r.logger.Log("Entering loop")
	for {
		r.ParseChannels()
	}
}
// NeedToBeacon reports whether more than BEACON_THRESHOLD_SECONDS seconds have
// elapsed since lastServerCommunication, i.e. since the last server call that
// recorded a success (a beacon or a report upload).
func (r *Reporter) NeedToBeacon() bool {
	silence := time.Since(r.lastServerCommunication)
	return silence.Seconds() > BEACON_THRESHOLD_SECONDS
}
// SendReports uploads a batch of TestJobs through the API client's
// UpdateTestJobs call. It makes a single attempt: on failure the error is
// logged and returned, and nothing in this method re-queues the batch.
//
// It is meant to be run as a goroutine so that network trouble does not block
// the reporter loop. To keep such goroutines from piling up when the server is
// unreachable, the reporter tracks how many are running (activeSenders) and
// limits them to ACTIVE_SENDERS_LIMIT; this method does its part by always
// signalling on activeSenderDone when it exits, whatever the outcome.
//
// On success it records the time of the communication and forwards the ids of
// the test runs the response lists for deletion to cancelledTestRunIdsChan.
//
// NOTE(review): lastServerCommunication is written here without any
// synchronization visible in this file, while NeedToBeacon reads it — confirm
// whether that needs guarding when this runs as a goroutine.
func (r *Reporter) SendReports(reports []TestJob) error {
	// Release this sender's slot on every exit path.
	defer func() {
		r.activeSenderDone <- true
	}()

	count := strconv.Itoa(len(reports))
	r.logger.Log("Sending " + count + " reports")

	response, sendErr := r.client.UpdateTestJobs(reports)
	if sendErr != nil {
		r.logger.Log(sendErr.Error())
		return sendErr
	}
	r.lastServerCommunication = time.Now()

	// Pass the cancelled test run ids on to the receiver of
	// cancelledTestRunIdsChan. This is done inline rather than in another
	// goroutine because the hand-off is expected to be quick.
	cancelledIDs := r.deleteTestRunIds(response)
	r.cancelledTestRunIdsChan <- cancelledIDs
	return nil
}
// deleteTestRunIds extracts the ids listed under the "delete_test_runs" key of
// a server response.
//
// response is expected to be a map[string]interface{} whose
// "delete_test_runs" entry is a []interface{} of float64 values (presumably
// the result of decoding JSON into interface{} — verify against the client).
// Each value is truncated to int.
//
// Input that does not have that shape never causes a type-assertion panic: the
// method returns nil when response is not such a map or when the key is
// missing or not a list, and it skips list entries that are not float64.
func (r *Reporter) deleteTestRunIds(response interface{}) []int {
	body, ok := response.(map[string]interface{})
	if !ok {
		return nil
	}
	rawIDs, ok := body["delete_test_runs"].([]interface{})
	if !ok {
		return nil
	}

	var ids []int
	for _, rawID := range rawIDs {
		id, isNumber := rawID.(float64)
		if !isNumber {
			continue
		}
		ids = append(ids, int(id))
	}
	return ids
}