Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
qingyang-hu committed Oct 18, 2024
1 parent 00cc83a commit cb5e8aa
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 62 deletions.
68 changes: 50 additions & 18 deletions mongo/client_bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,31 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
Authenticator: bw.client.authenticator,
Name: "bulkWrite",
}.Execute(ctx)
if err != nil && errors.Is(err, driver.ErrUnacknowledgedWrite) {
return nil
var exception *ClientBulkWriteException
switch tt := err.(type) {
case CommandError:
exception = &ClientBulkWriteException{
TopLevelError: &WriteError{
Code: int(tt.Code),
Message: tt.Message,
Raw: tt.Raw,
},
}
default:
if errors.Is(err, driver.ErrUnacknowledgedWrite) {
err = nil
}
}
if len(batches.writeConcernErrors) > 0 || len(batches.writeErrors) > 0 {
if exception == nil {
exception = new(ClientBulkWriteException)
}
exception.WriteConcernErrors = batches.writeConcernErrors
exception.WriteErrors = batches.writeErrors
}
if exception != nil {
exception.PartialResult = batches.result
return *exception
}
return err
}
Expand Down Expand Up @@ -219,7 +242,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
return 0, dst, io.EOF
}

mb.cursorHandlers = make([]func(*cursorInfo, bson.Raw) bool, len(mb.models))
mb.cursorHandlers = mb.cursorHandlers[:0]
mb.newIDMap = make(map[int]interface{})

nsMap := make(map[string]int)
Expand All @@ -240,6 +263,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
opsIdx, dst := fn.appendStart(dst, "ops")
nsIdx, nsDst := fn.appendStart(nil, "nsInfo")

