diff --git a/docker/devnet/boost/entrypoint.sh b/docker/devnet/boost/entrypoint.sh index beb8c02f9..7f0931339 100755 --- a/docker/devnet/boost/entrypoint.sh +++ b/docker/devnet/boost/entrypoint.sh @@ -111,4 +111,4 @@ fi echo Starting LID service and boost in dev mode... trap 'kill %1' SIGINT -exec boostd -vv run --nosync=true +exec boostd -vv run --nosync=true &>> $BOOST_PATH/boostd.log diff --git a/storagemarket/provider.go b/storagemarket/provider.go index 70c5625f1..301b4616f 100644 --- a/storagemarket/provider.go +++ b/storagemarket/provider.go @@ -96,6 +96,7 @@ type Provider struct { publishedDealChan chan publishDealReq updateRetryStateChan chan updateRetryStateReq storageSpaceChan chan storageSpaceDealReq + processedDealChan chan processedDealReq // Sealing Pipeline API sps sealingpipeline.API @@ -175,6 +176,7 @@ func NewProvider(cfg Config, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundma publishedDealChan: make(chan publishDealReq), updateRetryStateChan: make(chan updateRetryStateReq), storageSpaceChan: make(chan storageSpaceDealReq), + processedDealChan: make(chan processedDealReq), Transport: tspt, xferLimiter: xferLimiter, diff --git a/storagemarket/provider_loop.go b/storagemarket/provider_loop.go index 58bba9017..eb0432b45 100644 --- a/storagemarket/provider_loop.go +++ b/storagemarket/provider_loop.go @@ -49,6 +49,12 @@ type updateRetryStateReq struct { done chan error } +type processedDealReq struct { + err *acceptError + rsp chan acceptDealResp + deal *types.ProviderDealState +} + func (p *Provider) logFunds(id uuid.UUID, trsp *fundmanager.TagFundsResp) { p.dealLogger.Infow(id, "tagged funds for deal", "tagged for deal publish", trsp.PublishMessage, @@ -98,7 +104,7 @@ func (p *Provider) runDealFilters(deal *types.ProviderDealState) *acceptError { return nil } -func (p *Provider) processDealProposal(deal *types.ProviderDealState) *acceptError { +func (p *Provider) processOnlineDealProposal(deal *types.ProviderDealState) *acceptError { host, err := deal.Transfer.Host() if err != nil { return &acceptError{ @@ -254,6 +260,26 @@ func (p *Provider) processOfflineDealProposal(ds *smtypes.ProviderDealState, dh return nil } +func (p *Provider) processDeal(deal *types.ProviderDealState, rsp chan acceptDealResp) { + var err *acceptError + if !deal.IsOffline { + err = p.processOnlineDealProposal(deal) + } else { + dh, herr := p.mkAndInsertDealHandler(deal.DealUuid) + if herr == nil { + err = p.processOfflineDealProposal(deal, dh) + } else { + err.error = herr + err.isSevereError = true + err.reason = "server error: creating deal thread" + } + } + select { + case p.processedDealChan <- processedDealReq{deal: deal, err: err, rsp: rsp}: + case <-p.ctx.Done(): + } +} + func (p *Provider) processImportOfflineDealData(deal *types.ProviderDealState) *acceptError { cleanup := func() { collat, pub, errf := p.fundManager.UntagFunds(p.ctx, deal.DealUuid) @@ -367,78 +393,28 @@ func (p *Provider) run() { deal := dealReq.deal p.dealLogger.Infow(deal.DealUuid, "processing deal acceptance request") - sendErrorResp := func(aerr *acceptError) { - // If the error is a severe error (eg can't connect to database) - if aerr.isSevereError { - // Send a rejection message to the client with a reason for rejection - resp := acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}} - // Log an error with more details for the provider - p.dealLogger.LogError(deal.DealUuid, "error while processing deal acceptance request", aerr) - dealReq.rsp <- resp - return - } - - // The error is not a severe error, so don't log an error, just - // send a message to the client with a rejection reason - p.dealLogger.Infow(deal.DealUuid, "deal acceptance request rejected", "reason", aerr.reason, "error", aerr.error) - dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}, err: nil} - } - - if deal.IsOffline && !dealReq.isImport { - // When the client proposes an offline deal, save the deal - // to the database but don't execute the deal. The deal - // will be executed when the Storage Provider imports the - // deal data. - dh, err := p.mkAndInsertDealHandler(deal.DealUuid) - if err != nil { - sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: creating deal handler"}) - continue - } - - aerr := p.processOfflineDealProposal(dealReq.deal, dh) + if deal.IsOffline && dealReq.isImport { + // The Storage Provider is importing offline deal data, so tag + // funds for the deal and execute it + aerr := p.processImportOfflineDealData(dealReq.deal) if aerr != nil { - dh.close() - p.delDealHandler(deal.DealUuid) - sendErrorResp(aerr) + p.sendErrorResp(aerr, dealReq.rsp, deal.DealUuid) continue } - // The deal proposal was successful. Send an Accept response to the client. - dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}} - // Don't execute the deal now, wait for data import. - continue - } - - var aerr *acceptError - if deal.IsOffline { - // The Storage Provider is importing offline deal data, so tag - // funds for the deal and execute it - aerr = p.processImportOfflineDealData(dealReq.deal) - } else { - // Process a regular deal proposal - aerr = p.processDealProposal(dealReq.deal) - } - if aerr != nil { - sendErrorResp(aerr) - continue - } + p.setupHandlerAndStartDeal(deal, dealReq.rsp) - // set up deal handler so that clients can subscribe to deal update events - dh, err := p.mkAndInsertDealHandler(deal.DealUuid) - if err != nil { - sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"}) - continue - } - - // start executing the deal - _, err = p.startDealThread(dh, deal) - if err != nil { - sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"}) + // send an accept response + dealReq.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}} continue } - // send an accept response - dealReq.rsp <- acceptDealResp{&api.ProviderDealRejectionInfo{Accepted: true}, nil} + // Send new online and offline deals for processing. + // When the client proposes an offline deal, save the deal + // to the database but don't execute the deal. The deal + // will be executed when the Storage Provider imports the + // deal data. + go p.processDeal(deal, dealReq.rsp) case storageSpaceDealReq := <-p.storageSpaceChan: deal := storageSpaceDealReq.deal @@ -519,6 +495,39 @@ func (p *Provider) run() { } finishedDeal.done <- struct{}{} + case processedDeal := <-p.processedDealChan: + deal := processedDeal.deal + if processedDeal.err != nil { + // Reject offline deal + if deal.IsOffline { + dh, err := p.mkAndInsertDealHandler(deal.DealUuid) + if err != nil { + p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: getting deal thread"}, processedDeal.rsp, deal.DealUuid) + continue + } + dh.close() + p.delDealHandler(deal.DealUuid) + p.sendErrorResp(processedDeal.err, processedDeal.rsp, deal.DealUuid) + continue + } + // Reject online deal + p.sendErrorResp(processedDeal.err, processedDeal.rsp, deal.DealUuid) + continue + } + + // Accept offline deal + if deal.IsOffline { + // The deal proposal was successful. Send an Accept response to the client. + processedDeal.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}} + // Don't execute the deal now, wait for data import. + continue + } + + p.setupHandlerAndStartDeal(deal, processedDeal.rsp) + + // send an accept response + processedDeal.rsp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: true}} + case <-p.ctx.Done(): return } @@ -576,3 +585,36 @@ func (p *Provider) failPausedDeal(dh *dealHandler, deal *smtypes.ProviderDealSta return nil } + +func (p *Provider) sendErrorResp(aerr *acceptError, resp chan acceptDealResp, dealId uuid.UUID) { + // If the error is a severe error (eg can't connect to database) + if aerr.isSevereError { + // Send a rejection message to the client with a reason for rejection + rsp := acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}} + // Log an error with more details for the provider + p.dealLogger.LogError(dealId, "error while processing deal acceptance request", aerr) + resp <- rsp + return + } + + // The error is not a severe error, so don't log an error, just + // send a message to the client with a rejection reason + p.dealLogger.Infow(dealId, "deal acceptance request rejected", "reason", aerr.reason, "error", aerr.error) + resp <- acceptDealResp{ri: &api.ProviderDealRejectionInfo{Accepted: false, Reason: aerr.reason}, err: nil} +} + +func (p *Provider) setupHandlerAndStartDeal(deal *types.ProviderDealState, rsp chan acceptDealResp) { + // Handle online accepted deal + // set up deal handler so that clients can subscribe to deal update events + dh, err := p.mkAndInsertDealHandler(deal.DealUuid) + if err != nil { + p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: setting up deal handler"}, rsp, deal.DealUuid) + return + } + + // start executing the deal + _, err = p.startDealThread(dh, deal) + if err != nil { + p.sendErrorResp(&acceptError{error: err, isSevereError: true, reason: "server error: starting deal thread"}, rsp, deal.DealUuid) + } +}