Skip to content

Commit

Permalink
feat: run deal filters in parallel (#1746)
Browse files Browse the repository at this point in the history
* run deal filters in parallel

* remove redundant return

* refactor to reduce duplicate code, apply suggestions

* cleanup and fix error string

* clean up

* add missing return
  • Loading branch information
LexLuthr authored Oct 18, 2023
1 parent ca4f6b5 commit 24d7303
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 66 deletions.
2 changes: 1 addition & 1 deletion docker/devnet/boost/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,4 @@ fi

echo Starting LID service and boost in dev mode...
trap 'kill %1' SIGINT
exec boostd -vv run --nosync=true
exec boostd -vv run --nosync=true &>> $BOOST_PATH/boostd.log
2 changes: 2 additions & 0 deletions storagemarket/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Provider struct {
publishedDealChan chan publishDealReq
updateRetryStateChan chan updateRetryStateReq
storageSpaceChan chan storageSpaceDealReq
processedDealChan chan processedDealReq

// Sealing Pipeline API
sps sealingpipeline.API
Expand Down Expand Up @@ -175,6 +176,7 @@ func NewProvider(cfg Config, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundma
publishedDealChan: make(chan publishDealReq),
updateRetryStateChan: make(chan updateRetryStateReq),
storageSpaceChan: make(chan storageSpaceDealReq),
processedDealChan: make(chan processedDealReq),

Transport: tspt,
xferLimiter: xferLimiter,
Expand Down
172 changes: 107 additions & 65 deletions storagemarket/provider_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ type updateRetryStateReq struct {
done chan error
}

type processedDealReq struct {
err *acceptError
rsp chan acceptDealResp
deal *types.ProviderDealState
}

func (p *Provider) logFunds(id uuid.UUID, trsp *fundmanager.TagFundsResp) {
p.dealLogger.Infow(id, "tagged funds for deal",
"tagged for deal publish", trsp.PublishMessage,
Expand Down Expand Up @@ -98,7 +104,7 @@ func (p *Provider) runDealFilters(deal *types.ProviderDealState) *acceptError {
return nil
}

func (p *Provider) processDealProposal(deal *types.ProviderDealState) *acceptError {
func (p *Provider) processOnlineDealProposal(deal *types.ProviderDealState) *acceptError {
host, err := deal.Transfer.Host()
if err != nil {
return &acceptError{
Expand Down Expand Up @@ -254,6 +260,26 @@ func (p *Provider) processOfflineDealProposal(ds *smtypes.ProviderDealState, dh
return nil
}

func (p *Provider) processDeal(deal *types.ProviderDealState, rsp chan acceptDealResp) {
var err *acceptError
if !deal.IsOffline {
err = p.processOnlineDealProposal(deal)
} else {
dh, herr := p.mkAndInsertDealHandler(deal.DealUuid)
if herr == nil {
err = p.processOfflineDealProposal(deal, dh)
} else {
err.error = herr
err.isSevereError = true
err.reason = "server error: creating deal thread"
}
}
select {
case p.processedDealChan <- processedDealReq{deal: deal, err: err, rsp: rsp}:
case <-p.ctx.Done():
}
}

func (p *Provider) processImportOfflineDealData(deal *types.ProviderDealState) *acceptError {
cleanup := func() {
collat, pub, errf := p.fundManager.UntagFunds(p.ctx, deal.DealUuid)
Expand Down Expand Up @@ -367,78 +393,28 @@ func (p *Provider) run() {
deal := dealReq.deal
p.dealLogger.Infow(deal.DealUuid, "processing deal acceptance request")

sendErrorResp := func(aerr *acceptError) {
// If the error is a severe error (eg can't connect to database)
if aerr.isSevereError {
// Send a rejection message to the client with a reason for rejection
resp := acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}}
// Log an error with more details for the provider
p.dealLogger.LogError(deal.DealUuid, "error while processing deal acceptance request", aerr)
dealReq.rsp <- resp
return
}

// The error is not a severe error, so don't log an error, just
// send a message to the client with a rejection reason
p.dealLogger.Infow(deal.DealUuid, "deal acceptance request rejected", "reason", aerr.reason, "error", aerr.error)
dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}, err: nil}
}

if deal.IsOffline && !dealReq.isImport {
// When the client proposes an offline deal, save the deal
// to the database but don't execute the deal. The deal
// will be executed when the Storage Provider imports the
// deal data.
dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
if err != nil {
sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: creating deal handler"})
continue
}

aerr := p.processOfflineDealProposal(dealReq.deal, dh)
if deal.IsOffline && dealReq.isImport {
// The Storage Provider is importing offline deal data, so tag
// funds for the deal and execute it
aerr := p.processImportOfflineDealData(dealReq.deal)
if aerr != nil {
dh.close()
p.delDealHandler(deal.DealUuid)
sendErrorResp(aerr)
p.sendErrorResp(aerr, dealReq.rsp, deal.DealUuid)
continue
}

// The deal proposal was successful. Send an Accept response to the client.
dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}
// Don't execute the deal now, wait for data import.
continue
}

var aerr *acceptError
if deal.IsOffline {
// The Storage Provider is importing offline deal data, so tag
// funds for the deal and execute it
aerr = p.processImportOfflineDealData(dealReq.deal)
} else {
// Process a regular deal proposal
aerr = p.processDealProposal(dealReq.deal)
}
if aerr != nil {
sendErrorResp(aerr)
continue
}
p.setupHandlerAndStartDeal(deal, dealReq.rsp)

// set up deal handler so that clients can subscribe to deal update events
dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
if err != nil {
sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"})
continue
}

// start executing the deal
_, err = p.startDealThread(dh, deal)
if err != nil {
sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"})
// send an accept response
dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}
continue
}

// send an accept response
dealReq.rsp <- acceptDealResp{&api.ProviderDealRejectionInfo{Accepted: true}, nil}
// Send new online and offline deals for processing.
// When the client proposes an offline deal, save the deal
// to the database but don't execute the deal. The deal
// will be executed when the Storage Provider imports the
// deal data.
go p.processDeal(deal, dealReq.rsp)

case storageSpaceDealReq := <-p.storageSpaceChan:
deal := storageSpaceDealReq.deal
Expand Down Expand Up @@ -519,6 +495,39 @@ func (p *Provider) run() {
}
finishedDeal.done <- struct{}{}

case processedDeal := <-p.processedDealChan:
deal := processedDeal.deal
if processedDeal.err != nil {
// Reject offline deal
if deal.IsOffline {
dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
if err != nil {
p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: getting deal thread"}, processedDeal.rsp, deal.DealUuid)
continue
}
dh.close()
p.delDealHandler(deal.DealUuid)
p.sendErrorResp(processedDeal.err, processedDeal.rsp, deal.DealUuid)
continue
}
// Reject online deal
p.sendErrorResp(processedDeal.err, processedDeal.rsp, deal.DealUuid)
continue
}

// Accept offline deal
if deal.IsOffline {
// The deal proposal was successful. Send an Accept response to the client.
processedDeal.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}
// Don't execute the deal now, wait for data import.
continue
}

p.setupHandlerAndStartDeal(deal, processedDeal.rsp)

// send an accept response
processedDeal.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}

case <-p.ctx.Done():
return
}
Expand Down Expand Up @@ -576,3 +585,36 @@ func (p *Provider) failPausedDeal(dh *dealHandler, deal *smtypes.ProviderDealSta

return nil
}

func (p *Provider) sendErrorResp(aerr *acceptError, resp chan acceptDealResp, dealId uuid.UUID) {
// If the error is a severe error (eg can't connect to database)
if aerr.isSevereError {
// Send a rejection message to the client with a reason for rejection
rsp := acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}}
// Log an error with more details for the provider
p.dealLogger.LogError(dealId, "error while processing deal acceptance request", aerr)
resp <- rsp
return
}

// The error is not a severe error, so don't log an error, just
// send a message to the client with a rejection reason
p.dealLogger.Infow(dealId, "deal acceptance request rejected", "reason", aerr.reason, "error", aerr.error)
resp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}, err: nil}
}

func (p *Provider) setupHandlerAndStartDeal(deal *types.ProviderDealState, rsp chan acceptDealResp) {
// Handle online accepted deal
// set up deal handler so that clients can subscribe to deal update events
dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
if err != nil {
p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: setting up deal handler"}, rsp, deal.DealUuid)
return
}

// start executing the deal
_, err = p.startDealThread(dh, deal)
if err != nil {
p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"}, rsp, deal.DealUuid)
}
}

0 comments on commit 24d7303

Please sign in to comment.