安装Flume就需要有Flume的安装包,Flume包可以在官网下载到:
然后:
如果要下载往期版本可以,点击第一张图片所在网页下方的:archive repository
在这里可以下载Flume的所有版本:
把下载之后的tar
包上传到hadoop102
的/opt/software
目录下,随后解压到/opt/module
:
# 解压文件
[wzq@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
# cd到module
[wzq@hadoop102 software]$ cd /opt/module/
# 改个名字,一般是:框架名+版本号
[wzq@hadoop102 module]$ mv apache-flume-1.9.0-bin flume-1.9.0
打开这个文件夹,发现这个文件和以往的那些框架都大差不差,都是由bin、conf、lib等组成,下图展示了各个文件夹都是干嘛的
删除lib
目录下的guava
包,这个包是Google
开源的一个工具类,但是比较恶心,大数据框架里面引用这个包,大家的版本都不一致,所以为了解决兼容性问题,这个包我们应该删掉:
[wzq@hadoop102 flume-1.9.0]$ ls lib/ | grep guava
guava-11.0.2.jar
[wzq@hadoop102 flume-1.9.0]$ rm -rf lib/guava-11.0.2.jar
至此,Flume已经安装部署好了,但是值得注意的是:必须要有Java的环境变量,因为在前面的Hadoop、Zookeeper、Hive都已经配置好了,所以这里不做演示
需求:使用Flume监听一个端口,收集该端口数据,并打印到控制台
如下图所示,首先借助netcat
工具向本机的44444端口发送数据;Flume监控本机的44444端口,通过Flume的Source端读取数据;最后Flume将获取到的数据通过Sink端写出到控制台
最终要达到的一个效果是:输入数据,flume端在控制台输出:
首先需要安装netcat
:
[wzq@hadoop102 flume-1.9.0]$ sudo yum install nc
安装完成,我们简单使用一下netcat
,开启一个端口的命令是:
[wzq@hadoop102 flume-1.9.0]$ nc -lk 9999
监听一个端口的命令是:
[wzq@hadoop102 flume-1.9.0]$ nc localhost 9999
必须首先开启端口,然后才能监听
这时候就可以两个窗口就可以互相发送信息了:
首先判断44444
端口是不是已经被占用了:
[wzq@hadoop102 flume-1.9.0]$ sudo netstat -tlp | grep 44444
如果没有弹出任何东西,就是说44444端口没有被占用,如果该端口被占用,有两个解决方案:
- 找到占用该端口的进程号,杀掉这个进程
- 换一个端口
创建Flume Agent配置文件 netcat-flume-logger.conf,该文件需要在flume/job/
目录下:
[wzq@hadoop102 flume-1.9.0]$ mkdir job
[wzq@hadoop102 flume-1.9.0]$ vim job/netcat-flume-logger.conf
然后将以下配置信息填进去:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意:这个配置文件名称是有讲究的
netcat-flume-logger.conf
,一般情况下是这样命名的:数据源-flume-输出位置.conf
现在来解读一下这个配置文件的信息:
注意:
agent
的名字是a1
,这个a1
需要保证全局唯一!- 事务的个数需要小于channel中event的个数
- 一个channel可以有多个source
- 一个channel只能有一个sink
其实这样子就算配置好了
首先要开启Flume
监听端口,开启Flume
监听端口有两种写法:
第一种:
[wzq@hadoop102 flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
第二种写法:
[wzq@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
参数说明:
--conf
等价于-c
:指定配置文件所在目录--name
等价于-n
:指定agent的名字a1--conf-file
等价于-f
:flume本次启动读取的配置文件是job目录下的netcat-flume-logger.conf-Dflume.root.logger=INFO,console
:-D表示flume运行时动态修改flume.root.logger
参数属性值,并将控制台日志打印级别设置为INFO,日志级别包括log、info、warn、error
然后使用netcat
工具向本机44444端口发送内容:
[wzq@hadoop102 flume-1.9.0]$ nc localhost 44444
结果如下图所示:
**需求:**实时监控Hive日志,并上传到HDFS
如下图所示:我们首先需要创建符合条件的flume配置文件,然后执行该配置文件开启监控,随后打开Hive生成日志,最后再输出到HDFS。
做这个案例服务器必须要有Java、Hadoop、Hive的环境
首先在/opt/module/flume-1.9.0/job/
目录下创建一个文件:file-flume-hdfs.conf
:
[wzq@hadoop102 flume-1.9.0]$ vim job/file-flume-hdfs.conf
首先写Agent配置,这里就是给Agent起名字,包括sources、channel、sinks的名字:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
然后写source配置,这里的配置我们可以参考Apach Flume官网的文档:因为我们想监控一个文件,这个文件是实时追加的,我们平常可以使用tail -f 文件
去监控这个文件,所以在Source这块需要使用exec source
,下表是这个模块的配置信息:
Property Name | Default | Description |
---|---|---|
channels | – | |
type | – | The component type name, needs to be exec |
command | – | The command to execute |
shell | – | A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc. |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
batchSize | 20 | The max number of lines to read and send to the channel at a time |
batchTimeout | 3000 | Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream |
selector.type | replicating | replicating or multiplexing |
selector.* | Depends on the selector.type value | |
interceptors | – | Space-separated list of interceptors |
interceptors.* |
其中加粗的字体是必须要有的配置
我们先配必须要有的,一会儿再来扩展:
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.channels = c1
注意这里
tail -F
,他和tail -f
的区别是:
-f
:如果文件删掉了,不会再继续监控-F
:如果文件没有了,tail依旧会帮助我们建立连接持续监控
然后是写sink配置,这里需要使用HDFS sink
,因为我们最终需要输出到HDFS,看一下HDFS sink的所有配置信息:(表格太长了,这里贴一张图片吧!)
和上面一样黑色字体是必须要配置的,先配必须的:
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume
a1.sinks.k1.channel = c1
注意:这里的hdfs.path,指定的hadoop端口,必须是hdfs通信的内网端口,可以在hadoop配置文件core-site.xml种查看这个端口
最后我们再配置一下channel:
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be memory |
capacity | 100 | The maximum number of events stored in the channel |
transactionCapacity | 100 | The maximum number of events the channel will take from a source or give to a sink per transaction |
keep-alive | 3 | Timeout in seconds for adding or removing an event |
byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below. |
byteCapacity | see description | Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body , which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB. |
配置必须要有的:
# Memory Channel
a1.channels.c1.type = memory
综合来看这个配置信息:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume
a1.sinks.k1.channel = c1
# Memory Channel
a1.channels.c1.type = memory
我们可以在sinks
里面多加一些配置,比如将这些个日志文件以年月日的形式存储到各个文件夹,多长时间创建一个文件,文件达到多少兆再创建一个等等:
# 给这个地址加上时间分文件
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y-%m-%d/%H
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳,这个一定要设置为true
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a1.sinks.k1.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0
也可以指定Channel的容量,以及事务容量:
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
所以这个配置文件是这样的:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y-%m-%d/%H
a1.sinks.k1.channel = c1
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a1.sinks.k1.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0
# Memory Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
写完之后就可以测试了,执行一下命令:
[wzq@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/file-flume-hdfs.conf
运行Hive:
[wzq@hadoop102 hive-3.1.2]$ hive
随便执行几条命令:
use gulivideo;
show tables;
select * from gulivideo_orc limit 10;
打开hdfs
可以发现日志已经有啦:
**需求:**使用Flume监听一个目录下的文件,并上传至HDFS
在这一部分source
就要变一变了,监控的是目录下的新文件,所以应该使用Spooldir Source
其他不变
首先在/opt/moudle/flume-1.9.0/job/
下创建文件dir-flume-hdfs.conf
:
[wzq@hadoop102 flume-1.9.0]$ touch job/dir-flume-hdfs.conf
然后开始写配置文件了,第一个Agent部分:
# Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
第二个写Source部分,这次需要使用的是Spooldir Source
,所以可以在官网上找一找找个source都需要配置一些什么东西:
需要关注的是加粗字体,还有fileSuffix
、includePattern
、ignorePattern
还有fileHeader
type
:source的类型,是什么就指定什么,前两个案例已经写了netcat
和exec
了spoolDir
:要监控哪个文件夹,这个案例监控/opt/module/flume
下的upload
目录,读者可以自己创建一个channels
:与哪个channel绑定,这次我们在最后再绑定channelfileSuffix
:文件后缀,默认值.COMPLETED
,Flume会扫描指定的文件夹,如果某个文件后缀是.COMPLETED
就不会被上传,如果往监控目录扔一个文件,该文件会立刻被Flume扫描,并加上这个后缀includePattern
:只有被你定义的正则表达式匹配的文件才会上传ignorePattern
:忽略你定义的正则表达式匹配的文件fileHeader
:是否添加存储绝对路径文件名的标头,这个默认值是false
,需要设置为true
所以就可以写配置了:
# Source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume-1.9.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# 文件后缀为tmp的文件不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
第三个部分写sink
,这里直接照搬第二个案例sink
的配置,因为我们想上传到hdfs
:
# Sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y-%m-%d/%H
# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
# 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒多少个Event才刷新到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 20
# 设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
第四个部分写channel
,也和上面一样:
# Channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
最后一部分是channels与soure和sink绑定:
# Bind
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
然后就可以测试了,输入下面的命令开启监控:
[wzq@hadoop102 flume-1.9.0]$ bin/flume-ng agent -n a3 -c conf/ -f job/dir-flume-hdfs.conf
然后我们可以在外部创建一个文件,然后丢入本地的upload
文件夹:
[wzq@hadoop102 flume-1.9.0]$ vim 1.txt
[wzq@hadoop102 flume-1.9.0]$ mv 1.txt upload/
移动到本地的upload文件夹后,这个文件被加了.COMPLETED
后缀:
再去看看hdfs
,已经成功上传了
但是注意:
- 同名的文件不能扔到一个文件夹
- 不能扔进去一个和指定的
.COMPLETED
后缀一样的文件到本地目录- 不能在本地目录动态修改文件
exec source
适用于监控一个实时追加的文件,但不能保证数据丢失;Spooldir source
能保证数据不丢失,而且能够实现断点续传,但不能实时监控,而Taildir source
既能实现实时追加,也可以实现断点续传,保证数据不丢失。
**需求:**使用Flume监听整个目录的实时追加文件,并上传至HDFS
首先创建一个文件:taildir-flume-hdfs.conf
:
[wzq@hadoop102 flume-1.9.0]$ touch job/taildir-flume-hdfs.conf
然后开始写配置,第一部分依旧是agent
:
# Agent
a4.sources = r4
a4.sinks = k4
a4.channels = c4
然后是source
,这次需要使用taildir source
,去官网看一下配置:
除了关注黑体字之外,还需要关注一个配置positionFile
:
type
:source类型filegroups
:这个字段可以为要监控的目录命名filegroups.<filegroupName>
:为上一步命名的目录指定目录,可以写入正则规则positionFile
:要实现断点续传,就需要有一个文件标记某个要上传的文件到了哪一行,这个文件默认存储位置是:~/.flume/taildir_position.json
,可以把它更改到别的地方
所以就可以写配置了:
# Source
a4.sources.r4.type = TAILDIR
a4.sources.r4.filegroups = f1 f2
a4.sources.r4.filegroups.f1 = /opt/module/flume-1.9.0/files/file1/.*file.*
a4.sources.r4.filegroups.f2 = /opt/module/flume-1.9.0/files/file2/.*log.*
a4.sources.r4.positionFile = /opt/module/flume-1.9.0/tail_dir.json
接下来的sinks、channels和绑定都可以和上面的一样,唯一不一样的就是可以换个在hdfs上存储的目录,这里只贴这一部分:
a4.sinks.k4.hdfs.path = hdfs://hadoop102:8020/flume/upload2/%Y-%m-%d/%H
首先启动flume的agent进程:
[wzq@hadoop102 flume-1.9.0]$ bin/flume-ng agent -n a4 -c conf/ -f job/taildir-flume-hdfs.conf
然后写入一些文件:
[wzq@hadoop102 files]$ touch file1/file1.txt
[wzq@hadoop102 files]$ echo hello >> file1/file1.txt
[wzq@hadoop102 files]$ touch file2/log1.txt
[wzq@hadoop102 files]$ echo wzq >> file2/log1.txt
观察hdfs:
已经成功上传
但是美中不足的是,如果对已有的文件进行更名操作,flume就会再次上传一份数据,这就导致了上传重复数据,如果要解决这一问题,有两种解决方案:
- 尚硅谷Flume教学视频
- Apache Flume官网