1. Flume and Kafka Integration
1.1 Flume as the producer
1.1.1 Start the Zookeeper and Kafka clusters
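If the cluster is not running yet, it can be brought up roughly as follows (a sketch assuming the Zookeeper scripts bundled with Kafka under /data/app/kafka; adjust if Zookeeper is installed separately, and run the broker start command on each of hadoop102/103/104):
shell>cd /data/app/kafka
shell>bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
shell>bin/kafka-server-start.sh -daemon config/server.properties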
1.1.2 Start a Kafka consumer
shell>cd /data/app/kafka
shell>bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first
1.1.3 Install Flume
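Installing Flume is just unpacking the tarball to the path used below (the 1.9.0 version in the file name is an assumption; adjust to the release you downloaded):
shell>tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /data/app/
shell>mv /data/app/apache-flume-1.9.0-bin /data/app/flume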
1.1.4 Modify the configuration file
shell>cd /data/app/flume
shell>vim conf/log4j.properties
###modify flume.log.dir
flume.log.dir=/data/app/flume/logs
1.1.5 Write the Flume job configuration file
shell>cd /data/app/flume
shell>mkdir jobs
shell>vim jobs/file_to_kafka.conf
file_to_kafka.conf
#1 Define the components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/app/kafka/applog/app.*
a1.sources.r1.positionFile = /data/app/flume/taildir_position.json
#3 Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#4 Configure the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#5 Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.1.6 Start Flume
shell>cd /data/app/flume
shell>bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf &
1.1.7 Test
shell>cd /data/app/kafka/applog
shell>vim app1
###type "hello" into the file; the message then shows up in the Kafka consumer console
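Appending to a monitored file from the shell works as well, for example (the file name app.log is only an illustration that matches the app.* pattern in the source config):
shell>echo hello >> /data/app/kafka/applog/app.log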
1.2 Flume as the consumer
1.2.1 Start Zookeeper and Kafka (as in 1.1.1); Flume itself is started in 1.2.2 once its config file is written
1.2.2 Write the Flume configuration file and start Flume
shell>cd /data/app/flume
shell>mkdir jobs
shell>vim jobs/kafka_to_file.conf
###after saving kafka_to_file.conf (below), start Flume:
shell>bin/flume-ng agent -c conf/ -n a1 -f jobs/kafka_to_file.conf -Dflume.root.logger=INFO,console
kafka_to_file.conf
#1 Define the components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
#3 Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#4 Configure the sink
a1.sinks.k1.type = logger
#5 Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1.2.3 Start a Kafka producer
shell>cd /data/app/kafka
shell>bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first
Then type messages in the Kafka producer console; they are printed to the Flume console.
2. Flink and Kafka Integration
2.1 Flink as a producer writing to Kafka
2.1.1 Create a Java Maven project
2.1.2 Add dependencies
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
2.1.3 Add the log4j.properties file under resources
log4j.rootLogger=error,stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File = ../log/agent.log
log4j.appender.R.MaxFileSize = 1024KB
log4j.appender.R.MaxBackupIndex = 1
log4j.appender.R.layout = org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%L) : %m%n
2.1.4 Create the Flink producer class
package flink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.ArrayList;
import java.util.Properties;
public class Producer {
    public static void main(String[] args) {
        //1 Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //The cluster has three nodes, so set the parallelism to 3
        env.setParallelism(3);
        //2 Prepare the data source
        ArrayList<String> wordlist = new ArrayList<>();
        wordlist.add("hello");
        wordlist.add("fox");
        DataStreamSource<String> stream = env.fromCollection(wordlist);
        //Create a Kafka producer
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        //Set the target topic
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("first", new SimpleStringSchema(), properties);
        //3 Add the Kafka producer as the sink of the stream
        stream.addSink(kafkaProducer);
        //4 Execute the job
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.1.5 Start a Kafka console consumer to verify
shell>cd /data/app/kafka
shell>bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first
2.2 Flink as a Kafka consumer
package flink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
public class Consumer {
    public static void main(String[] args) {
        //1 Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //The cluster has three nodes, so set the parallelism to 3
        env.setParallelism(3);
        //2 Create a Kafka consumer
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        //Set the topic to subscribe to
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("first", new SimpleStringSchema(), properties);
        //3 Attach the consumer to the Flink stream and print it
        env.addSource(kafkaConsumer).print();
        //4 Execute the job
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
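To verify the consumer, start a Kafka console producer on the same topic and type a few messages; each one should be printed by the running Flink job:
shell>cd /data/app/kafka
shell>bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic first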
3. Spring Boot and Kafka Integration
3.1 Create a Spring Boot application
https://juejin.cn/post/6844903791603499016#heading-12
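Beyond the linked tutorial, a minimal sketch of sending and receiving with spring-kafka looks like the class below. Assumptions not taken from this document: the org.springframework.kafka:spring-kafka and spring-boot-starter-web dependencies are on the classpath, spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 is set in application.properties, and the class, endpoint, and group names are made up for illustration.
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaDemoController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    //Send a message to the first topic: GET /send?msg=hello
    @GetMapping("/send")
    public String send(@RequestParam String msg) {
        kafkaTemplate.send("first", msg);
        return "sent: " + msg;
    }

    //Consume the same topic and print every record
    @KafkaListener(topics = "first", groupId = "springboot-group")
    public void listen(String message) {
        System.out.println("received: " + message);
    }
}
Calling GET /send?msg=hello publishes to the first topic, and the @KafkaListener method prints what it receives, so the round trip can be checked from a browser or curl.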
4. Spark and Kafka Integration
4.1 Spark as a producer
4.1.1 Create a Maven project
Right-click the project ==> "Add Framework Support" ==> check "Scala"
Right-click the source directory ==> "Mark Directory as" ==> "Sources Root"
4.1.2 Add the dependency
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
4.1.3 Add log4j.properties
Add a log4j.properties file to resources (as in 2.1.3) to set the log level to error.
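The dependency above covers reading from Kafka with Spark Streaming; for Spark as a producer, the plain Kafka producer client can be used inside a Spark job. Below is a minimal Java sketch (the Scala version is analogous) under these assumptions: spark-core_2.12 is also on the classpath, the target topic is first as in the earlier sections, and the class name SparkKafkaProducer is made up for illustration.
package spark;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class SparkKafkaProducer {
    public static void main(String[] args) {
        //1 Spark context (local mode for a quick test)
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka-producer");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //2 Sample data to send
        List<String> words = Arrays.asList("hello", "spark");

        //3 Create one KafkaProducer per partition and send every record to the first topic
        sc.parallelize(words).foreachPartition(iter -> {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            while (iter.hasNext()) {
                producer.send(new ProducerRecord<>("first", iter.next()));
            }
            producer.close();
        });

        sc.stop();
    }
}
A console consumer on the first topic (as in 2.1.5) should then show the two sample records.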