diff --git a/elasticsearch/lib/elasticsearch/helpers/bulk_helper.rb b/elasticsearch/lib/elasticsearch/helpers/bulk_helper.rb index 80c906c38f..e5982eccb7 100644 --- a/elasticsearch/lib/elasticsearch/helpers/bulk_helper.rb +++ b/elasticsearch/lib/elasticsearch/helpers/bulk_helper.rb @@ -48,7 +48,9 @@ def initialize(client, index, params = {}) def ingest(docs, params = {}, body = {}, &block) ingest_docs = docs.map { |doc| { index: { _index: @index, data: doc} } } if (slice = params.delete(:slice)) - ingest_docs.each_slice(slice) { |items| ingest(items, params, &block) } + ingest_docs.each_slice(slice) do |items| + ingest(items.map { |item| item[:index][:data] }, params, &block) + end else bulk_request(ingest_docs, params, &block) end diff --git a/elasticsearch/spec/integration/helpers/bulk_helper_spec.rb b/elasticsearch/spec/integration/helpers/bulk_helper_spec.rb index 8fb7cb5485..31da9a0b84 100644 --- a/elasticsearch/spec/integration/helpers/bulk_helper_spec.rb +++ b/elasticsearch/spec/integration/helpers/bulk_helper_spec.rb @@ -21,6 +21,7 @@ context 'Elasticsearch client helpers' do context 'Bulk helper' do let(:index) { 'bulk_animals' } + let(:index_slice) { 'bulk_animals_slice' } let(:params) { { refresh: 'wait_for' } } let(:bulk_helper) { Elasticsearch::Helpers::BulkHelper.new(client, index, params) } let(:docs) do @@ -40,6 +41,7 @@ after do client.indices.delete(index: index, ignore: 404) + client.indices.delete(index: index_slice, ignore: 404) end it 'Ingests documents' do @@ -76,10 +78,13 @@ it 'Ingests documents and yields response and docs' do slice = 2 + bulk_helper = Elasticsearch::Helpers::BulkHelper.new(client, index_slice, params) response = bulk_helper.ingest(docs, {slice: slice}) do |response, docs| expect(response).to be_an_instance_of Elasticsearch::API::Response expect(docs.count).to eq slice end + response = client.search(index: index_slice, size: 200) + expect(response['hits']['hits'].map { |a| a['_source'].transform_keys(&:to_sym) }).to eq docs end context 'JSON File helper' do