Skip to content

Commit

Permalink
GODRIVER-2962 Remove setters from AbortTransaction and CommitTransact…
Browse files Browse the repository at this point in the history
…ion.
  • Loading branch information
matthewdale committed Aug 28, 2023
1 parent 67f257a commit 8305771
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 312 deletions.
35 changes: 26 additions & 9 deletions mongo/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,20 @@ func (s *sessionImpl) AbortTransaction(ctx context.Context) error {
selector := makePinnedSelector(s.clientSession, description.WriteSelector())

s.clientSession.Aborting = true
_ = operation.NewAbortTransaction().Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").
Deployment(s.deployment).WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector).
Retry(driver.RetryOncePerCommand).CommandMonitor(s.client.monitor).
RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)).ServerAPI(s.client.serverAPI).Execute(ctx)

retry := driver.RetryOncePerCommand
op := &operation.AbortTransaction{
Session: s.clientSession,
Clock: s.client.clock,
Deployment: s.deployment,
WriteConcern: s.clientSession.CurrentWc,
Selector: selector,
Retry: &retry,
Monitor: s.client.monitor,
RecoveryToken: bsoncore.Document(s.clientSession.RecoveryToken),
ServerAPI: s.client.serverAPI,
}
_ = op.Execute(ctx)

s.clientSession.Aborting = false
_ = s.clientSession.AbortTransaction()
Expand Down Expand Up @@ -324,11 +334,18 @@ func (s *sessionImpl) CommitTransaction(ctx context.Context) error {
selector := makePinnedSelector(s.clientSession, description.WriteSelector())

s.clientSession.Committing = true
op := operation.NewCommitTransaction().
Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").Deployment(s.deployment).
WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector).Retry(driver.RetryOncePerCommand).
CommandMonitor(s.client.monitor).RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)).
ServerAPI(s.client.serverAPI).MaxTime(s.clientSession.CurrentMct)

retry := driver.RetryOncePerCommand
op := &operation.CommitTransaction{
Session: s.clientSession,
Clock: s.client.clock,
Deployment: s.deployment,
WriteConcern: s.clientSession.CurrentWc,
Selector: selector,
Retry: &retry,
ServerAPI: s.client.serverAPI,
MaxTime: s.clientSession.CurrentMct,
}

