Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: run deal filters in parallel #1746

Merged
merged 6 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker/devnet/boost/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,4 @@ fi

echo Starting LID service and boost in dev mode...
trap 'kill %1' SIGINT
exec boostd -vv run --nosync=true
exec boostd -vv run --nosync=true &>> $BOOST_PATH/boostd.log
2 changes: 2 additions & 0 deletions storagemarket/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Provider struct {
publishedDealChan chan publishDealReq
updateRetryStateChan chan updateRetryStateReq
storageSpaceChan chan storageSpaceDealReq
processedDealChan chan processedDealReq

// Sealing Pipeline API
sps sealingpipeline.API
Expand Down Expand Up @@ -175,6 +176,7 @@ func NewProvider(cfg Config, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundma
publishedDealChan: make(chan publishDealReq),
updateRetryStateChan: make(chan updateRetryStateReq),
storageSpaceChan: make(chan storageSpaceDealReq),
processedDealChan: make(chan processedDealReq),

Transport: tspt,
xferLimiter: xferLimiter,
Expand Down
172 changes: 107 additions & 65 deletions storagemarket/provider_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ type updateRetryStateReq struct {
done chan error
}

type processedDealReq struct {
err *acceptError
rsp chan acceptDealResp
deal *types.ProviderDealState
}

func (p *Provider) logFunds(id uuid.UUID, trsp *fundmanager.TagFundsResp) {
p.dealLogger.Infow(id, "tagged funds for deal",
"tagged for deal publish", trsp.PublishMessage,
Expand Down Expand Up @@ -98,7 +104,7 @@ func (p *Provider) runDealFilters(deal *types.ProviderDealState) *acceptError {
return nil
}

func (p *Provider) processDealProposal(deal *types.ProviderDealState) *acceptError {
func (p *Provider) processOnlineDealProposal(deal *types.ProviderDealState) *acceptError {
host, err := deal.Transfer.Host()
if err != nil {
return &acceptError{
Expand Down Expand Up @@ -254,6 +260,26 @@ func (p *Provider) processOfflineDealProposal(ds *smtypes.ProviderDealState, dh
return nil
}

func (p *Provider) processDeal(deal *types.ProviderDealState, rsp chan acceptDealResp) {
var err *acceptError
if !deal.IsOffline {
err = p.processOnlineDealProposal(deal)
} else {
dh, herr := p.mkAndInsertDealHandler(deal.DealUuid)
if herr == nil {
err = p.processOfflineDealProposal(deal, dh)
} else {
err.error = herr
err.isSevereError = true
err.reason = "server error: creating deal thread"
}
}
select {
case p.processedDealChan <- processedDealReq{deal: deal, err: err, rsp: rsp}:
case <-p.ctx.Done():
}
}

func (p *Provider) processImportOfflineDealData(deal *types.ProviderDealState) *acceptError {
cleanup := func() {
collat, pub, errf := p.fundManager.UntagFunds(p.ctx, deal.DealUuid)
Expand Down Expand Up @@ -367,78 +393,28 @@ func (p *Provider) run() {
deal := dealReq.deal
p.dealLogger.Infow(deal.DealUuid, "processing deal acceptance request")

sendErrorResp := func(aerr *acceptError) {
// If the error is a severe error (eg can't connect to database)
if aerr.isSevereError {
// Send a rejection message to the client with a reason for rejection
resp := acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}}
// Log an error with more details for the provider
p.dealLogger.LogError(deal.DealUuid, "error while processing deal acceptance request", aerr)
dealReq.rsp <- resp
return
}

// The error is not a severe error, so don't log an error, just
// send a message to the client with a rejection reason
p.dealLogger.Infow(deal.DealUuid, "deal acceptance request rejected", "reason", aerr.reason, "error", aerr.error)
dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}, err: nil}
}

if deal.IsOffline && !dealReq.isImport {
// When the client proposes an offline deal, save the deal
// to the database but don't execute the deal. The deal
// will be executed when the Storage Provider imports the
// deal data.
dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
if err != nil {
sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: creating deal handler"})
continue
}

aerr := p.processOfflineDealProposal(dealReq.deal, dh)
if deal.IsOffline && dealReq.isImport {
// The Storage Provider is importing offline deal data, so tag
// funds for the deal and execute it
aerr := p.processImportOfflineDealData(dealReq.deal)
if aerr != nil {
dh.close()
p.delDealHandler(deal.DealUuid)
sendErrorResp(aerr)
p.sendErrorResp(aerr, dealReq.rsp, deal.DealUuid)
continue
}

// The deal proposal was successful. Send an Accept response to the client.
dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}
// Don't execute the deal now, wait for data import.
continue
}

var aerr *acceptError
if deal.IsOffline {
// The Storage Provider is importing offline deal data, so tag
// funds for the deal and execute it
aerr = p.processImportOfflineDealData(dealReq.deal)
} else {
// Process a regular deal proposal
aerr = p.processDealProposal(dealReq.deal)
}
if aerr != nil {
sendErrorResp(aerr)
continue
}
p.setupHandlerAndStartDeal(deal, dealReq.rsp)

// set up deal handler so that clients can subscribe to deal update events
dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
if err != nil {
sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"})
continue
}

// start executing the deal
_, err = p.startDealThread(dh, deal)
if err != nil {
sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"})
// send an accept response
dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}
continue
}

// send an accept response
dealReq.rsp <- acceptDealResp{&api.ProviderDealRejectionInfo{Accepted: true}, nil}
// Send new online and offline deals for processing.
// When the client proposes an offline deal, save the deal
// to the database but don't execute the deal. The deal
// will be executed when the Storage Provider imports the
// deal data.
go p.processDeal(deal, dealReq.rsp)

case storageSpaceDealReq := <-p.storageSpaceChan:
deal := storageSpaceDealReq.deal
Expand Down Expand Up @@ -519,6 +495,39 @@ func (p *Provider) run() {
}
finishedDeal.done <- struct{}{}

case processedDeal := <-p.processedDealChan:
deal := processedDeal.deal
if processedDeal.err != nil {
// Reject offline deal
if deal.IsOffline {
dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
if err != nil {
p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: getting deal thread"}, processedDeal.rsp, deal.DealUuid)
continue
}
dh.close()
p.delDealHandler(deal.DealUuid)
p.sendErrorResp(processedDeal.err, processedDeal.rsp, deal.DealUuid)
continue
}
// Reject online deal
p.sendErrorResp(processedDeal.err, processedDeal.rsp, deal.DealUuid)
continue
}

// Accept offline deal
if deal.IsOffline {
// The deal proposal was successful. Send an Accept response to the client.
processedDeal.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}
// Don't execute the deal now, wait for data import.
continue
}

p.setupHandlerAndStartDeal(deal, processedDeal.rsp)

// send an accept response
processedDeal.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}}

case <-p.ctx.Done():
return
}
Expand Down Expand Up @@ -576,3 +585,36 @@ func (p *Provider) failPausedDeal(dh *dealHandler, deal *smtypes.ProviderDealSta

return nil
}

func (p *Provider) sendErrorResp(aerr *acceptError, resp chan acceptDealResp, dealId uuid.UUID) {
// If the error is a severe error (eg can't connect to database)
if aerr.isSevereError {
// Send a rejection message to the client with a reason for rejection
rsp := acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}}
// Log an error with more details for the provider
p.dealLogger.LogError(dealId, "error while processing deal acceptance request", aerr)
resp <- rsp
return
}

// The error is not a severe error, so don't log an error, just
// send a message to the client with a rejection reason
p.dealLogger.Infow(dealId, "deal acceptance request rejected", "reason", aerr.reason, "error", aerr.error)
resp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}, err: nil}
}

func (p *Provider) setupHandlerAndStartDeal(deal *types.ProviderDealState, rsp chan acceptDealResp) {
// Handle online accepted deal
// set up deal handler so that clients can subscribe to deal update events
dh, err := p.mkAndInsertDealHandler(deal.DealUuid)
if err != nil {
p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: setting up deal handler"}, rsp, deal.DealUuid)
return
}

// start executing the deal
_, err = p.startDealThread(dh, deal)
if err != nil {
p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"}, rsp, deal.DealUuid)
}
}