Skip to content

Commit

Permalink
fix: accounting module to reduce lock. (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
LewisAufs authored Mar 26, 2022
1 parent dbe8d4d commit 6cf2f14
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 25 deletions.
43 changes: 19 additions & 24 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,15 @@ func (a *Accounting) Reserve(peer boson.Address, traffic uint64) (err error) {
if err != nil {
return err
}

accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
retrieve := accountingPeer.unPaidTraffic

if retrieve.Cmp(big.NewInt(0)) == 0 {
retrieve, err = a.settlement.RetrieveTraffic(peer)
if err != nil {
a.logger.Errorf("retrieve %s", retrieve)
return err
}
}
ret := big.NewInt(0).Add(retrieve, new(big.Int).SetUint64(traffic))
available, err := a.settlement.AvailableBalance()
if err != nil {
return err
}

if available.Cmp(ret) < 0 {
return ErrLowAvailableExceeded
}
a.accountingPeersMu.Lock()
accountingPeer.unPaidTraffic = big.NewInt(0).Add(retrieve, new(big.Int).SetUint64(traffic))
a.accountingPeersMu.Unlock()

return nil
}
Expand All @@ -123,6 +108,7 @@ func (a *Accounting) Credit(ctx context.Context, peer boson.Address, traffic uin
}
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
accountingPeer.unPaidTraffic = big.NewInt(0).Add(accountingPeer.unPaidTraffic, new(big.Int).SetUint64(traffic))
if err := a.settlement.PutRetrieveTraffic(peer, new(big.Int).SetUint64(traffic)); err != nil {
a.logger.Errorf("failed to modify retrieve traffic")
return err
Expand Down Expand Up @@ -152,13 +138,7 @@ func (a *Accounting) settle() {
// Debit increases the amount of debt we have with the given peer (and decreases
// existing credit).
func (a *Accounting) Debit(peer boson.Address, traffic uint64) error {
accountingPeer, err := a.getAccountingPeer(peer)
if err != nil {
return err
}

accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
tolerance := a.paymentTolerance
traff, err := a.settlement.TransferTraffic(peer)
if err != nil {
Expand Down Expand Up @@ -195,9 +175,13 @@ func (a *Accounting) getAccountingPeer(peer boson.Address) (*accountingPeer, err
defer a.accountingPeersMu.Unlock()
peerData, ok := a.accountingPeers[peer.String()]
if !ok {
retrieve, err := a.settlement.RetrieveTraffic(peer)
if err != nil {
return nil, err
}
peerData = &accountingPeer{
paymentThreshold: a.paymentThreshold,
unPaidTraffic: big.NewInt(0),
unPaidTraffic: retrieve,
}
a.accountingPeers[peer.String()] = peerData
}
Expand All @@ -210,8 +194,8 @@ func (a *Accounting) NotifyPayment(peer boson.Address, traffic *big.Int) error {
return err
}

a.accountingPeersMu.Lock()
defer a.accountingPeersMu.Unlock()
accountingPeer.lock.Lock()
defer accountingPeer.lock.Unlock()
unPay := accountingPeer.unPaidTraffic
if unPay.Cmp(big.NewInt(0)) <= 0 {
return nil
Expand All @@ -222,4 +206,15 @@ func (a *Accounting) NotifyPayment(peer boson.Address, traffic *big.Int) error {
accountingPeer.unPaidTraffic = new(big.Int).Sub(unPay, traffic)
}
return nil

}
func (a *Accounting) AsyncNotifyPayment(peer boson.Address, traffic *big.Int) error {

go func() {
err := a.NotifyPayment(peer, traffic)
if err != nil {
a.logger.Errorf("failed to notify accounting of payment: %v", err)
}
}()
return nil
}
2 changes: 1 addition & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func NewAurora(nodeMode aurora.Model, addr string, bosonAddress boson.Address, p
stateStore,
settlement,
)
settlement.SetNotifyPaymentFunc(acc.NotifyPayment)
settlement.SetNotifyPaymentFunc(acc.AsyncNotifyPayment)

metricsDB, err := shed.NewDBWrap(stateStore.DB())
if err != nil {
Expand Down

0 comments on commit 6cf2f14

Please sign in to comment.