Amazon S3 plugin for Fluentd
The s3 output plugin buffers event logs in a local file and uploads them to S3 periodically.
This plugin splits files exactly by the time of the event logs (not the time when the logs are received). For example, if a log '2011-01-02 message B' arrives and then another log '2011-01-03 message B' arrives in this order, the former is stored in the "20110102.gz" file and the latter in the "20110103.gz" file.
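The splitting rule above can be sketched in plain Ruby. This is only an illustration: `slice_by_event_time`, the tuple layout and the sample messages are assumptions made for this sketch, not part of the plugin.

```ruby
# Group events by the day of their own timestamp, not by arrival order.
# The [time, message] layout and the '%Y%m%d' slice format are assumptions for this sketch.
def slice_by_event_time(events, slice_format = '%Y%m%d')
  events.group_by { |time, _message| time.strftime(slice_format) }
        .transform_values { |pairs| pairs.map { |_time, message| message } }
end

EVENTS = [
  [Time.utc(2011, 1, 2, 23, 59, 0), 'first'],
  [Time.utc(2011, 1, 3, 0, 0, 1), 'second'],
  [Time.utc(2011, 1, 2, 12, 0, 0), 'late'] # arrives last, still belongs to 20110102
].freeze

SLICES = slice_by_event_time(EVENTS)
SLICES.each { |slice, messages| puts "#{slice}.gz <- #{messages.inspect}" }
```

A late event is grouped by its own timestamp, which is why the plugin waits for a while before uploading a time slice.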
The s3 input plugin reads data from S3 periodically. This plugin uses an SQS queue in the same region as the S3 bucket. You must set up the SQS queue and S3 event notification before using this plugin.
fluent-plugin-s3 | fluentd | ruby |
---|---|---|
>= 1.0.0 | >= v0.14.0 | >= 2.1 |
< 1.0.0 | >= v0.12.0 | >= 1.9 |
Simply use RubyGems:
# install latest version
$ gem install fluent-plugin-s3 --no-document # for fluentd v1.0 or later
# If you need to install a specific version, use the -v option
$ gem install fluent-plugin-s3 -v 1.3.0 --no-document
# For v0.12. This is for existing v0.12 users. Don't use v0.12 for new deployments
$ gem install fluent-plugin-s3 -v "~> 0.8" --no-document # for fluentd v0.12
Both the S3 input and output plugins provide several credential methods for authentication/authorization.
These parameters are required when your agent is not running on an EC2 instance with an IAM Role. When using an IAM role, make sure to configure instance_profile_credentials. Usage can be found below.
aws_key_id
AWS access key id.
aws_sec_key
AWS secret key.
Typically, you use AssumeRole for cross-account access or federation.
<match *>
@type s3
<assume_role_credentials>
role_arn ROLE_ARN
role_session_name ROLE_SESSION_NAME
</assume_role_credentials>
</match>
See also:
role_arn (required)
The Amazon Resource Name (ARN) of the role to assume.
role_session_name (required)
An identifier for the assumed role session.
policy
An IAM policy in JSON format.
duration_seconds
The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 3600 seconds (1 hour). By default, the value is set to 3600 seconds.
external_id
A unique identifier that is used by third parties when assuming roles in their customers' accounts.
Similar to the assume_role_credentials, but for usage in EKS.
<match *>
@type s3
<web_identity_credentials>
role_arn ROLE_ARN
role_session_name ROLE_SESSION_NAME
web_identity_token_file AWS_WEB_IDENTITY_TOKEN_FILE
</web_identity_credentials>
</match>
See also:
- Using IAM Roles - AWS Identity and Access Management
- IAM Roles For Service Accounts
- Aws::STS::Client
- Aws::AssumeRoleWebIdentityCredentials
role_arn (required)
The Amazon Resource Name (ARN) of the role to assume.
role_session_name (required)
An identifier for the assumed role session.
web_identity_token_file (required)
The absolute path to the file on disk containing the OIDC token.
policy
An IAM policy in JSON format.
duration_seconds
The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 43200 seconds (12 hours). By default, the value is set to 3600 seconds.
Retrieve temporary security credentials via HTTP request. This is useful on an EC2 instance.
<match *>
@type s3
<instance_profile_credentials>
ip_address IP_ADDRESS
port PORT
</instance_profile_credentials>
</match>
See also:
- Aws::InstanceProfileCredentials
- Temporary Security Credentials - AWS Identity and Access Management
- Instance Metadata and User Data - Amazon Elastic Compute Cloud
retries
Number of times to retry when retrieving credentials. Default is 5.
ip_address
Default is 169.254.169.254.
port
Default is 80.
http_open_timeout
Default is 5.
http_read_timeout
Default is 5.
This loads AWS access credentials from a local ini file. This is useful for local development.
<match *>
@type s3
<shared_credentials>
path PATH
profile_name PROFILE_NAME
</shared_credentials>
</match>
See also:
path
Path to the shared file. Defaults to "#{Dir.home}/.aws/credentials".
profile_name
Defaults to 'default' or ENV['AWS_PROFILE'].
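As a rough sketch of how these two defaults could be resolved, the helper below is hypothetical and not part of the plugin. It assumes that an explicit setting wins over the AWS_PROFILE environment variable, which in turn wins over 'default'.

```ruby
# Resolve the shared credentials file and profile name from the defaults described above.
# resolve_shared_credentials is a hypothetical helper; the precedence order is an assumption.
def resolve_shared_credentials(path: nil, profile_name: nil, env: ENV)
  {
    path: path || File.join(Dir.home, '.aws', 'credentials'),
    profile_name: profile_name || env['AWS_PROFILE'] || 'default'
  }
end

p resolve_shared_credentials(env: {})
p resolve_shared_credentials(env: { 'AWS_PROFILE' => 'staging' })
```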
With fluentd v1 and fluent-plugin-s3 v1.0.0 or later, use the new buffer configuration for dynamic parameters.
<match pattern>
@type s3
aws_key_id YOUR_AWS_KEY_ID
aws_sec_key YOUR_AWS_SECRET_KEY
s3_bucket YOUR_S3_BUCKET_NAME
s3_region ap-northeast-1
path logs/${tag}/%Y/%m/%d/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
# if you want to use ${tag} or %Y/%m/%d/ like syntax in path / s3_object_key_format,
# you need to specify tag for ${tag} and time for %Y/%m/%d in the <buffer> argument.
<buffer tag,time>
@type file
path /var/log/fluent/s3
timekey 3600 # 1 hour partition
timekey_wait 10m
timekey_use_utc true # use utc
</buffer>
<format>
@type json
</format>
</match>
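To make the timekey and timekey_wait values in this configuration more concrete, here is a small sketch of the arithmetic, assuming a chunk becomes eligible for flushing once its time window has ended and timekey_wait has passed. The helper names are made up for illustration.

```ruby
TIMEKEY = 3600      # seconds, matches "timekey 3600"
TIMEKEY_WAIT = 600  # seconds, matches "timekey_wait 10m"

# Start (epoch seconds) of the timekey window that an event time falls into.
def timekey_window(event_time)
  event_time.to_i - (event_time.to_i % TIMEKEY)
end

# Earliest moment the chunk for that window becomes eligible for flushing.
def flush_after(event_time)
  Time.at(timekey_window(event_time) + TIMEKEY + TIMEKEY_WAIT).utc
end

sample = Time.utc(2013, 1, 11, 22, 15, 30)
puts Time.at(timekey_window(sample)).utc # start of the one-hour window
puts flush_after(sample)                 # end of the window plus timekey_wait
```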
For <buffer>, you can use any record field in path / s3_object_key_format.
path logs/${tag}/${foo}
<buffer tag,foo>
# parameters...
</buffer>
See official article for more detail: Config: Buffer Section - Fluentd
Note that this configuration doesn't work with fluentd v0.12.
This configuration works with both fluentd v0.12 and v1.0.
<match pattern>
@type s3
aws_key_id YOUR_AWS_KEY_ID
aws_sec_key YOUR_AWS_SECRET_KEY
s3_bucket YOUR_S3_BUCKET_NAME
s3_region ap-northeast-1
path logs/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
buffer_path /var/log/fluent/s3
time_slice_format %Y%m%d-%H
time_slice_wait 10m
utc
format json
</match>
If you want to embed the tag in path / s3_object_key_format, you need to use the fluent-plugin-forest plugin.
aws_iam_retries
This parameter is deprecated. Use instance_profile_credentials instead.
The number of attempts to make (with exponential backoff) when loading instance profile credentials from the EC2 metadata service using an IAM role. Defaults to 5 retries.
s3_bucket (required)
S3 bucket name.
s3_region
S3 region name. For example, US West (Oregon) Region is "us-west-2". The full list of regions is available at http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region. We recommend using s3_region instead of s3_endpoint.
s3_endpoint
Endpoint for S3 compatible services, for example Riak CS based storage. This option is deprecated for AWS S3; use s3_region instead.
See also AWS article: Working with Regions.
enable_transfer_acceleration
Enable S3 Transfer Acceleration for uploads. IMPORTANT: For this to work, you must first enable this feature on your destination S3 bucket.
enable_dual_stack
Enable Amazon S3 Dual-Stack Endpoints for uploads. This makes it possible to use either IPv4 or IPv6 when connecting to S3.
use_bundled_cert
For cases where the default SSL certificate is unavailable (e.g. Windows), you can set this option to true in order to use the AWS SDK bundled certificate. Default is false.
This fixes the following error often seen in Windows:
SSL_connect returned=1 errno=0 state=SSLv3 read server certificate B: certificate verify failed (Seahorse::Client::NetworkingError)
ssl_verify_peer
Verify SSL certificate of the endpoint. Default is true. Set false when you want to ignore the endpoint SSL certificate.
s3_object_key_format
The format of S3 object keys. You can use several built-in variables:
- %{path}
- %{time_slice}
- %{index}
- %{file_extension}
- %{hex_random}
- %{uuid_flush}
- %{hostname}
to decide keys dynamically.
- %{path} is exactly the value of path configured in the configuration file. E.g., "logs/" in the example configuration above.
- %{time_slice} is the time slice as text, formatted with time_slice_format.
- %{index} is a sequential number that starts from 0 and increments when multiple files are uploaded to S3 in the same time slice.
- %{file_extension} depends on the store_as parameter.
- %{uuid_flush} is a UUID that is replaced every time the buffer is flushed. If you want to use this placeholder, install the uuidtools gem first.
- %{hostname} is replaced with the Socket.gethostname result.
- %{hex_random} is a random hex string that is replaced for each buffer chunk and is not assured to be unique. It follows the performance tuning advice "Add a Hex Hash Prefix to Key Name" in Request Rate and Performance Considerations - Amazon Simple Storage Service. You can configure the length of the string with the hex_random_length parameter (Default: 4).
The default format is %{path}%{time_slice}_%{index}.%{file_extension}.
In addition, you can use buffer placeholders in this parameter,
so you can embed tag, time and record value like below:
s3_object_key_format %{path}/events/%Y%m%d/${tag}_%{index}.%{file_extension}
<buffer tag,time>
# buffer parameters...
</buffer>
For instance, using the example configuration above, actual object keys on S3 will be something like:
"logs/20130111-22_0.gz"
"logs/20130111-23_0.gz"
"logs/20130111-23_1.gz"
"logs/20130112-00_0.gz"
With the configuration:
s3_object_key_format %{path}/events/ts=%{time_slice}/events_%{index}.%{file_extension}
path log
time_slice_format %Y%m%d-%H
You get:
"log/events/ts=20130111-22/events_0.gz"
"log/events/ts=20130111-23/events_0.gz"
"log/events/ts=20130111-23/events_1.gz"
"log/events/ts=20130112-00/events_0.gz"
NOTE: The %{hostname} placeholder is deprecated since v0.8. You can get the same result by using the configuration's embedded Ruby code feature.
s3_object_key_format %{path}%{time_slice}_%{hostname}%{index}.%{file_extension}
s3_object_key_format "%{path}%{time_slice}_#{Socket.gethostname}%{index}.%{file_extension}"
The two configurations above are equivalent. The important point is that #{Socket.gethostname} must be wrapped in double quotes ("").
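The way the %{...} placeholders turn into the object keys shown in this section can be sketched as follows. `build_object_key` is a hypothetical helper written for this illustration; the plugin's own implementation may differ.

```ruby
# Expand %{name} placeholders in an s3_object_key_format string.
# build_object_key is a hypothetical helper; the plugin's internals may differ.
def build_object_key(key_format, values)
  key_format.gsub(/%\{(\w+)\}/) do
    name = Regexp.last_match(1).to_sym
    values.fetch(name).to_s # raises KeyError for an unknown placeholder
  end
end

DEFAULT_KEY_FORMAT = '%{path}%{time_slice}_%{index}.%{file_extension}'
SAMPLE_VALUES = { path: 'logs/', time_slice: '20130111-22', index: 0, file_extension: 'gz' }.freeze

puts build_object_key(DEFAULT_KEY_FORMAT, SAMPLE_VALUES)
```

Raising on an unknown placeholder is a choice made for this sketch, so that a typo in the format string is noticed early.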
force_path_style
:force_path_style (Boolean) — default: false — When set to true, the bucket name is always left in the request URI and never moved to the host as a sub-domain. See Plugins::S3BucketDns for more details.
This parameter is deprecated. See AWS announcement: https://aws.amazon.com/blogs/aws/amazon-s3-path-deprecation-plan-the-rest-of-the-story/
store_as
Archive format on S3. You can use several formats:
- gzip (default)
- json
- text
- lzo (Need lzop command)
- lzma2 (Need xz command)
- gzip_command (Need gzip command)
  - This compressor uses an external gzip command, and hence would utilize CPU cores better than gzip.
See the Use your compression algorithm section for adding another format.
<format> or format
Changes the line format in the S3 object. Supported formats are "out_file", "json", "ltsv", "single_value" and other formatter plugins. See also the official Formatter article.
- out_file (default).
time\ttag\t{..json1..} time\ttag\t{..json2..} ...
- json
{..json1..} {..json2..} ...
In this format, "time" and "tag" are omitted, but you can add this information to the record with the <inject> option. If you set the following configuration in the S3 output:
# v1
<format>
@type json
</format>
<inject>
time_key log_time
</inject>
# v0.12
format json
include_time_key true
time_key log_time # default is time
then the record has a log_time field.
{"log_time":"time string",...}
See also official Inject Section article.
- ltsv
key1:value1\tkey2:value2 key1:value1\tkey2:value2 ...
- single_value
Uses the specified value instead of the entire record. If you get '{"message":"my log"}', then the contents are
my log1
my log2
...
You can change key name by "message_key" option.
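The line shapes described above can be imitated in a few lines of Ruby. These helpers are illustrative only; the real output comes from Fluentd's formatter plugins, and the sample time, tag and record are assumptions.

```ruby
require 'json'

# Render one event in each of the line formats listed above.
# These helpers are illustrative; real output comes from Fluentd formatter plugins.
def format_out_file(time, tag, record)
  "#{time}\t#{tag}\t#{JSON.generate(record)}\n"
end

def format_json(record)
  "#{JSON.generate(record)}\n"
end

def format_ltsv(record)
  record.map { |key, value| "#{key}:#{value}" }.join("\t") + "\n"
end

def format_single_value(record, message_key = 'message')
  "#{record[message_key]}\n"
end

RECORD = { 'message' => 'my log', 'level' => 'info' }.freeze
print format_out_file('2013-01-11T22:00:00Z', 'app.access', RECORD)
print format_json(RECORD)
print format_ltsv(RECORD)
print format_single_value(RECORD)
```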
auto_create_bucket
Create the S3 bucket if it does not exist. Default is true.
check_bucket
Check whether the specified bucket exists in AWS. Default is true.
When it is false, fluentd will not check AWS S3 for the existence of the specified bucket. Use this when the bucket is created before running fluentd.
check_object
Check object before creation if it exists or not. Default is true.
When it is false, s3_object_key_format will be %{path}%{time_slice}_%{hms_slice}.%{file_extension} by default, where hms_slice is the time slice in hhmmss format, so that each object will be unique. For example, an object created on 2016/11/16 at 3:30:54 PM is named 20161116_153054.txt (the extension can be anything, as per the user's choice).
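A minimal sketch of how such a key could be assembled is shown below. `key_without_object_check` is a hypothetical helper, and using a single timestamp for both the time slice and the hhmmss part is a simplification made for this illustration.

```ruby
# Build a key shaped like %{path}%{time_slice}_%{hms_slice}.%{file_extension}.
# key_without_object_check is a hypothetical helper; '%Y%m%d' is the default time_slice_format.
def key_without_object_check(path, time, file_extension, time_slice_format = '%Y%m%d')
  time_slice = time.strftime(time_slice_format)
  hms_slice = time.strftime('%H%M%S')
  "#{path}#{time_slice}_#{hms_slice}.#{file_extension}"
end

puts key_without_object_check('logs/', Time.utc(2020, 1, 2, 3, 4, 5), 'gz')
```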
check_apikey_on_start
Check AWS key on start. Default is true.
proxy_uri
uri of proxy environment.
path
Path prefix of the files on S3. Default is "" (no prefix). Buffer placeholders are supported, so you can embed tag, time and record values as shown below.
path logs/%Y%m%d/${tag}/
<buffer tag,time>
# buffer parameters...
</buffer>
buffer_path (for v0.12)
Path prefix of the files used to buffer logs.
This parameter is for v0.12. Use <buffer>'s path in v1.
time_slice_format (for v0.12)
Format of the time used as the file name. Default is '%Y%m%d'. Use '%Y%m%d%H' to split files hourly.
This parameter is for v0.12. Use buffer placeholders for path / s3_object_key_format in v1.
time_slice_wait (for v0.12)
The time to wait for old logs. Default is 10 minutes. Specify a larger value if old logs may arrive late.
This parameter is for v0.12. Use <buffer>'s timekey_wait in v1.
utc
Use UTC instead of local time.
storage_class
Set storage class. Possible values are STANDARD, REDUCED_REDUNDANCY, STANDARD_IA from Ruby SDK.
reduced_redundancy
Use S3 reduced redundancy storage for 33% cheaper pricing. Default is false.
This is deprecated. Use storage_class REDUCED_REDUNDANCY instead.
acl
Permission for the object in S3. This is useful for cross-account access using IAM roles. Valid values are:
- private (default)
- public-read
- public-read-write (not recommended - see Canned ACL)
- authenticated-read
- bucket-owner-read
- bucket-owner-full-control
To use cross-account access, you will need to create a bucket policy granting the specific access required. Refer to the AWS documentation for examples.
grant_full_control
Allows grantee READ, READ_ACP, and WRITE_ACP permissions on the object. This is useful for cross-account access using IAM roles.
Valid values are id="Grantee-CanonicalUserID". Please specify the grantee's canonical user ID.
e.g. id="79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
Note that a canonical user ID is different from an AWS account ID. Please refer to AWS documentation for more details.
grant_read
Allows grantee to read the object data and its metadata.
Valid values are id="Grantee-CanonicalUserID". Please specify the grantee's canonical user ID.
e.g. id="79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
grant_read_acp
Allows grantee to read the object ACL.
Valid values are id="Grantee-CanonicalUserID". Please specify the grantee's canonical user ID.
e.g. id="79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
grant_write_acp
Allows grantee to write the ACL for the applicable object.
Valid values are id="Grantee-CanonicalUserID". Please specify the grantee's canonical user ID.
e.g. id="79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
hex_random_length
The length of the %{hex_random} placeholder. Default is 4, as written in Request Rate and Performance Considerations - Amazon Simple Storage Service.
The maximum length is 16.
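For illustration, a prefix of the configured length could be produced like this. SecureRandom is only a stand-in chosen for the sketch; the plugin derives its value for each buffer chunk, and `hex_random` here is a hypothetical helper.

```ruby
require 'securerandom'

MAX_HEX_RANDOM_LENGTH = 16

# Produce a random hex string of the requested length (1 to 16 characters).
# SecureRandom is a stand-in; the plugin derives its value per buffer chunk.
def hex_random(length = 4)
  unless (1..MAX_HEX_RANDOM_LENGTH).cover?(length)
    raise ArgumentError, "length must be between 1 and #{MAX_HEX_RANDOM_LENGTH}"
  end

  SecureRandom.hex(MAX_HEX_RANDOM_LENGTH / 2)[0, length]
end

puts "#{hex_random}/logs/20130111-22_0.gz"
```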
index_format
%{index} is formatted by sprintf using this format string. Default is '%d'. Zero padding is supported, e.g. %04d to ensure a minimum length of four digits. %{index} can be in lowercase or uppercase hex using '%x' or '%X'.
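Since the index is rendered with sprintf, the effect of each format string can be checked directly in Ruby. `render_index` is a hypothetical wrapper used only for this example.

```ruby
# Render %{index} with different index_format values via Kernel#format (sprintf).
def render_index(index, index_format = '%d')
  format(index_format, index)
end

puts render_index(7)          # default '%d'
puts render_index(7, '%04d')  # zero padded to four digits
puts render_index(255, '%x')  # lowercase hex
puts render_index(255, '%X')  # uppercase hex
```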
overwrite
Overwrite an already existing path. Default is false, which raises an error if an S3 object with the same path already exists, or increments the %{index} placeholder until an absent path is found.
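The index search used when overwrite is false can be pictured with the following sketch. The set of existing keys stands in for an existence check against S3, and `next_free_key` is a hypothetical helper, not the plugin's code.

```ruby
require 'set'

# Find the first key whose index is not taken yet.
# existing_keys stands in for an existence check against S3; this is a sketch only.
def next_free_key(existing_keys, prefix, extension)
  index = 0
  index += 1 while existing_keys.include?("#{prefix}_#{index}.#{extension}")
  "#{prefix}_#{index}.#{extension}"
end

EXISTING_KEYS = Set['logs/20130111-23_0.gz', 'logs/20130111-23_1.gz'].freeze
puts next_free_key(EXISTING_KEYS, 'logs/20130111-23', 'gz')
```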
use_server_side_encryption
The Server-side encryption algorithm used when storing this object in S3 (e.g., AES256, aws:kms)
ssekms_key_id
Specifies the AWS KMS key ID to use for object encryption. You have to set "aws:kms" to use_server_side_encryption to use the KMS encryption.
sse_customer_algorithm
Specifies the algorithm to use when encrypting the object (e.g., AES256).
sse_customer_key
Specifies the customer-provided encryption key for Amazon S3 to use when encrypting the object.
sse_customer_key_md5
Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.
compute_checksums
The AWS SDK uses MD5 for API request/response by default. In a FIPS enabled environment, OpenSSL returns an error because MD5 is disabled. If you want to use this plugin in a FIPS enabled environment, set compute_checksums false.
signature_version
Signature version for API requests. s3 means signature version 2 and v4 means signature version 4. Default is nil (following the SDK's default).
This is useful when you use S3 compatible storage that accepts only signature version 2.
warn_for_delay
Sets a threshold for treating events as delayed. A warning is logged if events delayed beyond this threshold are put into S3.
bucket_lifecycle_rule
Specify one or more lifecycle rules for the bucket.
<bucket_lifecycle_rule>
id UNIQUE_ID_FOR_THE_RULE
prefix OPTIONAL_PREFIX # Objects whose keys begin with this prefix will be affected by the rule. If not specified all objects of the bucket will be affected
expiration_days NUMBER_OF_DAYS # The number of days before the object will expire
</bucket_lifecycle_rule>
- Create new SQS queue (use same region as S3)
- Set proper permission to new queue
- Configure S3 event notification
- Write configuration file such as fluent.conf
- Run fluentd
<source>
@type s3
aws_key_id YOUR_AWS_KEY_ID
aws_sec_key YOUR_AWS_SECRET_KEY
s3_bucket YOUR_S3_BUCKET_NAME
s3_region ap-northeast-1
add_object_metadata true
<sqs>
queue_name YOUR_SQS_QUEUE_NAME
</sqs>
</source>
add_object_metadata
Whether or not object metadata should be added to the record. Defaults to false. See below for details.
s3_bucket (required)
S3 bucket name.
s3_region
S3 region name. For example, US West (Oregon) Region is "us-west-2". The full list of regions is available at http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region. We recommend using s3_region instead of s3_endpoint.
store_as
Archive format on S3. You can use several formats:
- gzip (default)
- json
- text
- lzo (Need lzop command)
- lzma2 (Need xz command)
- gzip_command (Need gzip command)
  - This compressor uses an external gzip command, and hence would utilize CPU cores better than gzip.
See the 'Use your compression algorithm' section for adding another format.
format
Parse a line as this format in the S3 object. Supported formats are "apache_error", "apache2", "syslog", "json", "tsv", "ltsv", "csv", "nginx" and "none".
check_apikey_on_start
Check AWS key on start. Default is true.
proxy_uri
URI of proxy environment.
sqs/queue_name (required)
SQS queue name. You need to create the SQS queue in the same region as the S3 bucket.
sqs/skip_delete
When true, messages are not deleted after polling block. Default is false.
sqs/wait_time_seconds
The long polling interval. Default is 20.
sqs/retry_error_interval
Interval to retry polling SQS if polling is unsuccessful, in seconds. Default is 300.
If the add_object_metadata option is set to true, then the name of the bucket and the key for a given object will be added to each log record as s3_bucket and s3_key, respectively. This metadata can be used by filter plugins or other downstream processors to better identify the source of a given record.
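The sketch below shows where these two values could come from, assuming the SQS message body carries a standard S3 event notification. The message is a trimmed, hand-written example, and `object_metadata` is a hypothetical helper rather than the plugin's code.

```ruby
require 'json'
require 'cgi'

# Pull bucket and key out of an S3 event notification body.
# The message shape below is a trimmed, hand-written example of a notification.
def object_metadata(sqs_message_body)
  s3 = JSON.parse(sqs_message_body).fetch('Records').first.fetch('s3')
  {
    's3_bucket' => s3.fetch('bucket').fetch('name'),
    's3_key' => CGI.unescape(s3.fetch('object').fetch('key')) # keys arrive URL-encoded
  }
end

SAMPLE_BODY = JSON.generate(
  { 'Records' => [
    { 's3' => { 'bucket' => { 'name' => 'my-s3bucket' },
                'object' => { 'key' => 'logs/20130111-22_0.gz' } } }
  ] }
)

sample_record = { 'message' => 'my log' }
puts sample_record.merge(object_metadata(SAMPLE_BODY))
```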
The following is an example of an IAM policy needed to write to an S3 bucket (the second statement matches object keys under my-s3bucket, such as my-s3bucket/logs).
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": "arn:aws:s3:::my-s3bucket"
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject"
],
"Resource": "arn:aws:s3:::my-s3bucket/*"
}
]
}
Note that the bucket must already exist and auto_create_bucket has no effect in this case.
s3:GetObject is needed for the object check, which avoids overwriting existing objects. If you set check_object false, s3:GetObject is not needed.
Refer to the AWS documentation for example policies.
Using IAM roles with a properly configured IAM policy is preferred over embedding access keys on EC2 instances.
When check_bucket and check_object are both set to false, fluentd works with a minimal IAM policy, like:
"Statement": [{
"Effect": "Allow",
"Action": "s3:PutObject",
"Resource": ["*"]
}]
The s3 plugin has a pluggable compression mechanism, like Fluentd's input / output plugins. If you set 'store_as xxx', the out_s3 plugin searches fluent/plugin/s3_compressor_xxx.rb and the in_s3 plugin searches fluent/plugin/s3_extractor_xxx.rb. You can define your own (de)compression with the S3Output::Compressor / S3Input::Extractor classes. The Compressor API is as follows:
module Fluent # Since fluent-plugin-s3 v1.0.0 or later, use Fluent::Plugin instead of Fluent
  class S3Output
    class XXXCompressor < Compressor
      S3Output.register_compressor('xxx', self)
      # Used as the file extension
      def ext
        'xxx'
      end
      # Used as the file content type
      def content_type
        'application/x-xxx'
      end
      # chunk is the buffer chunk. tmp is the destination file for upload
      def compress(chunk, tmp)
        # call command or something
      end
    end
  end
end
Extractor is similar to Compressor. See the bundled Compressor / Extractor classes for more detail.
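As a standalone illustration of what a compress(chunk, tmp) body might do, the sketch below streams a chunk into a gzip writer. It runs without Fluentd: `FakeChunk` is a hypothetical stand-in, and the only thing assumed about a real buffer chunk is that it responds to write_to(io).

```ruby
require 'zlib'
require 'stringio'

# Hypothetical stand-in for a Fluentd buffer chunk; only #write_to is assumed here.
FakeChunk = Struct.new(:data) do
  def write_to(io)
    io.write(data)
  end
end

# A gzip-style compress(chunk, tmp) body: stream the chunk into tmp.
def compress(chunk, tmp)
  writer = Zlib::GzipWriter.new(tmp)
  chunk.write_to(writer)
ensure
  writer&.finish # write the gzip trailer but leave tmp open for the upload
end

def gzip_round_trip(data)
  tmp = StringIO.new(String.new) # in-memory stand-in for the temporary upload file
  compress(FakeChunk.new(data), tmp)
  Zlib::GzipReader.new(StringIO.new(tmp.string)).read
end

puts gzip_round_trip("my log1\nmy log2\n")
```

Calling finish instead of close keeps the destination open, which matters when the caller still has to read the file back for the upload.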
Web site | http://fluentd.org/ |
---|---|
Documents | http://docs.fluentd.org/ |
Source repository | http://github.com/fluent/fluent-plugin-s3 |
Discussion | http://groups.google.com/group/fluentd |
Author | Sadayuki Furuhashi |
Copyright | (c) 2011 FURUHASHI Sadayuki |
License | Apache License, Version 2.0 |