Commit

refactor(ai): fix lint warnings
This commit fixes several linting warnings to clean up the code.
rickstaa committed Oct 21, 2024
1 parent d5f0db7 commit 8a16848
Showing 10 changed files with 1,327 additions and 2,296 deletions.
12 changes: 5 additions & 7 deletions cmd/livepeer/starter/starter.go
@@ -35,7 +35,6 @@ import (
"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/eth/blockwatch"
"github.com/livepeer/go-livepeer/eth/watchers"
"github.com/livepeer/go-livepeer/monitor"
lpmon "github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/server"
@@ -931,7 +930,6 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
mfv, _ := new(big.Int).SetString(*cfg.MaxFaceValue, 10)
if mfv == nil {
panic(fmt.Errorf("-maxFaceValue must be a valid integer, but %v provided. Restart the node with a different valid value for -maxFaceValue", *cfg.MaxFaceValue))
-	return
} else {
n.SetMaxFaceValue(mfv)
}
@@ -981,8 +979,8 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if maxPricePerUnit.Sign() > 0 {
pricePerPixel := new(big.Rat).Quo(maxPricePerUnit, pixelsPerUnit)
autoPrice, err := core.NewAutoConvertedPrice(currency, pricePerPixel, func(price *big.Rat) {
-	if monitor.Enabled {
-		monitor.MaxTranscodingPrice(price)
+	if lpmon.Enabled {
+		lpmon.MaxTranscodingPrice(price)
}
glog.Infof("Maximum transcoding price: %v wei per pixel\n ", price.FloatString(3))
})
@@ -1024,8 +1022,8 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
capName := core.CapabilityNameLookup[cap]
modelID := p.ModelID
autoCapPrice, err := core.NewAutoConvertedPrice(p.Currency, maxCapabilityPrice, func(price *big.Rat) {
-	if monitor.Enabled {
-		monitor.MaxPriceForCapability(capName, modelID, price)
+	if lpmon.Enabled {
+		lpmon.MaxPriceForCapability(capName, modelID, price)
}
glog.Infof("Maximum price per unit set to %v wei for capability=%v model_id=%v", price.FloatString(3), p.Pipeline, p.ModelID)
})
@@ -1587,7 +1585,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}

if n.NodeType == core.AIWorkerNode {
-	go server.RunAIWorker(n, orchURLs[0].Host, core.MaxSessions, n.Capabilities.ToNetCapabilities())
+	go server.RunAIWorker(n, orchURLs[0].Host, n.Capabilities.ToNetCapabilities())
}
}

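Note: the starter.go changes above drop the unaliased "github.com/livepeer/go-livepeer/monitor" import in favor of the existing lpmon alias (importing the same package twice is a common staticcheck complaint), switch the affected call sites to lpmon, and delete a return statement that could never execute after the preceding panic. Below is a minimal, self-contained sketch of the unreachable-return part only; the names are illustrative stand-ins, not the actual go-livepeer code.

package main

import (
	"fmt"
	"math/big"
)

// parseMaxFaceValue mirrors the shape of the starter.go check: parse a flag
// value and panic on bad input. Any statement placed after the panic inside
// the failure branch would be unreachable, which is what the linter flagged.
func parseMaxFaceValue(raw string) *big.Int {
	mfv, ok := new(big.Int).SetString(raw, 10)
	if !ok {
		panic(fmt.Errorf("-maxFaceValue must be a valid integer, but %v provided", raw))
	}
	return mfv
}

func main() {
	fmt.Println(parseMaxFaceValue("1000000000000"))
}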
2 changes: 1 addition & 1 deletion core/ai_test.go
@@ -113,7 +113,7 @@ func TestRemoteAIWorkerManager(t *testing.T) {

// error on remote
strm.JobError = fmt.Errorf("JobError")
-	res, err = m.Process(context.TODO(), "request_id2", "text-to-image", "livepeer/model1", "", AIJobRequestData{Request: req})
+	_, err = m.Process(context.TODO(), "request_id2", "text-to-image", "livepeer/model1", "", AIJobRequestData{Request: req})
assert.NotNil(t, err)
strm.JobError = nil

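Note: in the test above, the first return value of m.Process was assigned to res but never read again, which staticcheck-style linters report as an ineffectual assignment; binding it to the blank identifier keeps only the error check. The discovery/discovery_test.go hunk further down makes the same kind of change for an error value that was never checked. A small self-contained sketch of the pattern, with stand-in names rather than the go-livepeer test harness:

package main

import (
	"errors"
	"fmt"
)

// process stands in for a call whose first result is sometimes irrelevant.
func process(fail bool) (string, error) {
	if fail {
		return "", errors.New("JobError")
	}
	return "ok", nil
}

func main() {
	// Only the error is inspected here, so the first result is discarded
	// with the blank identifier instead of a variable that is never read.
	_, err := process(true)
	fmt.Println("error returned:", err != nil)
}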
2 changes: 1 addition & 1 deletion core/ai_worker.go
@@ -471,7 +471,7 @@ func (n *LivepeerNode) saveRemoteAIWorkerResults(ctx context.Context, results *R
// other pipelines do not require saving data since they are text responses
imgResp, isImg := results.Results.(worker.ImageResponse)
if isImg {
-	for idx, _ := range imgResp.Images {
+	for idx := range imgResp.Images {
fileName := imgResp.Images[idx].Url
// save the file data to node and provide url for download
storage, exists := n.StorageConfigs[requestID]
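Note: ranging with an explicit blank identifier (for idx, _ := range ...) compiles, but the second variable adds nothing; gofmt -s and gosimple's S1005 check (assuming those tools back this lint run) simplify it to the single-variable form used above. The core/capabilities_test.go hunk below gets the same treatment. A tiny standalone example:

package main

import "fmt"

func main() {
	urls := []string{"image-0.png", "image-1.png"}

	// Equivalent to "for idx, _ := range urls", minus the redundant blank
	// identifier that the linter flags.
	for idx := range urls {
		fmt.Println(idx, urls[idx])
	}
}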
2 changes: 1 addition & 1 deletion core/capabilities_test.go
@@ -502,7 +502,7 @@ func TestCapability_ProfileToCapability(t *testing.T) {
// iterate through lpms-defined profiles to ensure all are accounted for
// need to put into a slice and sort to ensure consistent ordering
profs := []int{}
-	for k, _ := range ffmpeg.ProfileParameters {
+	for k := range ffmpeg.ProfileParameters {
profs = append(profs, int(k))
}
sort.Ints(profs)
33 changes: 16 additions & 17 deletions core/orchestrator.go
@@ -23,7 +23,6 @@ import (
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/eth"
"github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-tools/drivers"
@@ -183,8 +182,8 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen
if err != nil {
clog.Errorf(ctx, "Error receiving ticket sessionID=%v recipientRandHash=%x senderNonce=%v: %v", manifestID, ticket.RecipientRandHash, ticket.SenderNonce, err)

-	if monitor.Enabled {
-		monitor.PaymentRecvError(ctx, sender.Hex(), err.Error())
+	if lpmon.Enabled {
+		lpmon.PaymentRecvError(ctx, sender.Hex(), err.Error())
}
if _, ok := err.(*pm.FatalReceiveErr); ok {
return err
@@ -217,10 +216,10 @@ func (orch *orchestrator) ProcessPayment(ctx context.Context, payment net.Paymen

clog.V(common.DEBUG).Infof(ctx, "Payment tickets processed sessionID=%v faceValue=%v winProb=%v ev=%v", manifestID, eth.FormatUnits(totalFaceValue, "ETH"), totalWinProb.FloatString(10), totalEV.FloatString(2))

-	if monitor.Enabled {
-		monitor.TicketValueRecv(ctx, sender.Hex(), totalEV)
-		monitor.TicketsRecv(ctx, sender.Hex(), totalTickets)
-		monitor.WinningTicketsRecv(ctx, sender.Hex(), totalWinningTickets)
+	if lpmon.Enabled {
+		lpmon.TicketValueRecv(ctx, sender.Hex(), totalEV)
+		lpmon.TicketsRecv(ctx, sender.Hex(), totalTickets)
+		lpmon.WinningTicketsRecv(ctx, sender.Hex(), totalWinningTickets)
}

if receiveErr != nil {
@@ -269,8 +268,8 @@ func (orch *orchestrator) PriceInfo(sender ethcommon.Address, manifestID Manifes
return nil, err
}

-	if monitor.Enabled {
-		monitor.TranscodingPrice(sender.String(), price)
+	if lpmon.Enabled {
+		lpmon.TranscodingPrice(sender.String(), price)
}

return &net.PriceInfo{
@@ -671,8 +670,8 @@ func (n *LivepeerNode) transcodeSeg(ctx context.Context, config transcodeConfig,

took := time.Since(start)
clog.V(common.DEBUG).Infof(ctx, "Transcoding of segment took=%v", took)
-	if monitor.Enabled {
-		monitor.SegmentTranscoded(ctx, 0, seg.SeqNo, md.Duration, took, common.ProfilesNames(md.Profiles), true, true)
+	if lpmon.Enabled {
+		lpmon.SegmentTranscoded(ctx, 0, seg.SeqNo, md.Duration, took, common.ProfilesNames(md.Profiles), true, true)
}

// Prepare the result object
@@ -1003,25 +1002,25 @@ func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTransco
rtm.remoteTranscoders = append(rtm.remoteTranscoders, transcoder)
sort.Sort(byLoadFactor(rtm.remoteTranscoders))
var totalLoad, totalCapacity, liveTranscodersNum int
-	if monitor.Enabled {
+	if lpmon.Enabled {
totalLoad, totalCapacity, liveTranscodersNum = rtm.totalLoadAndCapacity()
}
rtm.RTmutex.Unlock()
-	if monitor.Enabled {
-		monitor.SetTranscodersNumberAndLoad(totalLoad, totalCapacity, liveTranscodersNum)
+	if lpmon.Enabled {
+		lpmon.SetTranscodersNumberAndLoad(totalLoad, totalCapacity, liveTranscodersNum)
}

<-transcoder.eof
glog.Infof("Got transcoder=%s eof, removing from live transcoders map", from)

rtm.RTmutex.Lock()
delete(rtm.liveTranscoders, transcoder.stream)
-	if monitor.Enabled {
+	if lpmon.Enabled {
totalLoad, totalCapacity, liveTranscodersNum = rtm.totalLoadAndCapacity()
}
rtm.RTmutex.Unlock()
-	if monitor.Enabled {
-		monitor.SetTranscodersNumberAndLoad(totalLoad, totalCapacity, liveTranscodersNum)
+	if lpmon.Enabled {
+		lpmon.SetTranscodersNumberAndLoad(totalLoad, totalCapacity, liveTranscodersNum)
}
}

2 changes: 1 addition & 1 deletion discovery/discovery_test.go
@@ -346,7 +346,7 @@ func TestNewDBOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t
pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 500*time.Millisecond)
require.NoError(err)
assert.Equal(pool.Size(), 3)
-	orchs, err := pool.GetOrchestrators(context.TODO(), pool.Size(), newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0))
+	orchs, _ := pool.GetOrchestrators(context.TODO(), pool.Size(), newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0))
for _, o := range orchs {
assert.Equal(o.RemoteInfo.PriceInfo, expPriceInfo)
assert.Equal(o.RemoteInfo.Transcoder, expTranscoder)
