-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Test worker priorization #45
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -13,6 +13,13 @@ import ( | |||||
"github.com/dadosjusbr/alba/storage" | ||||||
) | ||||||
|
||||||
type dbInterface interface { | ||||||
GetPipelinesByDay(day int) ([]storage.Pipeline, error) | ||||||
InsertExecution(e storage.Execution) error | ||||||
GetLastExecutionsForAllPipelines() ([]storage.Execution, error) | ||||||
GetLastExecutionsByPipelineID(limit, id int) ([]storage.Execution, error) | ||||||
} | ||||||
|
||||||
func main() { | ||||||
var pipelines []storage.Pipeline | ||||||
var finalPipelines []storage.Pipeline | ||||||
|
@@ -22,6 +29,16 @@ func main() { | |||||
log.Fatal("error trying get environment variable: $MONGODB is empty") | ||||||
} | ||||||
|
||||||
errorLimitStr := os.Getenv("ERROR_LIMIT") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Também poderia comentar essa variável? Vale super pena documentar esse parâmetro do programa. |
||||||
if uri == "" { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lembrando que o |
||||||
log.Fatal("error trying get environment variable: $ERROR_LIMIT is empty") | ||||||
} | ||||||
|
||||||
errorLimit, err := strconv.Atoi(errorLimitStr) | ||||||
if err != nil { | ||||||
log.Fatalf("error trying convert variable $ERROR_LIMIT: %q", err) | ||||||
} | ||||||
|
||||||
dbClient, err := storage.NewDBClient(uri) | ||||||
if err != nil { | ||||||
log.Fatal(err) | ||||||
|
@@ -55,16 +72,15 @@ func main() { | |||||
// if err != nil { | ||||||
// log.Fatal(err) | ||||||
// } | ||||||
// TODO: merge pipelines a fim de não repetir os ids | ||||||
// final_pipelines = append(final_pipelines, pipelines...) | ||||||
|
||||||
// pipelines, err = getPipelinesForCompleteHistory(dbClient) | ||||||
// if err != nil { | ||||||
// log.Fatal(err) | ||||||
// } | ||||||
// final_pipelines = append(final_pipelines, pipelines...) | ||||||
|
||||||
// Algoritmo: shuffle na lista + cap | ||||||
toExecuteNow := prioritizeAndLimit(finalPipelines) | ||||||
toExecuteNow := prioritizeAndLimit(dbClient, finalPipelines, errorLimit) | ||||||
|
||||||
for _, p := range toExecuteNow { | ||||||
err := run(emailSender, emailPassword, emailReceiver, p, dbClient) | ||||||
|
@@ -143,13 +159,12 @@ func mergeEnv(defaultEnv, stageEnv map[string]string) map[string]string { | |||||
return env | ||||||
} | ||||||
|
||||||
func prioritizeAndLimit(list []storage.Pipeline) []storage.Pipeline { | ||||||
// TODO: ordenar do mais recente para o mais antigo | ||||||
// TODO: receber quantidade máxima de execuções via parâmetro e manter somente os x primeiros | ||||||
func prioritizeAndLimit(db dbInterface, list []storage.Pipeline, limit int) []storage.Pipeline { | ||||||
|
||||||
return list | ||||||
} | ||||||
|
||||||
func getPipelinesToExecuteToday(db *storage.DBClient) ([]storage.Pipeline, error) { | ||||||
func getPipelinesToExecuteToday(db dbInterface) ([]storage.Pipeline, error) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eu acho arretada essa forma que programas em Go crescem em complexidade. Hoje em dia me soa bastante orgânico, natural? Nenhuma modição em extends ou implements! Note que você não precisou modificar em nada |
||||||
results, err := db.GetPipelinesByDay(time.Now().Day()) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("error getting pipelines by day: %q", err) | ||||||
|
@@ -160,14 +175,14 @@ func getPipelinesToExecuteToday(db *storage.DBClient) ([]storage.Pipeline, error | |||||
|
||||||
// Apenas as execuções que devem acontecer por causa do mecanismo de | ||||||
// tolerância à falhas. | ||||||
func getPipelinesThatFailed(db *storage.DBClient) []storage.Pipeline { | ||||||
func getPipelinesThatFailed(db dbInterface) []storage.Pipeline { | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
// Apenas execuções de devem acontecer para completar o histórico. Devemos | ||||||
// ignorar casos em que já houve tentativa de execução, quer seja sucesso ou falha. | ||||||
func getPipelinesForCompleteHistory(db *storage.DBClient) []storage.Pipeline { | ||||||
func getPipelinesForCompleteHistory(db dbInterface) []storage.Pipeline { | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
package main | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/dadosjusbr/alba/storage" | ||
"github.com/dadosjusbr/executor" | ||
"github.com/matryer/is" | ||
) | ||
|
||
var pipelinesDB = []storage.Pipeline{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tu deve precisar criar vários desses a medida que o teste aumenta. Sugestão: cria os Daí, cria o |
||
{ | ||
Pipeline: executor.Pipeline{ | ||
Name: "Pipeline 01: Finalizado com sucesso", | ||
}, | ||
ID: "pipeline1", | ||
Entity: "Pipeline 01", | ||
City: "João Pessoa", | ||
FU: "PB", | ||
Repo: "github.com/dadosjusbr/coletores", | ||
Frequency: 30, | ||
StartDay: 5, | ||
LimitMonthBackward: 2, | ||
LimitYearBackward: 2021, | ||
}, | ||
{ | ||
Pipeline: executor.Pipeline{ | ||
Name: "Pipeline 02: Tolerância a falha", | ||
}, | ||
ID: "pipeline2", | ||
Entity: "Tribunal Regional do Trabalho 13ª Região", | ||
City: "João Pessoa", | ||
FU: "PB", | ||
Repo: "github.com/dadosjusbr/coletores", | ||
Frequency: 30, | ||
StartDay: 5, | ||
LimitMonthBackward: 2, | ||
LimitYearBackward: 2021, | ||
}, | ||
{ | ||
Pipeline: executor.Pipeline{ | ||
Name: "Pipeline 03: Histórico", | ||
}, | ||
ID: "pipeline3", | ||
Entity: "Pipeline 03", | ||
City: "João Pessoa", | ||
FU: "PB", | ||
Repo: "github.com/dadosjusbr/coletores", | ||
Frequency: 30, | ||
StartDay: 5, | ||
LimitMonthBackward: 2, | ||
LimitYearBackward: 2021, | ||
}, | ||
} | ||
|
||
type fakeFinder struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fakeDB? |
||
pipelines []storage.Pipeline | ||
executions []storage.Execution | ||
err error | ||
} | ||
|
||
func (fake fakeFinder) GetPipelinesByDay(day int) ([]storage.Pipeline, error) { | ||
return fake.pipelines, fake.err | ||
} | ||
func (fake fakeFinder) InsertExecution(e storage.Execution) error { | ||
return fake.err | ||
} | ||
|
||
// TODO: Retornar a última execução de cada pipeline | ||
func (fake fakeFinder) GetLastExecutionsForAllPipelines() ([]storage.Execution, error) { | ||
return fake.executions, fake.err | ||
} | ||
|
||
// TODO: Retornar as <limit> ultimas execuções daquele pipeline | ||
func (fake fakeFinder) GetLastExecutionsByPipelineID(limit, id int) ([]storage.Execution, error) { | ||
return fake.executions, fake.err | ||
} | ||
|
||
// PrioritizeAndLimit | ||
// 1º descartar se o pipeline já tem uma execução finalizada com sucesso hoje | ||
// 2º descartar se o número de execuções com erro for igual ou maior que o limite | ||
// ... | ||
// por último aplicar filtro de tamanho | ||
func TestPrioritizeAndLimit_LimitTest(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
is := is.New(t) | ||
list := pipelinesDB | ||
limit := 2 | ||
|
||
finalPipelines := prioritizeAndLimit(fakeFinder{pipelines: pipelinesDB, err: nil}, list, limit) | ||
|
||
is.True(len(finalPipelines) <= limit) | ||
} | ||
|
||
func TestPrioritizeAndLimit_PipelineWithMaxLimitOfFailure(t *testing.T) { | ||
is := is.New(t) | ||
list := pipelinesDB | ||
limit := 3 | ||
|
||
executions := []storage.Execution{ | ||
{ | ||
PipelineResult: executor.PipelineResult{ | ||
Name: "Pipeline 02", | ||
Status: "Setup Error", | ||
}, | ||
ID: "pipeline2", | ||
Entity: "Pipeline 02", | ||
}, | ||
{ | ||
PipelineResult: executor.PipelineResult{ | ||
Name: "Pipeline 02", | ||
Status: "Connection Error", | ||
}, | ||
ID: "pipeline2", | ||
Entity: "Pipeline 02", | ||
}, | ||
{ | ||
PipelineResult: executor.PipelineResult{ | ||
Name: "Pipeline 02", | ||
Status: "Run Error", | ||
}, | ||
ID: "pipeline2", | ||
Entity: "Pipeline 02", | ||
}, | ||
} | ||
|
||
finalPipelines := prioritizeAndLimit(fakeFinder{executions: executions, err: nil}, list, limit) | ||
|
||
is.True(len(finalPipelines) == 2) | ||
} | ||
|
||
func TestPrioritizeAndLimit_PipelineWithTwoFailures(t *testing.T) { | ||
is := is.New(t) | ||
list := pipelinesDB | ||
limit := 3 | ||
|
||
executions := []storage.Execution{ | ||
{ | ||
PipelineResult: executor.PipelineResult{ | ||
Name: "Pipeline 02", | ||
Status: "Setup Error", | ||
}, | ||
ID: "pipeline2", | ||
Entity: "Pipeline 02", | ||
}, | ||
{ | ||
PipelineResult: executor.PipelineResult{ | ||
Name: "Pipeline 02", | ||
Status: "Connection Error", | ||
}, | ||
ID: "pipeline2", | ||
Entity: "Pipeline 02", | ||
}, | ||
} | ||
|
||
finalPipelines := prioritizeAndLimit(fakeFinder{executions: executions, err: nil}, list, limit) | ||
|
||
is.True(len(finalPipelines) == 3) | ||
} | ||
|
||
func TestPrioritizeAndLimit_PipelineWithSuccessfulExecution(t *testing.T) { | ||
is := is.New(t) | ||
list := pipelinesDB | ||
limit := 3 | ||
|
||
executions := []storage.Execution{ | ||
{ | ||
PipelineResult: executor.PipelineResult{ | ||
Name: "Pipeline 01", | ||
Status: "OK", | ||
}, | ||
ID: "pipeline1", | ||
Entity: "Pipeline 01", | ||
}, | ||
{ | ||
PipelineResult: executor.PipelineResult{ | ||
Name: "Pipeline 02", | ||
Status: "Setup Error", | ||
}, | ||
ID: "pipeline2", | ||
Entity: "Pipeline 02", | ||
}, | ||
{ | ||
PipelineResult: executor.PipelineResult{ | ||
Name: "Pipeline 03", | ||
Status: "OK", | ||
}, | ||
ID: "pipeline3", | ||
Entity: "Pipeline 03", | ||
}, | ||
} | ||
|
||
finalPipelines := prioritizeAndLimit(fakeFinder{executions: executions, err: nil}, list, limit) | ||
|
||
is.True(len(finalPipelines) == 1) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Você provavelmente vai precisar passar algum tipo de quantidade aqui, não?
Outra coisa importante, a interface pode ser um bom lugar para documentar bem essas funções. Por exemplo, o método
GetPipelinesByDay(int)
retorna todos os pipelines do dia especificado, sempre no mês corrente.