totalSize -= 1000
size := (len(dst) - l) * 2
var n int
for i := mb.offset; i < len(mb.models); i++ {
Expand All @@ -254,7 +278,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
var err error
switch model := mb.models[i].model.(type) {
case *ClientInsertOneModel:
mb.cursorHandlers[i] = mb.appendInsertResult
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendInsertResult)
var id interface{}
id, doc, err = (&clientInsertDoc{
namespace: nsIdx,
Expand All @@ -265,7 +289,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
}
mb.newIDMap[i] = id
case *ClientUpdateOneModel:
mb.cursorHandlers[i] = mb.appendUpdateResult
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
doc, err = (&clientUpdateDoc{
namespace: nsIdx,
filter: model.Filter,
Expand All @@ -279,7 +303,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
}).marshal(mb.client.bsonOpts, mb.client.registry)
case *ClientUpdateManyModel:
canRetry = false
mb.cursorHandlers[i] = mb.appendUpdateResult
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
doc, err = (&clientUpdateDoc{
namespace: nsIdx,
filter: model.Filter,
Expand All @@ -292,7 +316,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
checkDollarKey: true,
}).marshal(mb.client.bsonOpts, mb.client.registry)
case *ClientReplaceOneModel:
mb.cursorHandlers[i] = mb.appendUpdateResult
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendUpdateResult)
doc, err = (&clientUpdateDoc{
namespace: nsIdx,
filter: model.Filter,
Expand All @@ -305,7 +329,7 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
checkDollarKey: false,
}).marshal(mb.client.bsonOpts, mb.client.registry)
case *ClientDeleteOneModel:
mb.cursorHandlers[i] = mb.appendDeleteResult
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendDeleteResult)
doc, err = (&clientDeleteDoc{
namespace: nsIdx,
filter: model.Filter,
Expand All @@ -315,22 +339,27 @@ func (mb *modelBatches) appendBatches(fn functionSet, dst []byte, maxCount, maxD
}).marshal(mb.client.bsonOpts, mb.client.registry)
case *ClientDeleteManyModel:
canRetry = false
mb.cursorHandlers[i] = mb.appendDeleteResult
mb.cursorHandlers = append(mb.cursorHandlers, mb.appendDeleteResult)
doc, err = (&clientDeleteDoc{
namespace: nsIdx,
filter: model.Filter,
collation: model.Collation,
hint: model.Hint,
multi: true,
}).marshal(mb.client.bsonOpts, mb.client.registry)
default:
mb.cursorHandlers = append(mb.cursorHandlers, nil)
}
if err != nil {
return 0, nil, err
}
length := len(doc) + len(ns)
length := len(doc)
if length > maxDocSize {
break
}
if !exists {
length += len(ns)
}
size += length
if size >= totalSize {
break
Expand Down Expand Up @@ -369,7 +398,6 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
mb.writeConcernErrors = append(mb.writeConcernErrors, *wce)
}
}
// closeImplicitSession(sess)
if len(resp) == 0 {
return nil
}
Expand Down Expand Up @@ -435,8 +463,9 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
if err != nil {
return err
}
isOrdered := mb.ordered == nil || *mb.ordered
fmt.Println("ProcessResponse toplevelerror", res.Ok, res.NErrors, res.Code, res.Errmsg)
if writeCmdErr.WriteConcernError != nil || !ok || !res.Ok || res.NErrors > 0 {
if isOrdered && (writeCmdErr.WriteConcernError != nil || !ok || !res.Ok || res.NErrors > 0) {
exception := ClientBulkWriteException{
WriteConcernErrors: mb.writeConcernErrors,
WriteErrors: mb.writeErrors,
Expand All @@ -455,48 +484,51 @@ func (mb *modelBatches) processResponse(ctx context.Context, resp bsoncore.Docum
}

func (mb *modelBatches) appendDeleteResult(cur *cursorInfo, raw bson.Raw) bool {
idx := int(cur.Idx) + mb.offset
if err := cur.extractError(); err != nil {
err.Raw = raw
if mb.writeErrors == nil {
mb.writeErrors = make(map[int]WriteError)
}
mb.writeErrors[int(cur.Idx)] = *err
mb.writeErrors[idx] = *err
return false
}

if mb.result.DeleteResults == nil {
mb.result.DeleteResults = make(map[int]ClientDeleteResult)
}
mb.result.DeleteResults[int(cur.Idx)] = ClientDeleteResult{int64(cur.N)}
mb.result.DeleteResults[idx] = ClientDeleteResult{int64(cur.N)}

return true
}

func (mb *modelBatches) appendInsertResult(cur *cursorInfo, raw bson.Raw) bool {
idx := int(cur.Idx) + mb.offset
if err := cur.extractError(); err != nil {
err.Raw = raw
if mb.writeErrors == nil {
mb.writeErrors = make(map[int]WriteError)
}
mb.writeErrors[int(cur.Idx)] = *err
mb.writeErrors[idx] = *err
return false
}

if mb.result.InsertResults == nil {
mb.result.InsertResults = make(map[int]ClientInsertResult)
}
mb.result.InsertResults[int(cur.Idx)] = ClientInsertResult{mb.newIDMap[int(cur.Idx)]}
mb.result.InsertResults[idx] = ClientInsertResult{mb.newIDMap[idx]}

return true
}

func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
idx := int(cur.Idx) + mb.offset
if err := cur.extractError(); err != nil {
err.Raw = raw
if mb.writeErrors == nil {
mb.writeErrors = make(map[int]WriteError)
}
mb.writeErrors[int(cur.Idx)] = *err
mb.writeErrors[idx] = *err
return false
}

Expand All @@ -512,7 +544,7 @@ func (mb *modelBatches) appendUpdateResult(cur *cursorInfo, raw bson.Raw) bool {
if cur.Upserted != nil {
result.UpsertedID = cur.Upserted.ID
}
mb.result.UpdateResults[int(cur.Idx)] = result
mb.result.UpdateResults[idx] = result

return true
}
Expand Down
Loading

0 comments on commit cb5e8aa

Please sign in to comment.