Skip to content

Latest commit

 

History

History
704 lines (481 loc) · 19.4 KB

3、打通行为日志采集通道.md

File metadata and controls

704 lines (481 loc) · 19.4 KB

本blog正式开始数据采集,在开始之前,应该确保服务器集群有Hadoop、Hive、Kafka、Flume等环境,如果没有安装,请访问以下链接进行安装:

一、行为日志数据通道

行为日志一般存储在服务器的文件里面,所以这一part可以使用FlumeTailDirSource监控日志文件,再使用Kafka Channel发送到Kafka,进行一些过滤之后,再使用Flume发送到HDFS里面。具体过程如下图所示:

image-20220528154431532

二、从文件到Kafka

1、规划

按照规划,需要采集的用户行为日志文件分布在hadoop102,hadoop103两台日志服务器,故需要在hadoop102,hadoop103两台节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(JSON)进行校验,然后将校验通过的日志发送到Kafka

第一个要写的应该是从文件到Kafka中,Flume共有三大组件,分别是SourceChannelSink,对于这三个组件,这一部分选用:

  • TailDir Source:因为他可以实现断点续传、多目录的监控
  • Kafka Channel:Flume这么流行的一个主要原因就是与Kafka的集成,如果Flume传输数据的目的地是Kafka,那么只需要配置Kafka Channel就好,可以不用配置Kafka Sink

综上所述,这一部分使用:TailDir Source监控目录、使用Kafka Channel直接输出到Kafka,再TailDir之后还需要编写一个拦截器,用于拦截不是JSON格式的数据

image-20220528164022974

2、编写Flume配置文件

有了规划之后,直接开始写配置文件吧,进入flume的job文件,创建一个目录project,专门存储这个项目的flume配置文件

[wzq@hadoop102 ~]$ cd /opt/module/flume-1.9.0/job/
[wzq@hadoop102 job]$ mkdir project
[wzq@hadoop102 job]$ cd project

