Skip to content

Commit

Permalink
reafactor: use service logger
Browse files Browse the repository at this point in the history
  • Loading branch information
mpolitzer committed Dec 6, 2024
1 parent f64ef31 commit 9626a05
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 445 deletions.
74 changes: 36 additions & 38 deletions internal/advancer/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"time"

Expand Down Expand Up @@ -43,25 +42,11 @@ type IAdvancerMachines interface {
Apps() []Address
}

type Advancer struct {
repository IAdvancerRepository
machines IAdvancerMachines
}

type Service struct {
service.Service
Advancer
inspector *inspect.Inspector
}

func New(machines IAdvancerMachines, repository IAdvancerRepository) (*Advancer, error) {
if machines == nil {
return nil, ErrInvalidMachines
}
if repository == nil {
return nil, ErrInvalidRepository
}
return &Advancer{machines: machines, repository: repository}, nil
repository IAdvancerRepository
machines IAdvancerMachines
inspector *inspect.Inspector
}

type CreateInfo struct {
Expand Down Expand Up @@ -92,29 +77,42 @@ func Create(c *CreateInfo, s *Service) error {
return err
}

if c.Repository == nil {
c.Repository, err = repository.Connect(s.Context, c.PostgresEndpoint.Value)
if err != nil {
return err
if s.repository == nil {
if c.Repository == nil {
c.Repository, err = repository.Connect(s.Context, c.PostgresEndpoint.Value)
if err != nil {
return err
}
}
s.repository = c.Repository
}
s.repository = c.Repository

if c.Machines == nil {
c.Machines, err = machines.Load(s.Context, c.Repository, c.MachineServerVerbosity.Value)
if err != nil {
return err
if s.machines == nil {
if c.Machines == nil {
c.Machines, err = machines.Load(s.Context,
c.Repository, c.MachineServerVerbosity.Value, s.Logger)
if err != nil {
return err
}
}
s.machines = c.Machines
}
s.machines = c.Machines

if s.Service.ServeMux == nil {
if c.CreateInfo.ServeMux == nil {
c.ServeMux = http.NewServeMux()
// allow partial construction for testing
if s.inspector == nil && c.Machines != nil {
s.inspector = &inspect.Inspector{c.Machines}
if err != nil {
return fmt.Errorf("failed to create the inspector: %w", err)
}

if s.Service.ServeMux == nil {
if c.CreateInfo.ServeMux == nil {
c.ServeMux = http.NewServeMux()
}
}
s.Service.ServeMux.Handle("/inspect/{dapp}", http.Handler(s.inspector))
s.Service.ServeMux.Handle("/inspect/{dapp}/{payload}", http.Handler(s.inspector))
}
s.Service.ServeMux.Handle("/inspect/{dapp}", http.Handler(s.inspector))
s.Service.ServeMux.Handle("/inspect/{dapp}/{payload}", http.Handler(s.inspector))

return nil
}
Expand Down Expand Up @@ -144,7 +142,7 @@ func (v *Service) String() string {
// It gets unprocessed inputs from the repository,
// runs them through the cartesi machine,
// and updates the repository with the outputs.
func (advancer *Advancer) Step(ctx context.Context) error {
func (advancer *Service) Step(ctx context.Context) error {
// Dynamically updates the list of machines
err := advancer.machines.UpdateMachines(ctx)
if err != nil {
Expand All @@ -154,15 +152,15 @@ func (advancer *Advancer) Step(ctx context.Context) error {
apps := advancer.machines.Apps()

// Gets the unprocessed inputs (of all apps) from the repository.
slog.Debug("advancer: querying for unprocessed inputs")
advancer.Logger.Debug("querying for unprocessed inputs")
inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps)
if err != nil {
return err
}

// Processes each set of inputs.
for app, inputs := range inputs {
slog.Debug(fmt.Sprintf("advancer: processing %d input(s) from %v", len(inputs), app))
advancer.Logger.Debug(fmt.Sprintf("processing %d input(s) from %v", len(inputs), app))
err := advancer.process(ctx, app, inputs)
if err != nil {
return err
Expand All @@ -181,7 +179,7 @@ func (advancer *Advancer) Step(ctx context.Context) error {
}

// process sequentially processes inputs from the the application.
func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error {
func (advancer *Service) process(ctx context.Context, app Address, inputs []*Input) error {
// Asserts that the app has an associated machine.
machine, exists := advancer.machines.GetAdvanceMachine(app)
if !exists {
Expand All @@ -195,7 +193,7 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In

// FIXME if theres a change in epoch id call update epochs
for _, input := range inputs {
slog.Info("advancer: Processing input", "app", app, "id", input.Id, "index", input.Index)
advancer.Logger.Info("Processing input", "app", app, "id", input.Id, "index", input.Index)

// Sends the input to the cartesi machine.
res, err := machine.Advance(ctx, input.RawData, input.Index)
Expand Down
56 changes: 16 additions & 40 deletions internal/advancer/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cartesi/rollups-node/internal/advancer/machines"
. "github.com/cartesi/rollups-node/internal/model"
"github.com/cartesi/rollups-node/internal/nodemachine"
"github.com/cartesi/rollups-node/pkg/service"

"github.com/stretchr/testify/suite"
)
Expand All @@ -25,41 +26,16 @@ func TestAdvancer(t *testing.T) {

type AdvancerSuite struct{ suite.Suite }

func (s *AdvancerSuite) TestNew() {
s.Run("Ok", func() {
require := s.Require()
machines := newMockMachines()
machines.Map[randomAddress()] = &MockMachine{}
var repository IAdvancerRepository = &MockRepository{}
advancer, err := New(machines, repository)
require.NotNil(advancer)
require.Nil(err)
})

s.Run("InvalidMachines", func() {
require := s.Require()
var machines IAdvancerMachines = nil
var repository IAdvancerRepository = &MockRepository{}
advancer, err := New(machines, repository)
require.Nil(advancer)
require.Error(err)
require.Equal(ErrInvalidMachines, err)
})

s.Run("InvalidRepository", func() {
require := s.Require()
machines := newMockMachines()
machines.Map[randomAddress()] = &MockMachine{}
var repository IAdvancerRepository = nil
advancer, err := New(machines, repository)
require.Nil(advancer)
require.Error(err)
require.Equal(ErrInvalidRepository, err)
})
}

func (s *AdvancerSuite) TestPoller() {
s.T().Skip("TODO")
func New(m IAdvancerMachines, r IAdvancerRepository) (*Service, error) {
s := &Service{
machines: m,
repository: r,
}
return s, Create(&CreateInfo{
CreateInfo: service.CreateInfo{
Name: "advancer",
},
}, s)
}

func (s *AdvancerSuite) TestRun() {
Expand Down Expand Up @@ -105,15 +81,15 @@ func (s *AdvancerSuite) TestRun() {
}

func (s *AdvancerSuite) TestProcess() {
setup := func() (IAdvancerMachines, *MockRepository, *Advancer, Address) {
setup := func() (IAdvancerMachines, *MockRepository, *Service, Address) {
require := s.Require()

app := randomAddress()
machines := newMockMachines()
machines.Map[app] = &MockMachine{}
repository := &MockRepository{}
advancer := &Advancer{
machines: machines,
repository: repository,
}
advancer, err := New(machines, repository)
require.Nil(err)
return machines, repository, advancer, app
}

Expand Down
45 changes: 29 additions & 16 deletions internal/advancer/machines/machines.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,20 @@ type Machines struct {
machines map[Address]*nm.NodeMachine
repository Repository
verbosity cm.ServerVerbosity
Logger *slog.Logger
}

// Load initializes the cartesi machines.
// Load advances a machine to the last processed input stored in the database.
//
// Load does not fail when one of those machines fail to initialize.
// It stores the error to be returned later and continues to initialize the other machines.
func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*Machines, error) {
func Load(
ctx context.Context,
repo Repository,
verbosity cm.ServerVerbosity,
logger *slog.Logger,
) (*Machines, error) {
configs, err := repo.GetMachineConfigurations(ctx)
if err != nil {
return nil, err
Expand All @@ -63,15 +69,15 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*

for _, config := range configs {
// Creates the machine.
machine, err := createMachine(ctx, verbosity, config)
machine, err := createMachine(ctx, verbosity, config, logger)
if err != nil {
err = fmt.Errorf("failed to create machine from snapshot (%v): %w", config, err)
errs = errors.Join(errs, err)
continue
}

// Advances the machine until it catches up with the state of the database (if necessary).
err = catchUp(ctx, repo, config.AppAddress, machine, config.ProcessedInputs)
err = catchUp(ctx, repo, config.AppAddress, machine, config.ProcessedInputs, logger)
if err != nil {
err = fmt.Errorf("failed to advance cartesi machine (%v): %w", config, err)
errs = errors.Join(errs, err, machine.Close())
Expand All @@ -81,7 +87,12 @@ func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*
machines[config.AppAddress] = machine
}

return &Machines{machines: machines, repository: repo, verbosity: verbosity}, errs
return &Machines{
machines: machines,
repository: repo,
verbosity: verbosity,
Logger: logger,
}, errs
}

func (m *Machines) UpdateMachines(ctx context.Context) error {
Expand All @@ -95,15 +106,15 @@ func (m *Machines) UpdateMachines(ctx context.Context) error {
continue
}

machine, err := createMachine(ctx, m.verbosity, config)
machine, err := createMachine(ctx, m.verbosity, config, m.Logger)
if err != nil {
slog.Error("advancer: Failed to create machine", "app", config.AppAddress, "error", err)
m.Logger.Error("Failed to create machine", "app", config.AppAddress, "error", err)
continue
}

err = catchUp(ctx, m.repository, config.AppAddress, machine, config.ProcessedInputs)
err = catchUp(ctx, m.repository, config.AppAddress, machine, config.ProcessedInputs, m.Logger)
if err != nil {
slog.Error("Failed to sync the machine", "app", config.AppAddress, "error", err)
m.Logger.Error("Failed to sync the machine", "app", config.AppAddress, "error", err)
machine.Close()
continue
}
Expand Down Expand Up @@ -158,7 +169,7 @@ func (m *Machines) RemoveAbsent(configs []*MachineConfig) {
}
for address, machine := range m.machines {
if !configMap[address] {
slog.Info("advancer: Application was disabled, shutting down machine", "application", address)
m.Logger.Info("Application was disabled, shutting down machine", "application", address)
machine.Close()
delete(m.machines, address)
}
Expand Down Expand Up @@ -200,7 +211,7 @@ func (m *Machines) Close() error {

err := closeMachines(m.machines)
if err != nil {
slog.Error(fmt.Sprintf("failed to close some machines: %v", err))
m.Logger.Error(fmt.Sprintf("failed to close some machines: %v", err))
}
return err
}
Expand All @@ -227,17 +238,18 @@ func closeMachines(machines map[Address]*nm.NodeMachine) (err error) {
func createMachine(ctx context.Context,
verbosity cm.ServerVerbosity,
config *MachineConfig,
logger *slog.Logger,
) (*nm.NodeMachine, error) {
slog.Info("advancer: creating machine", "application", config.AppAddress,
logger.Info("creating machine", "application", config.AppAddress,
"template-path", config.SnapshotPath)
slog.Debug("advancer: instantiating remote machine server", "application", config.AppAddress)
logger.Debug("instantiating remote machine server", "application", config.AppAddress)
// Starts the server.
address, err := cm.StartServer(verbosity, 0, os.Stdout, os.Stderr)
if err != nil {
return nil, err
}

slog.Info("advancer: loading machine on server", "application", config.AppAddress,
logger.Info("loading machine on server", "application", config.AppAddress,
"remote-machine", address, "template-path", config.SnapshotPath)
// Creates a CartesiMachine from the snapshot.
runtimeConfig := &emulator.MachineRuntimeConfig{}
Expand All @@ -246,7 +258,7 @@ func createMachine(ctx context.Context,
return nil, errors.Join(err, cm.StopServer(address))
}

slog.Debug("advancer: machine loaded on server", "application", config.AppAddress,
logger.Debug("machine loaded on server", "application", config.AppAddress,
"remote-machine", address, "template-path", config.SnapshotPath)

// Creates a RollupsMachine from the CartesiMachine.
Expand Down Expand Up @@ -276,9 +288,10 @@ func catchUp(ctx context.Context,
app Address,
machine *nm.NodeMachine,
processedInputs uint64,
logger *slog.Logger,
) error {

slog.Info("advancer: catching up unprocessed inputs", "app", app)
logger.Info("catching up unprocessed inputs", "app", app)

inputs, err := repo.GetProcessedInputs(ctx, app, processedInputs)
if err != nil {
Expand All @@ -287,7 +300,7 @@ func catchUp(ctx context.Context,

for _, input := range inputs {
// FIXME epoch id to epoch index
slog.Info("advancer: advancing", "app", app, "epochId", input.EpochId,
logger.Info("advancing", "app", app, "epochId", input.EpochId,
"input_index", input.Index)
_, err := machine.Advance(ctx, input.RawData, input.Index)
if err != nil {
Expand Down
Loading

0 comments on commit 9626a05

Please sign in to comment.