Kafka Series (6): Integrating with External Systems

1. Flume and Kafka Integration

1.1 Flume as Producer

1.1.1 Start the ZooKeeper and Kafka clusters

1.1.2 Start a Kafka console consumer

shell>cd /data/app/kafka
shell>bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first

1.1.3 Install Flume

1.1.4 Modify the configuration file

shell>cd /data/app/flume
shell>vim conf/log4j.properties
### modify flume.log.dir
flume.log.dir=/data/app/flume/logs

1.1.5 Write the Flume configuration file

shell>cd /data/app/flume
shell>mkdir jobs
shell>vim jobs/file_to_kafka.conf

file_to_kafka.conf

#1 Define the components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/app/kafka/applog/app.*
a1.sources.r1.positionFile = /data/app/flume/taildir_position.json

#3 Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#4 Configure the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

#5 Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.1.6 Start Flume

shell>cd /data/app/flume
shell>bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf &

1.1.7 Test

shell>cd /data/app/kafka/applog
shell>vim app1
### type "hello" into the file; the message appears in the Kafka consumer console

1.2 Flume as Consumer

1.2.1 Start ZooKeeper, Kafka, and Flume

1.2.2 Write the Flume configuration file and start the agent

shell>cd /data/app/flume
shell>mkdir jobs
shell>vim jobs/kafka_to_file.conf
shell>bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console

kafka_to_file.conf

#1 Define the components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id

#3 Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#4 Configure the sink
a1.sinks.k1.type = logger

#5 Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.2.3 Start a Kafka console producer

shell>cd /data/app/kafka
shell>bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first

Then type messages in the Kafka producer console; they will be printed in the Flume console.

2. Flink and Kafka Integration

2.1 Flink as Producer Sending to Kafka

2.1.1 Create a Java Maven project

2.1.2 Add dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>

2.1.3 Add log4j.properties to the resources directory

log4j.rootLogger=error,stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%L) : %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File = ../log/agent.log
log4j.appender.R.MaxFileSize = 1024KB
log4j.appender.R.MaxBackupIndex = 1

log4j.appender.R.layout = org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%L) : %m%n

2.1.4 Create the Flink producer class

package flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.ArrayList;
import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        //1 Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With a 3-node cluster, set the parallelism to 3
        env.setParallelism(3);
        //2 Prepare the data source
        ArrayList<String>  wordlist = new ArrayList<>();
        wordlist.add("hello");
        wordlist.add("fox");
        DataStreamSource<String> stream = env.fromCollection(wordlist);

        // Create the Kafka producer configuration properties
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");

        // Set the topic to send to
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("first",new SimpleStringSchema(),properties);
        //3 Add the Kafka producer as a sink
        stream.addSink(kafkaProducer);
        //4 Execute the job
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

2.1.5 Verify with a Kafka console consumer

shell>cd /data/app/kafka
shell>bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first

2.2 Flink as Kafka Consumer

package flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        //1 Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With a 3-node cluster, set the parallelism to 3
        env.setParallelism(3);
        //2 Create the consumer configuration properties
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");


        // Set the topic to subscribe to
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("first",new SimpleStringSchema(),properties);
        //3 Attach the Kafka consumer as a source and print the stream
        env.addSource(kafkaConsumer).print();
        //4 Execute the job
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

3. Spring Boot and Kafka Integration

3.1 Create a Spring Boot application


For the detailed steps, see: https://juejin.cn/post/6844903791603499016#heading-12

4. Spark and Kafka Integration

4.1 Spark as Producer

4.1.1 Create a Maven project

Right-click the project ==> "Add Framework Support" ==> check "Scala"
Right-click the source directory ==> "Mark Directory as" ==> "Sources Root"

4.1.2 Add dependencies

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

4.1.3 Add log4j.properties

Add the log4j.properties file to the resources directory to set the log level to error.

4.1.4 Create a Scala file (object)

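A minimal sketch of such an object, assuming the broker list and the first topic used elsewhere in this article. Since writing to Kafka from the Spark project ultimately goes through the plain Kafka producer client, the sketch simply sends a few test records; the object name, record contents, and loop count are illustrative assumptions, not the article's original code.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SparkKafkaProducer {
  def main(args: Array[String]): Unit = {
    // 1 Producer configuration (broker list as used elsewhere in this article)
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    // 2 Create the producer
    val producer = new KafkaProducer[String, String](properties)

    // 3 Send a few test records to the "first" topic
    for (i <- 1 to 5) {
      producer.send(new ProducerRecord[String, String]("first", "hello " + i))
    }

    // 4 Release resources
    producer.close()
  }
}

Run the object and verify the records with a Kafka console consumer on the first topic, as in 2.1.5.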

4.2 Spark as Consumer

4.2.1 Add dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

4.2.2 Spark consumer

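A minimal sketch of the consumer, built on the spark-streaming-kafka-0-10 direct-stream API and assuming the broker list and the first topic used elsewhere in this article; the group id, batch interval, master setting, and object name are illustrative assumptions.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    // 1 Create the SparkConf and StreamingContext (local mode, 3-second batches)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafkaConsumer")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 2 Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "sparkGroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    // 3 Create a direct stream subscribed to the "first" topic
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaParams)
    )

    // 4 Print the value of each record
    kafkaStream.map(_.value()).print()

    // 5 Start the streaming job and block until termination
    ssc.start()
    ssc.awaitTermination()
  }
}

Start a Kafka console producer on the first topic (as in 1.2.3) and each message should be printed by the streaming job.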