Skip to content

Commit

Permalink
Fix execute item failure callback on bulk error (#626)
Browse files Browse the repository at this point in the history
Signed-off-by: Kellen Miller <kellen.miller@finxact.com>
  • Loading branch information
Kellen Miller committed Oct 3, 2024
1 parent 3f1ca5e commit 915f706
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 113 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix ISM Allocation field types ([#609](https://github.com/opensearch-project/opensearch-go/pull/609))
- Fix ISM Error Notification types ([#612](https://github.com/opensearch-project/opensearch-go/pull/612))
- Fix signer receiving drained body on retries ([#620](https://github.com/opensearch-project/opensearch-go/pull/620))
- Fix Bulk Index Items not executing failure callbacks on bulk request failure ([#626](https://github.com/opensearch-project/opensearch-go/issues/626))

### Security

Expand Down
25 changes: 18 additions & 7 deletions opensearchutil/bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ func (w *worker) run() {
w.mu.Lock()

if w.bi.config.DebugLogger != nil {
w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action,
item.DocumentID)
}

if err := w.writeMeta(item); err != nil {
Expand Down Expand Up @@ -503,11 +504,7 @@ func (w *worker) flush(ctx context.Context) error {

blk, err = w.bi.config.Client.Bulk(ctx, req)
if err != nil {
atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
if w.bi.config.OnError != nil {
w.bi.config.OnError(ctx, fmt.Errorf("flush: %w", err))
}
return fmt.Errorf("flush: %w", err)
return w.handleBulkError(ctx, fmt.Errorf("flush: %w", err))
}

for i, blkItem := range blk.Items {
Expand All @@ -520,7 +517,7 @@ func (w *worker) flush(ctx context.Context) error {
item = w.items[i]
// The OpenSearch bulk response contains an array of maps like this:
// [ { "index": { ... } }, { "create": { ... } }, ... ]
// We range over the map, to set the first key and value as "op" and "info".
// We range over the map, to set the last key and value as "op" and "info".
for k, v := range blkItem {
op = k
info = v
Expand Down Expand Up @@ -552,3 +549,17 @@ func (w *worker) flush(ctx context.Context) error {

return err
}

func (w *worker) handleBulkError(ctx context.Context, err error) error {
atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))

// info (the response item) will be empty since the bulk request failed
var info opensearchapi.BulkRespItem
for i := range w.items {
if item := w.items[i]; item.OnFailure != nil {
item.OnFailure(ctx, item, info, err)
}
}

return err
}
Loading

0 comments on commit 915f706

Please sign in to comment.