Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1 from PeakBI/aws-sdk-upgrade
Browse files Browse the repository at this point in the history
AWS SDK upgrade
  • Loading branch information
sukhbir-singh authored Dec 2, 2020
2 parents 4e3f43a + ff23906 commit c59e76e
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 10 deletions.
89 changes: 85 additions & 4 deletions lib/logstash/inputs/s3.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/aws_config"
require "time"
require "date"
require "tmpdir"
require "stud/interval"
require "stud/temporary"
require "aws-sdk"
require "aws-sdk-s3"
require "logstash/inputs/s3/patch"

require 'java'
Expand All @@ -26,7 +25,14 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base
java_import java.util.zip.GZIPInputStream
java_import java.util.zip.ZipException

include LogStash::PluginMixins::AwsConfig::V2
CredentialConfig = Struct.new(
:access_key_id,
:secret_access_key,
:session_token,
:profile,
:instance_profile_credentials_retries,
:instance_profile_credentials_timeout,
:region)

config_name "s3"

Expand Down Expand Up @@ -86,10 +92,56 @@ class LogStash::Inputs::S3 < LogStash::Inputs::Base
# default to an expression that matches *.gz and *.gzip file extensions
config :gzip_pattern, :validate => :string, :default => "\.gz(ip)?$"

config :region, :validate => :string, :default => "us-east-1"

# This plugin uses the AWS SDK and supports several ways to get credentials, which will be tried in this order:
#
# 1. Static configuration, using `access_key_id` and `secret_access_key` params or `role_arn` in the logstash plugin config
# 2. External credentials file specified by `aws_credentials_file`
# 3. Environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
# 4. Environment variables `AMAZON_ACCESS_KEY_ID` and `AMAZON_SECRET_ACCESS_KEY`
# 5. IAM Instance Profile (available when running inside EC2)
config :access_key_id, :validate => :string

# The AWS Secret Access Key
config :secret_access_key, :validate => :string

# Profile
config :profile, :validate => :string, :default => "default"

# The AWS Session token for temporary credential
config :session_token, :validate => :password

# URI to proxy server if required
config :proxy_uri, :validate => :string

# Custom endpoint to connect to s3
config :endpoint, :validate => :string

# The AWS IAM Role to assume, if any.
# This is used to generate temporary credentials typically for cross-account access.
# See https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html for more information.
config :role_arn, :validate => :string

# Session name to use when assuming an IAM role
config :role_session_name, :validate => :string, :default => "logstash"

# Path to YAML file containing a hash of AWS credentials.
# This file will only be loaded if `access_key_id` and
# `secret_access_key` aren't set. The contents of the
# file should look like this:
#
# [source,ruby]
# ----------------------------------
# :access_key_id: "12345"
# :secret_access_key: "54321"
# ----------------------------------
#
config :aws_credentials_file, :validate => :string

def register
require "fileutils"
require "digest/md5"
require "aws-sdk-resources"

@logger.info("Registering", :bucket => @bucket, :region => @region)

Expand Down Expand Up @@ -412,6 +464,35 @@ def delete_file_from_bucket(object)
end
end

def aws_options_hash
opts = {}

if @access_key_id.is_a?(NilClass) ^ @secret_access_key.is_a?(NilClass)
@logger.warn("Likely config error: Only one of access_key_id or secret_access_key was provided but not both.")
end

credential_config = CredentialConfig.new(@access_key_id, @secret_access_key, @session_token, @profile, 0, 1, @region)
@credentials = Aws::CredentialProviderChain.new(credential_config).resolve

opts[:credentials] = @credentials

opts[:http_proxy] = @proxy_uri if @proxy_uri

if self.respond_to?(:aws_service_endpoint)
# used by CloudWatch to basically do the same as bellow (returns { region: region })
opts.merge!(self.aws_service_endpoint(@region))
else
# NOTE: setting :region works with the aws sdk (resolves correct endpoint)
opts[:region] = @region
end

if !@endpoint.is_a?(NilClass)
opts[:endpoint] = @endpoint
end

return opts
end

def get_s3object
options = symbolized_settings.merge(aws_options_hash || {})
s3 = Aws::S3::Resource.new(options)
Expand Down
8 changes: 5 additions & 3 deletions logstash-input-s3.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-s3'
s.version = '3.5.2'
s.version = '4.0.0'
s.licenses = ['Apache-2.0']
s.summary = "Streams events from files in a S3 bucket"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -21,10 +21,12 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 2.1.12", "<= 2.99"
s.add_runtime_dependency 'logstash-mixin-aws', '>= 4.3.0'
s.add_runtime_dependency 'stud', '~> 0.0.18'
# s.add_runtime_dependency 'aws-sdk-resources', '>= 2.0.33'
s.add_runtime_dependency 'aws-sdk-s3', '~> 1'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency "logstash-codec-json"
s.add_development_dependency "logstash-codec-plain"
s.add_development_dependency "logstash-codec-multiline"
end

6 changes: 4 additions & 2 deletions spec/inputs/s3_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
require "logstash/inputs/s3"
require "logstash/codecs/multiline"
require "logstash/errors"
require "aws-sdk-resources"
require_relative "../support/helpers"
require "stud/temporary"
require "aws-sdk"
require "aws-sdk-s3"
require "fileutils"

describe LogStash::Inputs::S3 do
Expand Down Expand Up @@ -82,13 +81,16 @@
context 'when force_path_style is set' do
let(:settings) {
{
"access_key_id" => "1234",
"secret_access_key" => "secret",
"additional_settings" => { "force_path_style" => true },
"bucket" => "logstash-test",
}
}

it 'should instantiate AWS::S3 clients with force_path_style set' do
expect(Aws::S3::Resource).to receive(:new).with({
:credentials => kind_of(Aws::Credentials),
:region => subject.region,
:force_path_style => true
}).and_call_original
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/s3_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/s3"
require "aws-sdk"
require "aws-sdk-s3"
require "fileutils"
require_relative "../support/helpers"

Expand Down

0 comments on commit c59e76e

Please sign in to comment.