Amazon S3 plugin for Fluentd
s3 output plugin buffers event logs in local file and upload it to S3 periodically.
This plugin splits files exactly by using the time of event logs (not the time when the logs are received). For example, a log '2011-01-02 message B' is reached, and then another log '2011-01-03 message B' is reached in this order, the former one is stored in "20110102.gz" file, and latter one in "20110103.gz" file.
s3 input plugin reads data from S3 periodically. This plugin uses SQS queue on the region same as S3 bucket. We must setup SQS queue and S3 event notification before use this plugin.
fluent-plugin-s3 | fluentd | ruby |
---|---|---|
>= 1.0.0 | >= v0.14.0 | >= 2.1 |
< 1.0.0 | >= v0.12.0 | >= 1.9 |
NOTE: fluent-plugin-s3 v1.0.0 is now RC. We will release stable v1.0.0 soon.
Simply use RubyGems:
$ gem install fluent-plugin-s3 -v "~> 0.8" --no-document # for fluentd v0.12 or later
$ gem install fluent-plugin-s3 -v 1.0.0.rc2 --no-document # for fluentd v0.14 or later
With fluentd v0.14 and fluent-plugin-s3 v1.0.0, use new buffer configuration to dynamic parameters.
<match pattern>
@type s3
aws_key_id YOUR_AWS_KEY_ID
aws_sec_key YOUR_AWS_SECRET_KEY
s3_bucket YOUR_S3_BUCKET_NAME
s3_region ap-northeast-1
path logs/${tag}/%Y/%m/%d/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
# if you want to use ${tag} or %Y/%m/%d/ like syntax in path / s3_object_key_format,
# need to specify tag for ${tag} and time for %Y/%m/%d in <buffer> argument.
<buffer tag,time>
@type file
path /var/log/fluent/s3
timekey 3600 # 1 hour partition
timekey_wait 10m
timekey_use_utc true # use utc
</buffer>
<format>
@type json
</format>
</match>
For <buffer>
, you can use any record field in path
/ s3_object_key_format
.
path logs/${tag}/${foo}
<buffer tag,foo>
# parameters...
</buffer>
See official article for more detail: Buffer section configurations
Note that this configuration doesn't work with fluentd v0.12.
This configuration works with both fluentd v0.12 and v0.14.
<match pattern>
@type s3
aws_key_id YOUR_AWS_KEY_ID
aws_sec_key YOUR_AWS_SECRET_KEY
s3_bucket YOUR_S3_BUCKET_NAME
s3_region ap-northeast-1
path logs/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
buffer_path /var/log/fluent/s3
time_slice_format %Y%m%d-%H
time_slice_wait 10m
utc
format json
</match>
If you want to embed tag in path
/ s3_object_key_format
, you need to use fluent-plugin-forest
plugin.
aws_key_id
AWS access key id. This parameter is required when your agent is not
running on EC2 instance with an IAM Role. When using an IAM role, make
sure to configure instance_profile_credentials
. Usage can be found below.
aws_sec_key
AWS secret key. This parameter is required when your agent is not running on EC2 instance with an IAM Role.
aws_iam_retries
The number of attempts to make (with exponential backoff) when loading instance profile credentials from the EC2 metadata service using an IAM role. Defaults to 5 retries.
s3_bucket (required)
S3 bucket name.
s3_region
s3 region name. For example, US West (Oregon) Region is "us-west-2". The
full list of regions are available here. >
http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region. We
recommend using s3_region
instead of s3_endpoint
.
s3_endpoint
endpoint for S3 compatible services. For example, Riak CS based storage or
something. This option doesn't work on S3, use s3_region
instead.
ssl_verify_peer
Verify SSL certificate of the endpoint. Default is true. Set false when you want to ignore the endpoint SSL certificate.
s3_object_key_format
The format of S3 object keys. You can use several built-in variables:
- %{path}
- %{time_slice}
- %{index}
- %{file_extension}
- %{hex_random}
- %{uuid_flush}
- %{hostname}
to decide keys dynamically.
- %{path} is exactly the value of path configured in the configuration file. E.g., "logs/" in the example configuration above.
- %{time_slice} is the time-slice in text that are formatted with time_slice_format. %{index} is the sequential number starts from 0, increments when multiple files are uploaded to S3 in the same time slice.
- %{file_extention} is always "gz" for now.
- %{uuid_flush} a uuid that is replaced everytime the buffer will be flushed. If you want to use this placeholder, install
uuidtools
gem first. - %{hostname} is replaced with
Socket.gethostname
result. - %{hex_random} a random hex string that is replaced for each buffer chunk, not
assured to be unique. This is used to follow a way of peformance tuning,
Add a Hex Hash Prefix to Key Name
, written in Request Rate and Performance Considerations - Amazon Simple Storage Service. You can configure the length of string with ahex_random_length
parameter (Default: 4).
The default format is %{path}%{time_slice}_%{index}.%{file_extension}
.
For instance, using the example configuration above, actual object keys on S3 will be something like:
"logs/20130111-22_0.gz"
"logs/20130111-23_0.gz"
"logs/20130111-23_1.gz"
"logs/20130112-00_0.gz"
With the configuration:
s3_object_key_format %{path}/events/ts=%{time_slice}/events_%{index}.%{file_extension}
path log
time_slice_format %Y%m%d-%H
You get:
"log/events/ts=20130111-22/events_0.gz"
"log/events/ts=20130111-23/events_0.gz"
"log/events/ts=20130111-23/events_1.gz"
"log/events/ts=20130112-00/events_0.gz"
NOTE: ${hostname} placeholder is deprecated since v0.8. You can get same result by using configuration's embedded ruby code feature.
s3_object_key_format %{path}%{time_slice}_%{hostname}%{index}.%{file_extension}
s3_object_key_format "%{path}%{time_slice}_#{Socket.gethostname}%{index}.%{file_extension}"
Above two configurations are same. The important point is wrapping ""
is needed for #{Socket.gethostname}
.
force_path_style
:force_path_style (Boolean) — default: false — When set to true, the bucket name is always left in the request URI and never moved to the host as a sub-domain. See Plugins::S3BucketDns for more details.
store_as
archive format on S3. You can use serveral format:
- gzip (default)
- json
- text
- lzo (Need lzop command)
- lzma2 (Need xz command)
- gzip_command (Need gzip command)
- This compressor uses an external gzip command, hence would result in
utilizing CPU cores well compared with
gzip
- This compressor uses an external gzip command, hence would result in
utilizing CPU cores well compared with
See Use your compression algorithm
section for adding another format.
<format>
or format
Change one line format in the S3 object. Supported formats are "out_file", "json", "ltsv" and "single_value". See also official Formatter article.
-
out_file (default).
time\ttag\t{..json1..} time\ttag\t{..json2..} ...
-
json
{..json1..} {..json2..} ...
At this format, "time" and "tag" are omitted. But you can set these information to the record by setting "include_tag_key" / "tag_key" and "include_time_key" / "time_key" option. If you set following configuration in S3 output:
format json
include_time_key true
time_key log_time # default is time
then the record has log_time field.
{"log_time":"time string",...}
-
ltsv
key1:value1\tkey2:value2 key1:value1\tkey2:value2 ...
"ltsv" format also accepts "include_xxx" related options. See "json" section.
- single_value
Use specified value instead of entire recode. If you get '{"message":"my log"}', then contents are
my log1
my log2
...
You can change key name by "message_key" option.
auto_create_bucket
Create S3 bucket if it does not exists. Default is true.
check_bucket
Check mentioned bucket if it exists in AWS or not. Default is true.
When it is false, fluentd will not check aws s3 for the existence of the mentioned bucket. This is the case where bucket will be pre-created before running fluentd.
check_object
Check object before creation if it exists or not. Default is true.
When it is false, s3_object_key_format will be %{path}%{date_slice}_%{time_slice}.%{file_extension} where, time_slice will be in hhmmss format, so that each object will be unique. Example object name, assuming it is created on 2016/16/11 3:30:54 PM 20161611_153054.txt (extension can be anything as per user's choice)
Example when check_bucket=false and check_object=false
When the mentioned configuration will be made, fluentd will work with the minimum IAM poilcy, like: "Statement": [{ "Effect": "Allow", "Action": "s3:PutObject", "Resource": ["*"] }]
check_apikey_on_start
Check AWS key on start. Default is true.
proxy_uri
uri of proxy environment.
path
path prefix of the files on S3. Default is "" (no prefix).
buffer_path (required)
path prefix of the files to buffer logs.
time_slice_format
Format of the time used as the file name. Default is '%Y%m%d'. Use '%Y%m%d%H' to split files hourly.
time_slice_wait
The time to wait old logs. Default is 10 minutes. Specify larger value if old logs may reache.
utc
Use UTC instead of local time.
storage_class
Set storage class. Possible values are STANDARD
, REDUCED_REDUNDANCY
, STANDARD_IA
from Ruby SDK.
reduced_redundancy
Use S3 reduced redundancy storage for 33% cheaper pricing. Default is false.
This is deprecated. Use storage_class REDUCED_REDUNDANCY
instead.
acl
Permission for the object in S3. This is useful for cross-account access using IAM roles. Valid values are:
- private (default)
- public-read
- public-read-write (not recommended - see Canned ACL)
- authenticated-read
- bucket-owner-read
- bucket-owner-full-control
To use cross-account access, you will need to create a bucket policy granting the specific access required. Refer to the AWS documentation for examples.
hex_random_length
The length of %{hex_random}
placeholder. Default is 4 as written in
Request Rate and Performance Considerations - Amazon Simple Storage
Service.
The maximum length is 16.
overwrite
Overwrite already existing path. Default is false, which raises an error
if a s3 object of the same path already exists, or increment the
%{index}
placeholder until finding an absent path.
use_server_side_encryption
The Server-side encryption algorithm used when storing this object in S3 (e.g., AES256, aws:kms)
ssekms_key_id
Specifies the AWS KMS key ID to use for object encryption. You have to
set "aws:kms" to use_server_side_encryption
to use the KMS encryption.
sse_customer_algorithm
Specifies the algorithm to use to when encrypting the object (e.g., AES256).
sse_customer_key
Specifies the AWS KMS key ID to use for object encryption.
sse_customer_key_md5
Specifies the 128-bit MD5 digest of the encryption key according to RFC 1321.
compute_checksums
AWS SDK uses MD5 for API request/response by default. On FIPS enabled environment,
OpenSSL returns an error because MD5 is disabled. If you want to use
this plugin on FIPS enabled environment, set compute_checksums false
.
signature_version
Signature version for API request. s3
means signature version 2 and
v4
means signature version 4. Default is nil
(Following SDK's default).
It would be useful when you use S3 compatible storage that accepts only signature version 2.
warn_for_delay
Given a threshold to treat events as delay, output warning logs if delayed events were put into s3.
Typically, you use AssumeRole for cross-account access or federation.
<match *>
@type s3
<assume_role_credentials>
role_arn ROLE_ARN
role_session_name ROLE_SESSION_NAME
</assume_role_credentials>
</match>
See also:
role_arn (required)
The Amazon Resource Name (ARN) of the role to assume.
role_session_name (required)
An identifier for the assumed role session.
policy
An IAM policy in JSON format.
duration_seconds
The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 3600 seconds (1 hour). By default, the value is set to 3600 seconds.
external_id
A unique identifier that is used by third parties when assuming roles in their customers' accounts.
Retrieve temporary security credentials via HTTP request. This is useful on EC2 instance.
<match *>
@type s3
<instance_profile_credentials>
ip_address IP_ADDRESS
port PORT
</instance_profile_credentials>
</match>
See also:
- Aws::InstanceProfileCredentials
- Temporary Security Credentials - AWS Identity and Access Management
- Instance Metadata and User Data - Amazon Elastic Compute Cloud
retries
Number of times to retry when retrieving credentials. Default is 5.
ip_address
Default is 169.254.169.254.
port
Default is 80.
http_open_timeout
Default is 5.
http_read_timeout
Default is 5.
This loads AWS access credentials from local ini file. This is useful for local developing.
<match *>
@type s3
<shared_credentials>
path PATH
profile_name PROFILE_NAME
</shared_credentials>
</match>
See also:
path
Path to the shared file. Defaults to "#{Dir.home}/.aws/credentials".
profile_name
Defaults to 'default' or [ENV]('AWS_PROFILE')
.
- Create new SQS queue (use same region as S3)
- Set proper permission to new queue
- Configure S3 event notification
- Write configuration file such as fluent.conf
- Run fluentd
<source>
@type s3
aws_key_id YOUR_AWS_KEY_ID
aws_sec_key YOUR_AWS_SECRET_KEY
s3_bucket YOUR_S3_BUCKET_NAME
s3_region ap-northeast-1
<sqs>
queue_name YOUR_SQS_QUEUE_NAME
</sqs>
</source>
aws_key_id
AWS access key id. This parameter is required when your agent is not running on EC2 instance with an IAM Role.
aws_sec_key
AWS secret key. This parameter is required when your agent is not running on EC2 instance with an IAM Role.
aws_iam_retries
The number of attempts to make (with exponential backoff) when loading instance profile credentials from the EC2 metadata service using an IAM role. Defaults to 5 retries.
s3_bucket (required)
S3 bucket name.
s3_region
S3 region name. For example, US West (Oregon) Region is
"us-west-2". The full list of regions are available here. >
http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region. We
recommend using s3_region
instead of s3_endpoint
.
store_as
archive format on S3. You can use serveral format:
- gzip (default)
- json
- text
- lzo (Need lzop command)
- lzma2 (Need xz command)
- gzip_command (Need gzip command)
- This compressor uses an external gzip command, hence would result in utilizing CPU cores well compared with
gzip
- This compressor uses an external gzip command, hence would result in utilizing CPU cores well compared with
See 'Use your compression algorithm' section for adding another format.
format
Parse a line as this format in the S3 object. Supported formats are "apache_error", "apache2", "syslog", "json", "tsv", "ltsv", "csv", "nginx" and "none".
check_apikey_on_start
Check AWS key on start. Default is true.
proxy_uri
URI of proxy environment.
sqs/queue_name (required)
SQS queue name. Need to create SQS queue on the region same as S3 bucket.
sqs/skip_delete
When true, messages are not deleted after polling block. Default is false.
sqs/wait_time_seconds
The long polling interval. Default is 20.
The following is an example for a minimal IAM policy needed to write to an s3 bucket (matches my-s3bucket/logs, my-s3bucket-test, etc.).
{ "Statement": [
{ "Effect":"Allow",
"Action":"s3:*",
"Resource":"arn:aws:s3:::my-s3bucket*"
} ]
}
Note that the bucket must already exist and auto_create_bucket has no effect in this case.
Refer to the AWS documentation for example policies.
Using IAM roles with a properly configured IAM policy are preferred over embedding access keys on EC2 instances.
s3 plugin has plugabble compression mechanizm like Fleuntd's input / output
plugin. If you set 'store_as xxx', out_s3
plugin searches
fluent/plugin/s3_compressor_xxx.rb
and in_s3
plugin searches
fluent/plugin/s3_extractor_xxx.rb
. You can define your (de)compression with
'S3Output::Compressor'/S3Input::Extractor
classes. Compressor API is here:
module Fluent # Since fluent-plugin-s3 v1.0.0 or later, use Fluent::Plugin instead of Fluent
class S3Output
class XXXCompressor < Compressor
S3Output.register_compressor('xxx', self)
# Used to file extension
def ext
'xxx'
end
# Used to file content type
def content_type
'application/x-xxx'
end
# chunk is buffer chunk. tmp is destination file for upload
def compress(chunk, tmp)
# call command or something
end
end
end
end
Extractor
is similar to Compressor
See bundled Compressor
/Extractor
classes for more detail.
Web site | http://fluentd.org/ |
---|---|
Documents | http://docs.fluentd.org/ |
Source repository | http://github.com/fluent/fluent-plugin-s3 |
Discussion | http://groups.google.com/group/fluentd |
Author | Sadayuki Furuhashi |
Copyright | (c) 2011 FURUHASHI Sadayuki |
License | Apache License, Version 2.0 |