紧接着创建一个配置文件:(可以点我看完整版配置文件

[wzq@hadoop102 project]$ touch file_to_kafka.conf

然后就开始写这个配置文件吧!对于没有学过Flume的同学,欢迎访问Flume系列blog!

这个文件由四个部分组成:命名、Source、Channel和绑定,因为使用Kafka Channel可以不写Sink,所以这一部分没有Sink:

# Name the components on this agent
a1.sources = r1
a1.channels = c1

# Configure the TailDir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/tail_dir.json
# 拦截器稍后配置


# Use Kafka Channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
# 这里不转换为FlumeEvent到Kafka
a1.channels.c1.parseAsFlumeEvent = false 

# Bind the source and sink to the channel
a1.sources.r1.channels = c1

3、编写拦截器

配置好Flume之后,需要写一个拦截器,这个拦截器的作用是过滤掉文件中不是JSON格式的数据,这一部分在idea里面编写,读者可以直接点击这串字访问代码文件!

打开idea,新建一个Maven项目,名字为collect,随后打开pom.xml添加以下依赖:

<dependencies>
    <!-- flume依赖,scope设置为provided,打包时候不包含此包 -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- fastjson依赖,用于判断一个数据是否为JSON格式 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<!-- 打包插件 -->
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

随后创建一个包:com.wzq.flume.interceptor,在该包下创建一个JSONUtils类,这个类将作为工具类判断一个字符串是否为JSON文件:

package com.wzq.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {
    // 判断是否为JSON格式文件
    public static boolean isJSONValidate(String log) {
        try {
            JSON.parse(log);    // 如果解析过来不报异常就是JSON格式文件
            return true;
        } catch (JSONException e) {
            return false;
        }
    }
}

然后就是编写正儿八经的拦截器了,配置拦截器有以下几个步骤:

  • 实现flume提供的Interceptor接口

  • 实现该接口的四个方法

    • initialize方法:初始化方法,如果希望配置一些什么参数,在这里可以写;当然如果要配合数据库的字段完成拦截这里也可以配置JDBC或其他链接

    • intercept方法:

      • // 处理单个Event的方法,为了解耦使用
        public Event intercept(Event event)
      • // 处理一个批次的Event,Flume内部调用该方法,这个方法调用上面的方法
        public List<Event> intercept(List<Event> events)
    • close方法:关闭连接方法

  • 实现Interceptor.Builder接口,需要在build方法中返回当前对象的实例

代码如下:

package com.wzq.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class LogInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 获取Event的body信息,转换为String格式
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        if (JSONUtils.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> iterator = events.iterator();
        while (iterator.hasNext()) {
            Event event = iterator.next();
            if (intercept(event) == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

这就算写完这个拦截器了,在这之后可以打为jar包:

image-20220528203104204

随后将比较大的这个jar包上传到服务器flume安装路径的lib目录下:

image-20220528203130739

上传之后可以查看是否上传成功:

[wzq@hadoop102 flume-1.9.0]$ ll lib/ | grep flume-interceptor
-rw-r--r--. 1 wzq wzq  660656 5月  28 17:21 flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

不要忘记了还有一步,还需要在第二步编写的file_to_kafka.conf配置文件中,配置拦截器,加入以下内容:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.wzq.flume.interceptor.LogInterceptor$Builder

4、测试运行

这时候从文件到Kafka这块算是已经打通了,现在可以测试了,我们期望的是在Hadoop102与Hadoop103运行flume,所以在开始之前应该做好分发工作:

[wzq@hadoop102 module]$ xsync flume-1.9.0/

为了这个脚本方便运行,可以写一个脚本控制Flume的启停:

[wzq@hadoop102 module]$ cd ~/bin/
[wzq@hadoop102 bin]$ vim f1.sh

在里面添加如下内容:(查看源文件

#!/bin/bash

case $1 in
"start") {
	for i in hadoop102 hadoop103
	do
		echo "-------------------启动 $i 采集flume-------------------"
		ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/job/project/file_to_kafka.conf >/dev/null 2>&1 &"
	done
};;
"stop") {
	for i in hadoop102 hadoop103
	do
		echo "-------------------停止 $i 采集flume-------------------"
		ssh $i "ps -ef | grep file_to_kafka.conf | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
	done
};;
*) {
	echo "Input Args Error..."
};;
esac

增加该文件的可执行权限:

[wzq@hadoop102 bin]$ chmod +x f1.sh 

还有一步工作就是创建一个Kafka的topic:topic_log

[wzq@hadoop102 bin]$ zk.sh start	# 启动zookeeper
[wzq@hadoop102 bin]$ kf.sh start	# 启动kafka
[wzq@hadoop102 bin]$ cd /opt/module/kafka-3.0.0/
[wzq@hadoop102 kafka-3.0.0]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic topic_log --partitions 3 --replication-factor 3

然后在hadoop103主机上创建一个消费者,消费topic_log主题的数据:

[wzq@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

ok,这时候在hadoop102启动flume的f1脚本,并且生产一点日志:

[wzq@hadoop102 bin]$ f1.sh start
-------------------启动 hadoop102 采集flume-------------------
-------------------启动 hadoop103 采集flume-------------------
[wzq@hadoop102 bin]$ lg.sh start
-------------------hadoop102-------------------
-------------------hadoop103-------------------

启动成功稍等片刻,可以看到hadoop103的消费者正在源源不断的消费数据:

image-20220528205457488

至此,从文件到Kafka的通道就打通了!

三、从Kafka到HDFS

1、规划

看第一节的规划,这个Flume配置应该在hadoop104主机!

接下来打通第二条通道, 从Kafka到HDFS,这一步的主要工作就是用Flume消费Kafka的topic_log主题,然后使用一个Channel,再使用HDFS Sink输出到HDFS:

image-20220528211611980

在这一步,使用:

  • Kafka Source:因为要消费Kafka的topic,这里还需要另外配置一个拦截器,这个拦截器的主要功能是为Event增加一个header,key是timestap时间戳,value为json字符串中ts字段(这个字段请参考行为日志数据格式的JSON格式)的值
  • File Channel:使用文件信道,保证可靠性
  • HDFS Sink:因为要输出到HDFS,所以需要使用这个Sink

2、编写Flume配置文件

有了规划之后,直接写配置文件,进入flume的job/project目录,创建一个文件:

[wzq@hadoop104 ~]$ cd /opt/module/flume-1.9.0/job/project/
[wzq@hadoop104 project]$ touch kafka_to_hdfs.conf

这个Flume配置文件由命名、kafka Source、file Channel、hdfs Sink、绑定信道组成:(可以点我看完整版配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Kafka source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.source1.batchSize = 5000
a1.sources.source1.batchDurationMillis = 2000
# 后面配置拦截器

# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## 控制输出文件是原生文件
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# File Channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume-1.9.0/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume-1.9.0/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、编写拦截器

这个拦截器的作用主要就是给每个Event的头加上时间戳,在com.wzq.flume.interceptor包下创建TimeStampInterceptor类:读者可以直接点击这串字访问代码文件!

package com.wzq.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimeStampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);

        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("ts");

        headers.put("timestamp", ts);   // 设置时间戳
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

写完之后按照上面的步骤打为jar包,把这个比较大的文件上传到hadoop104,flume安装目录的lib目录下(首先删除旧的这个文件)

image-20220528213917111

然后在flume配置文件中,加入拦截器:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.wzq.flume.interceptor.TimeStampInterceptor$Builder

4、测试运行

首先启动Hadoop

[wzq@hadoop104 lib]$ myhadoop.sh start
 =================== 启动 hadoop 集群 ===================
 --------------- 启动 hdfs ---------------
Starting namenodes on [hadoop102]
Starting datanodes
Starting secondary namenodes [hadoop104]
 --------------- 启动 yarn ---------------
Starting resourcemanager
Starting nodemanagers
 --------------- 启动 historyserver ---------------

为了方便起见,直接在hadoop102主机上编写一个第二条通道的flume启停脚本:

[wzq@hadoop102 project]$ cd ~/bin/
[wzq@hadoop102 bin]$ vim f2.sh

在该文件中写入:(查看源文件

#!/bin/bash

case $1 in
"start") {
	echo "-------------------启动 hadoop104 采集flume-------------------"
	ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/job/project/kafka_to_hdfs.conf >/dev/null 2>&1 &"
};;
"stop") {
	echo "-------------------停止 hadoop104 采集flume-------------------"
	ssh hadoop104 "ps -ef | grep kafka_to_hdfs.conf | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
};;
*) {
	echo "Input Args Error..."
};;
esac

增加该文件的可执行权限:

[wzq@hadoop102 bin]$ chmod +x f2.sh

ok!至此大功告成啦!现在从第一个通道开始测试一下:

# hadoop103 消费topic_log数据
[wzq@hadoop103 kafka-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
# hadoop102 启动f1.sh脚本
[wzq@hadoop102 bin]$ f1.sh start
# hadoop102 启动f2.sh脚本
[wzq@hadoop102 bin]$ f2.sh start
# hadoop102 生成一些日志
[wzq@hadoop102 bin]$ lg.sh start

hadoop103可以消费到数据:

image-20220528215506764

Hadoop接收到数据:

image-20220528215527301

大功告成!!!

五、用户行为日志数据启停脚本

为了方便启动停止这个采集通道,可以写一个脚本:(查看源文件

[wzq@hadoop102 bin]$ cd ~/bin/
[wzq@hadoop102 bin]$ vim cluster.sh

写入以下内容:

#!/bin/bash

case $1 in
"start") {
	echo "=======================启动用户行为日志采集======================="

	# 启动zookeeper
	zk.sh start

	# 启动Hadoop集群
	myhadoop.sh start

	# 启动Kafka集群
	kf.sh start

	# 启动Flume第一条通道
	f1.sh start

	# 启动Flume第二条通道
	f2.sh start
};;
"stop") {
	echo "=======================停止用户行为日志采集======================="

	#停止 Flume消费集群
	f2.sh stop

	#停止 Flume采集集群
	f1.sh stop

	#停止 Kafka采集集群
	kf.sh stop

	#停止 Hadoop集群
	myhadoop.sh stop

	#停止 Zookeeper集群
	zk.sh stop
};;
*) {
	echo "Input Args Error..."
};;
esac

赋予权限:

[wzq@hadoop102 bin]$ chmod +x cluster.sh

参考资料