err = op.Execute(ctx)
// Return error without updating transaction state if it is a timeout, as the transaction has not
Expand Down
195 changes: 44 additions & 151 deletions x/mongo/driver/operation/abort_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,38 @@ import (

// AbortTransaction performs an abortTransaction operation.
type AbortTransaction struct {
recoveryToken bsoncore.Document
session *session.Client
clock *session.ClusterClock
collection string
monitor *event.CommandMonitor
crypt driver.Crypt
database string
deployment driver.Deployment
selector description.ServerSelector
writeConcern *writeconcern.WriteConcern
retry *driver.RetryMode
serverAPI *driver.ServerAPIOptions
}
// RecoveryToken is the recovery token to use when committing or aborting a
// sharded transaction.
RecoveryToken bsoncore.Document

// Session is the session for this operation.
Session *session.Client

// Clock is the cluster clock for this operation.
Clock *session.ClusterClock

// Monitor is the monitor to use for APM events.
Monitor *event.CommandMonitor

// Crypt is the Crypt object to use for automatic encryption and decryption.
Crypt driver.Crypt

// Deployment is the deployment to use for this operation.
Deployment driver.Deployment

// Selector is the selector used to retrieve a server.
Selector description.ServerSelector

// WriteConcern is the write concern for this operation.
WriteConcern *writeconcern.WriteConcern

// NewAbortTransaction constructs and returns a new AbortTransaction.
func NewAbortTransaction() *AbortTransaction {
return &AbortTransaction{}
// Retry enables retryable mode for this operation. Retries are handled
// automatically in driver.Operation.Execute based on how the operation is
// set.
Retry *driver.RetryMode

// ServerAPI is the server API version for this operation.
ServerAPI *driver.ServerAPIOptions
}

func (at *AbortTransaction) processResponse(driver.ResponseInfo) error {
Expand All @@ -47,155 +62,33 @@ func (at *AbortTransaction) processResponse(driver.ResponseInfo) error {

// Execute runs this operations and returns an error if the operation did not execute successfully.
func (at *AbortTransaction) Execute(ctx context.Context) error {
if at.deployment == nil {
if at.Deployment == nil {
return errors.New("the AbortTransaction operation must have a Deployment set before Execute can be called")
}

return driver.Operation{
CommandFn: at.command,
ProcessResponseFn: at.processResponse,
RetryMode: at.retry,
RetryMode: at.Retry,
Type: driver.Write,
Client: at.session,
Clock: at.clock,
CommandMonitor: at.monitor,
Crypt: at.crypt,
Database: at.database,
Deployment: at.deployment,
Selector: at.selector,
WriteConcern: at.writeConcern,
ServerAPI: at.serverAPI,
Client: at.Session,
Clock: at.Clock,
CommandMonitor: at.Monitor,
Crypt: at.Crypt,
Database: "admin",
Deployment: at.Deployment,
Selector: at.Selector,
WriteConcern: at.WriteConcern,
ServerAPI: at.ServerAPI,
Name: driverutil.AbortTransactionOp,
}.Execute(ctx)

}

func (at *AbortTransaction) command(dst []byte, _ description.SelectedServer) ([]byte, error) {

dst = bsoncore.AppendInt32Element(dst, "abortTransaction", 1)
if at.recoveryToken != nil {
dst = bsoncore.AppendDocumentElement(dst, "recoveryToken", at.recoveryToken)
if at.RecoveryToken != nil {
dst = bsoncore.AppendDocumentElement(dst, "recoveryToken", at.RecoveryToken)
}
return dst, nil
}

// RecoveryToken sets the recovery token to use when committing or aborting a sharded transaction.
func (at *AbortTransaction) RecoveryToken(recoveryToken bsoncore.Document) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.recoveryToken = recoveryToken
return at
}

// Session sets the session for this operation.
func (at *AbortTransaction) Session(session *session.Client) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.session = session
return at
}

// ClusterClock sets the cluster clock for this operation.
func (at *AbortTransaction) ClusterClock(clock *session.ClusterClock) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.clock = clock
return at
}

// Collection sets the collection that this command will run against.
func (at *AbortTransaction) Collection(collection string) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.collection = collection
return at
}

// CommandMonitor sets the monitor to use for APM events.
func (at *AbortTransaction) CommandMonitor(monitor *event.CommandMonitor) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.monitor = monitor
return at
}

// Crypt sets the Crypt object to use for automatic encryption and decryption.
func (at *AbortTransaction) Crypt(crypt driver.Crypt) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.crypt = crypt
return at
}

// Database sets the database to run this operation against.
func (at *AbortTransaction) Database(database string) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.database = database
return at
}

// Deployment sets the deployment to use for this operation.
func (at *AbortTransaction) Deployment(deployment driver.Deployment) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.deployment = deployment
return at
}

// ServerSelector sets the selector used to retrieve a server.
func (at *AbortTransaction) ServerSelector(selector description.ServerSelector) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.selector = selector
return at
}

// WriteConcern sets the write concern for this operation.
func (at *AbortTransaction) WriteConcern(writeConcern *writeconcern.WriteConcern) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.writeConcern = writeConcern
return at
}

// Retry enables retryable mode for this operation. Retries are handled automatically in driver.Operation.Execute based
// on how the operation is set.
func (at *AbortTransaction) Retry(retry driver.RetryMode) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.retry = &retry
return at
}

// ServerAPI sets the server API version for this operation.
func (at *AbortTransaction) ServerAPI(serverAPI *driver.ServerAPIOptions) *AbortTransaction {
if at == nil {
at = new(AbortTransaction)
}

at.serverAPI = serverAPI
return at
}
Loading

0 comments on commit 8305771

Please sign in to comment.