一、初识MQ
1.1 什么是MQ
Message Queue(MQ),消息队列中间件。很多⼈都说:MQ 通过将消息的发送和接收分离来实现应⽤程序的异步和解偶,这个给⼈的直觉是——MQ 是异步的,⽤来解耦的,但是这个只是 MQ 的效果⽽不是⽬的。
MQ 真正的⽬的是为了通讯,屏蔽底层复杂的通讯协议,定义了⼀套应⽤层的、更加简单的通讯协议。
⼀个分布式系统中两个模块之间通讯要么是HTTP,要么是⾃⼰开发的(rpc) TCP,但是这两种协议其实都是原始的协议。
HTTP 协议很难实现两端通讯——模块 A 可以调⽤ B,B 也可以主动调⽤ A,如果要做到这个两端都要背上WebServer,⽽且还不⽀持⻓连接(HTTP 2.0 的库根本找不到)。TCP 就更加原始了,粘包、⼼跳、私有的协议,想⼀想头⽪就发麻。
MQ 所要做的就是在这些协议之上构建⼀个简单的“协议”——⽣产者/消费者模型。MQ 带给我们的“协议”不是具体的通讯协议,⽽是更⾼层次通讯模型。它定义了两个对象——发送数据的叫⽣产者;接收数据的叫消费者, 提供⼀个SDK 让我们可以定义⾃⼰的⽣产者和消费者实现消息通讯⽽⽆视底层通讯协议
1.2 同步和异步通讯
微服务间通讯有同步和异步两种方式:
- 同步通讯:就像打电话,需要实时响应。
- 异步通讯:就像发邮件,不需要马上回复。
两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。
1.1.1 同步通讯
我们之前学习的Feign调用就属于同步方式
虽然调用可以实时得到结果,但存在下面的问题:
总结:
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
1.1.2 异步通讯
异步调用则可以避免上述问题:
举个栗子:
用户下单后应该立即响应订单创建成功,然后异步的开启一个线程去在数据库中创建订单、扣减库存、加积分等等
再举个栗子:
我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。
在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。
订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。
为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。
- 发布者发布事件到Broker,不关心谁来订阅事件。
- 订阅者从Broker订阅事件,不关心谁发来的消息。
Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。
好处:
- 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能
好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是我们今天要学习的MQ技术。
1.3 技术对比
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比:
ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
1.4 MQ的两种模式
- 点对点模式
- 发布/订阅模式
二、初识Kafka
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
2.1 Kafka的使用场景
- 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
2.2 Kafka基本概念
kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是并没有完全遵循JMS规范。
首先,让我们来看一下基础的消息(Message)相关术语:
名称 | 解释 |
Broker | 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群 |
Topic | Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic |
Producer | 消息生产者,向Broker发送消息的客户端 |
Consumer | 消息消费者,从Broker读取消息的客户端 |
ConsumerGroup | 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息 |
Partition | 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的 |
Replica(副本) | 一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower |
Leader | 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader |
Follower | 每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader |
服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
2.3 Topic与Partition
在Kafka中,Topic就是一个主题,生产者往topic里面发送消息,消费者从topic里面捞数据进行消费。
假设现在有一个场景,如果我们现在有100T的数据需要进行消费,但是现在我们一台主机上面并不能存储这么多数据该怎么办呢?
其实做法很简单,就是将海量的数据进行切割,并且在Topic中添加分区的概念,每一个分区都对应一台主机,并且存储切分到的数据
当然为了实现高可用,其实分区可以实现主从架构,这个后面再了解
这样做的好处是:
- 分区存储,可以解决一个topic中文件过大无法存储的问题
- 提高了读写的吞吐量,读写可以在多个分区中同时进行
三、Kafka基本使用
3.1 部署前的准备
- 安装jdk
yum install -y java-1.8.0-openjdk-devel.x86_64 \
&& (
cat <<EOF
#set java environment
JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME CLASSPATH PATH
EOF
) >> /etc/profile && source /etc/profile && java -version
- 安装zk(安装?安装什么?3.0之后kafka自带zookeeper好吧,这一步直接省略)
docker run -d \
-e TZ="Asia/Shanghai" \
-p 2181:2181 \
-v /home/docker/zookeeper/data:/data \
--name zookeeper \
--restart always zookeeper
- 官网下载kafka的压缩包:http://kafka.apache.org/downloads
这里使用 清华大学开源软件镜像站下载
mkdir /usr/local/kafka \
&& cd /usr/local/kafka \
&& wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.6.0/kafka_2.13-3.6.0.tgz \
&& tar -zvxf kafka_2.13-3.6.0.tgz \
&& rm -rf kafka_2.13-3.6.0.tgz
- 解压缩至如下路径
/usr/local/kafka/
- 修改配置文件:/usr/local/kafka/kafka_2.13-3.6.0/config/server.properties
注意:这里请不要填localhost:9092 ,localhost表示只能通过本机连接,可以设置为0.0.0.0或本地局域网地址
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://localhost:9092
#kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址,/kafka表示所有文件创建在/kafka下,便于管理
zookeeper.connect=localhost:2181/kafka
- 添加kafka环境变量
Linux将环境变量存在 /etc/profile 中,我们要添加新的环境变量要修改这个文件
使用命令 sudo vim /etc/profile 再输入此时登录用户的密码,进入此文件的编辑模式配置or使用以下命令配置
(
#KAFKA_HOME
export KAFKA_HOME=/usr/local/kafka/kafka_2.13-3.6.0
export PATH=$PATH:$KAFKA_HOME/bin
EOF
) >> /etc/profile && source /etc/profile
3.2 启动kafka服务器
在启动Kafka服务器之前成功启动Zookeeper服务器,并且它正在监听默认端口(2181)
进入到bin目录下,使用命令来启动
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
进入到bin目录下,使用命令来启动
./kafka-server-start.sh -daemon ../config/server.properties
验证是否启动成功:
方式一:
使用命令
#显示包含 "server.properties" 文件的进程信息
ps -aux | grep server.properties
可以看到非常多的信息,说明启动成功
方式二:
使用以下命令启动ZooKeeper客户端:
./zookeeper-shell.sh localhost:2181
进入到zk中的节点看id是0的broker有没有存在(上线)
ls /brokers/ids
server.properties核心配置详解:
Property | Default | Description |
broker.id | 0 | 每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。 |
listeners | PLAINTEXT://192.168.65.60:9092 | server接受客户端连接的端口,ip配置kafka本机ip即可 |
zookeeper.connect | localhost:2181 | zookeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样 |
num.partitions | 1 | 创建topic的默认分区数 |
default.replication.factor | 1 | 自动创建topic的默认副本数量,建议设置为大于等于2 |
min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常 |
delete.topic.enable | false | 是否允许删除主题 |
3.3 Kafka核心概念之Topic
在Kafka中,Topic是一个非常重要的概念,topic可以实现消息的分类,不同消费者订阅不同的topic
partition(分区)是kafka的一个核心概念,kafka将1个topic分成了一个或多个分区,每个分区在物理上对应一个目录
分区目录下存储的是该分区的日志段(segment),包括日志的数据文件和两个索引文件
执行以下命令创建名为test的topic,这个topic只有一个partition,并且备份因子也设置为1:
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1
查看当前kafka内有哪些topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list
3.4 发送消息
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。
在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
先丢几条消息进去试试
3.5 消费消息
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息
- 方式一:从最后一条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
- 方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
几个注意点:
- 消息会被存储
- 消息是顺序存储
- 消息是有偏移量的
- 消费时可以指明偏移量进行消费
3.6 消费者偏移量
在上面我们展示了两种不同的消费方式,根据偏移量消费和从头开始消费,其实这个偏移量可以我们自己进行维护
我们进入我们在server.properties里面配置的日志文件地址/usr/local/data/kafka-logs
我们可以看到默认一共有五十个偏移量地址,里面就记录了当前消费的偏移量。
我们先关注test-0这个文件
我们进入这个文件,可以看到其中有个log文件,里面就保存了Topic发送的数据
- 生产者将消息发送给broker,broker会将消息保存在本地的日志文件中
/usr/local/kafka/kafka-logs/主题-分区/00000000.log
- 消息的保存是有序的,通过offset偏移量来描述消息的有序性
- 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置
3.7 单播消息
我们现在假设有一个场景,有一个生产者,两个消费者,问:生产者发送消息,是否会同时被两个消费者消费?
我们可以实践一下
创建一个topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test2 --partitions 1
创建一个生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic test2
分别在两个终端上面创建两个消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2
这里就要引申出一个概念:消费组,当我们配置多个消费者在一个消费组里面的时候,其实只会有一个消费者进行消费
这样其实才符合常理,毕竟一条消息被消费一次就够了
我们可以通过命令--consumer-property group.id=testGroup在设置消费者时将其划分到一个消费组里面
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test2
这个时候,如果消费组里面有一个消费者挂掉了,就会由其他消费者来进行消费
小结一下:两个消费者在同一个组,只有一个能接到消息,两个在不同组或者未指定组则都能收到
3.8 多播消息
当多个消费组同时订阅一个Topic时,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息
// 消费组1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test2
// 消费组2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test2
3.9 查看消费组的详细信息
通过以下命令可以查看到消费组的详细信息:
# 查看当前所有的消费组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看指定消费组具体信息,比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
3.10 创建分区
我们在上面已经了解了Topic与Partition的概念,现在我们可以通过以下命令给一个topic创建多个分区
# 创建两个分区的主题
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test3 --partitions 2
# 查看下创建的topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list
现在我们再进到日志文件中看一眼,可以看到日志是以分区来命名的
3.11 分区细节
我们知道分区文件中
- 00000.log: 这个文件中保存的就是消息
- __consumer_offsets-49:
kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题:consumer_offsets。
因此kafka为了提升这个主题的并发性,默认设置了50个分区。
-
- 提交到哪个分区:通过hash函数:hash(consumerGroupId) % __consumer_offsets主题的分区数
- 提交到该主题中的内容是:key是consumerGroupId + topic + 分区号,value就是当前offset的值
- 文件中保存的消息,默认保存7天。七天到后消息会被删除。
3.12 docker一键部署kafka
docker-compose -f docker-compose-kafka.yml -p kafka up -d
version: '3'
services:
# 可以不单独创建
zookepper:
image: wurstmeister/zookeeper # 原镜像`wurstmeister/zookeeper`
container_name: zookeeper # 容器名为'zookeeper'
restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "/etc/localtime:/etc/localtime"
ports: # 映射端口
- "2181:2181"
kafka:
image: wurstmeister/kafka # 原镜像`wurstmeister/kafka`
container_name: kafka # 容器名为'kafka'
restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "/etc/localtime:/etc/localtime"
environment: # 设置环境变量,相当于docker run命令中的-e
KAFKA_ADVERTISED_HOST_NAME: localhost # TODO 本机IP,请输入网卡ip,而不是回环口ip
KAFKA_ADVERTISED_PORT: 9092 # 端口
KAFKA_BROKER_ID: 0 # 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 # TODO 将kafka的地址端口注册给zookeeper
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 配置kafka的监听端口
KAFKA_ZOOKEEPER_CONNECT: localhost:2181 # TODO zookeeper地址
KAFKA_CREATE_TOPICS: "hello_world"
ports: # 映射端口
- "9092:9092"
depends_on: # 解决容器依赖启动先后问题(先启动zookeeper)
- zookepper
kafka-manager:
image: sheepkiller/kafka-manager # 原镜像`sheepkiller/kafka-manager`
container_name: kafka-manager # 容器名为'kafka-manager'
restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
environment: # 设置环境变量,相当于docker run命令中的-e
ZK_HOSTS: localhost:2181 # TODO zookeeper地址
APPLICATION_SECRET: zhengqing
KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启kafka-manager权限校验
KAFKA_MANAGER_USERNAME: admin # 登陆账户
KAFKA_MANAGER_PASSWORD: 123456 # 登陆密码
ports: # 映射端口
- "9000:9000"
depends_on: # 解决容器依赖启动先后问题
- kafka
可以不单独创建zookeeper
version: '3'
services:
kafka:
image: wurstmeister/kafka # 原镜像`wurstmeister/kafka`
container_name: kafka # 容器名为'kafka'
restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "/etc/localtime:/etc/localtime"
environment: # 设置环境变量,相当于docker run命令中的-e
KAFKA_ADVERTISED_HOST_NAME: localhost # TODO 本机IP
KAFKA_ADVERTISED_PORT: 9092 # 端口
KAFKA_BROKER_ID: 0 # 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 # TODO 将kafka的地址端口注册给zookeeper
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 配置kafka的监听端口
KAFKA_ZOOKEEPER_CONNECT: localhost:2181 # TODO zookeeper地址
KAFKA_CREATE_TOPICS: "hello_world"
ports: # 映射端口
- "9092:9092"
kafka-manager:
image: sheepkiller/kafka-manager # 原镜像`sheepkiller/kafka-manager`
container_name: kafka-manager # 容器名为'kafka-manager'
restart: unless-stopped # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
environment: # 设置环境变量,相当于docker run命令中的-e
ZK_HOSTS: localhost:2181 # TODO zookeeper地址
APPLICATION_SECRET: zhengqing
KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启kafka-manager权限校验
KAFKA_MANAGER_USERNAME: admin # 登陆账户
KAFKA_MANAGER_PASSWORD: 123456 # 登陆密码
ports: # 映射端口
- "9100:9000"
depends_on: # 解决容器依赖启动先后问题
- kafka
四、Kafka命令
我们来将命令汇总总结一下
4.1 Topic相关命令
在上面我们简单使用kafka后,我们来小结一下kafka中的命令,其实主要有三类:
- topic命令:对应脚本kafka-topics.sh
- 生产者命令:对应脚本kafka-console-producer.sh
- 消费者命令:对应脚本kafka-console-consumer.sh
首先我们想要的所有命令都可以通过sh kafka-topics.sh看到,主要的命令有:
参数 | 描述 |
–bootstrap-server <String: server to connect to> | 连接的 Kafka Broker 主机名称和端口号 |
–topic <String: topic> | 操作的 topic 名称 |
–create | 创建主题 |
–delete | 删除主题 |
–alter | 修改主题 |
–list | 查看所有主题 |
–describe | 查看主题详细描述 |
–partitions <Integer: # of partitions> | 设置分区数 |
–replication-factor <Integer: replication factor> | 设置分区副本 |
–config <String: name=value> | 更新系统默认的配置 |
4.2 producer相关命令
参数 | 描述 |
–bootstrap-server <String: server to connect to> | 连接的 Kafka Broker 主机名称和端口号 |
–topic <String: topic> | 操作的 topic 名称 |
4.3 consumer相关命令
参数 | 描述 |
–bootstrap-server <String: server to connect to> | 连接的 Kafka Broker 主机名称和端口号 |
–topic <String: topic> | 操作的 topic 名称 |
–from-beginning | 从头开始消费 |
–group <String: consumer group id> | 指定消费者组名称 |
五、Kafka集群
5.1 伪分布式搭建
通过三个不同的端口来搭建
- 创建三个server.properties文件
# 0 1 2
broker.id=2
# 9092 9093 9094
listeners=PLAINTEXT://192.168.65.60:9094
# kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2
- 通过命令来启动三台broker(单独在三个窗口启动)
sh kafka-server-start.sh -daemon ../config/server.properties
sh kafka-server-start.sh -daemon ../config/server2.properties
sh kafka-server-start.sh -daemon ../config/server3.properties
- 校验是否启动成功
进入到zk中查看/brokers/ids中过是否有三个znode(0,1,2)
5.2 分布式搭建
首先我们要有三台主机
主机名 | IP |
liang | 172.16.1.7 |
dd1 | 172.16.1.4 |
dd2 | 172.16.1.12 |
将上面的主机信息分别配置到每台机器的/etc/hosts目录下
172.16.1.7 liang
172.16.1.4 dd1
172.16.1.12 dd2
修改主机名
vi /etc/hostname
依次填入对应的主机,使用bash立即生效
免密钥设置
这一步非常重要,如果不设置后面集群通信会失败
我们先产生本机的RSA密钥
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
密钥产生后会出现在 ~/.ssh/id_rsa目录中
解释一下这三个文件:
- authorized_keys:就是为了让不同机器之间使用ssh不需要用户名和密码。采用了数字签名RSA或者DSA来完成这个操作,我们只需要将其他机器的id_rsa.pub放到此目录下,其他机器ssh访问本机器时就不需要账号密码了
- id_rsa:即RSA算法生成的私钥
- id_rsa.pub:即RSA算法生成的公钥(上面有密钥和最后面的用户标识)
我们需要将每台机器上的公钥添加到其他主机的authorized_keys中
将第一台主机上的kafka传给其他两台主机,反撇号加pwd表示传到对应主机的当前目录下
scp -r kafka/ dd2:`pwd`
scp -r kafka/ dd1:`pwd`
5.3 修改kafka配置
我们需要在每台机器上修改一下配置文件
#这里的id不能重复
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://liang:9092
#kafka的消息存储文件
log.dir=/usr/local/kafka/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=liang:2181
我们写一个脚本来批量启动kafka
#! /bin/bash
case $1 in
"start"){
for i in liang dd1 dd2
do
echo " --------启动 $i Kafka-------"
ssh $i "sh /usr/local/kafka/kafka_2.13-2.8.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.13-2.8.2/config/server.properties"
done
};;
"stop"){
for i in liang dd1 dd2
do
echo " --------停止 $i Kafka-------"
ssh $i "sh /usr/local/kafka/kafka_2.13-2.8.2/bin/kafka-server-stop.sh"
done
};;
esac
我们在zk中已经可以看到三台kafka上线了
5.4 副本的概念
在创建主题时,除了指明了主题的分区数以外,还指明了副本数,那么副本是一个什么概念呢?
我们现在创建一个主题、两个分区、三个副本的topic(注意:副本只有在集群下才有意义)
./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create --topic my-replicated-topic \
--partitions 2 \
--replication-factor 3
描述:
sh kafka-topics.sh \
# 指定启动的机器
--bootstrap-server localhost:9092 \
# 创建一个topic
--create --topic my-replicated-topic \
# 设置分区数为1
--partitions 2 \
# 设置副本数为3
--replication-factor 3
注意:
“./ kafka-topics.sh” 只能在当前工作目录中执行脚本,而 “sh kafka-topics.sh” 可以在任何位置执行脚本,只要提供了脚本的正确路径
另外可以将脚本的路径添加到系统的环境变量中,这样就可以在任何位置直接使用脚本的文件名来执行它。
我们查看一下分区的详细信息
# 查看topic情况
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
- leader
kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader
- follower
接收leader的同步的数据
- isr
可以同步和已同步的节点会被存入到isr集合中。这里有一个细节:如果isr中的节点性能较差,会被提出isr集合。
此时,broker、主题、分区、副本 这些概念就全部展现了
- 集群中有多个broker
- 创建主题时可以指明主题有多个分区(把消息拆分到不同的分区中存储)
- 可以为分区创建多个副本,不同的副本存放在不同的broker里
5.5 集群消费
5.5.1 向集群发送消息
./kafka-console-producer.sh --broker-list node1:9092,node1:9093,node1:9094 --topic my-replicated-topic
5.5.2 从集群中消费消息
# 伪分布式
./kafka-console-consumer.sh --bootstrap-server node1:9092,node1:9093,node1:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic
# 分布式
./kafka-console-consumer.sh --bootstrap-server liang:9092,dd1:9092,dd2:9092 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic
5.5.3 指定消费组来消费消息
./kafka-console-consumer.sh --bootstrap-server node1:9092,node1:9093,node1:9094 --from-beginning --consumer-property group.id=testGroup1 --topic my-replicated-topic
这里有一个细节,结合上面的单播消息我们很容易可以想到下面的这种情况,因为一个Partition只能被一个consumer Group里面的一个consumer,所有很容易就可以形成组内单播的现象,即:
- 多Partition与多consumer一一对应
- 这样的好处是:
- 分区存储,可以解决一个topic中文件过大无法存储的问题
- 提高了读写的吞吐量,读写可以在多个分区中同时进行
Kafka这种通过分区与分组进行并行消费的方式,让kafka拥有极大的吞吐量
小结一下:
- 一个partition只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性,但是多个partion的多个消费者消费的总的顺序性是得不到保证的,那怎么做到消费的总顺序性呢?这个后面揭晓答案
- partition的数量决定了消费组中消费者的数量,建议同一个消费组中消费者的数量不要超过partition的数量,否则多的消费者消费不到消息
- 如果消费者挂了,那么会触发rebalance机制(后面介绍),会让其他消费者来消费该分区
- kafka通过partition 可以保证每条消息的原子性,但是不会保证每条消息的顺序性
六、Kafka API基本使用
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
6.1 生产者核心概念
在消息发送的过程中,涉及到了两个线程
- main 线程
- Sender 线程
在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator, Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker
在main线程中,消息的生产,要经历拦截器、序列化器和分区器,其中一个分区就会创建一个队列,这样方便数据的管理
其中队列默认是32M,而存放到队列里面的数据也会经过压缩为16k再发往send线程进行发送,但是这样也会有问题,就是如果只有一条消息,难道就不发送了吗?其实还有一个参数linger.ms,用来表示一条消息如果超过这个时间就会直接发送,不用管大小,其实可以类比坐车的场景,人满或者时间到了 都发车
send线程发送给kafka集群的时候,我们需要联系到上面的Topic与Partition已经消费组,形成一个Partition对应consumer Group里面的一个consumer这种组内单播的效果,进行并发读写
6.2 生产者代码编写
这里我们用了上面集群状态下创建的分区my-replicated-topic
这里如果显示连接失败,可以看一下配置文件里面的listeners=PLAINTEXT://host:9092 是不是写了localhost
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
*
* @author Eureka
* @since 2022/10/23 23:03
*/
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. 设置参数
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.2:9092,192.168.2.2:9093,192.168.2.2:9094");
// 把发送的key从字符串序列化为字节数组,这里不采用jdk的序列化,而是自定义序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2. 创建生产消息的客户端,传入参数
Producer<String, String> producer = new KafkaProducer<>(props);
// 3.创建消息
// key:作用是决定了往哪个分区上发,value:具体要发送的消息内容
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka");
//4. 发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
6.2.1 同步发送
我们在上面代码中是这样发送消息的
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
可以看到消息发出后有一个get(),其实这里有一个过程,就是Broker需要在收到消息后回复一个ACK表示确认收到
如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数3次
这里的应答ack有三个取值
- 0:生产者发送过来的数据,不需要应答
- 1:生产者发送过来的数据,Leader收到数据后会应答
- -1(all),生产者发送过来的数据,Leader和ISR队列里面所有的结点收齐数据后应答,-1与
6.2.2 异步发送
异步发送的代码如下
//5.异步发送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息失败:" + exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
如果我们直接执行,是看不到异步回调代码执行的,我们需要让主线程暂停下来
CountDownLatch countDownLatch = new CountDownLatch(1);
producer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
System.err.println("发送消息失败:" + Arrays.toString(exception.getStackTrace()));
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
countDownLatch.countDown();
});
countDownLatch.await();
观察结果,这样确实是进行异步回调了
6.2.3 生产者中的ack的配置
在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢?此时ack有3个配置:
- ack = 0 kafka-cluster不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢消息的,效率是最高的
- ack=1(默认): 多副本之间的leader已经收到消息,并把消息写入到本地的log中,才会返回ack给生产者,性能和安全性是最均衡的s
- ack=-1/all。里面有默认的配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完后,才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但性能最差。
下面是关于ack和重试(如果没有收到ack,就开启重试)的配置
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在
接收者那边做好消息接收的幂等性处理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
6.3 关于消息发送的缓冲区
- kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- kafka本地线程会去缓冲区中一次拉16k的数据,发送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- 如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
七、Java客户端消费者的实现细节
7.1 消费者的基本实现
package com.qf.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MySimpleConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//1.创建一个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//2. 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* 3.poll() API 是拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}
}
7.2 关于消费者自动提交和手动提交offset
1)提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的某个主题+消费的某个分区及消费的偏移量,这样的信息提交到集群的_consumer_offsets主题里面。
2)自动提交
消费者poll消息下来以后就会自动提交offset
// 是否自动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
注意:自动提交会丢消息。因为消费者将消息poll下来后可能还没来得及进行消费就挂了,但是ack是poll完消息就提交了,所以会丢消息
3)手动提交
需要把自动提交的配置改成false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交又分成了两种:
- 手动同步提交
在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑
while (true) {
/*
* poll() API 是拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
//所有的消息已消费完
if (records.count() > 0) {//有消息
// 手动同步提交offset,当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
consumer.commitSync();//=======阻塞=== 提交成功
}
}
}
- 手动异步提交
在消息消费完后提交,不需要等到集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
while (true) {
/*
* poll() API 是拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
//所有的消息已消费完
if (records.count() > 0) {
// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});
}
}
}
7.3 长轮询poll消息
- 默认情况下,消费者一次会poll500条消息。
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
- 代码中设置了长轮询的时间是1000毫秒
while (true) {
/*
* poll() API 是拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
意味着:
- 如果一次poll到500条,就直接执行for循环
- 如果这一次没有poll到500条。且时间在1秒内,那么长轮询继续poll,要么到500条,要么到1s
- 如果多次poll都没达到500条,且1秒时间到了,那么直接执行for循环
如果两次poll的间隔超过30s,集群会认为该消费者的消费能力过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少一点
// 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
我们可以想一想为什么kafka要这么做,这个其实和之前的send消息的时候一样,send消息的时候我们也有两个参数batch.size和linger.ms,当我们要发送的数据达到16KB或者超过linger.ms时间才会把消息发送出去
这里消费者消费消息也是同理,通过长轮询poll消息,保证每次处理的消息默认至少为500条,这样都是为了增加吞吐量
总结一下过程:
- 消费者建立了与broker之间的长连接,开始poll消息。
- 默认一次poll500条消息(如果消费能力弱可以设置小一点,防止被踢出集群)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
可以根据消费速度的快慢来设置,因为如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。
可以通过这个值进行设置:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
- 如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次长轮询结束。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- 消费者发送心跳的时间间隔
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
- kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
7.4 消费者的健康状态检查
消费者每隔1s向kafka集群发送心跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费组的rebalance机制,将该分区交给消费组里的其他消费者进行消费。
//consumer给broker发送心跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
7.5 指定分区和偏移量、时间消费
- 指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
- 从头消费(回溯消费)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
- 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
- 指定时间消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
System.out.println();
//根据消费里的timestamp确定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
7.6 新消费组的消费offset规则
新消费组中的消费者在启动以后,默认会从当前分区的最后一条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
- Latest:默认的,消费新消息
- earliest:第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
八、Springboot中使用Kafka
8.1 引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
8.2 编写配置文件
server:
port: 8080
spring:
kafka:
bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094
producer: # 生产者
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384 # 每次发送时多少一批次 这里设置的是16kb
buffer-memory: 33554432 # 设置内存缓存区32Mb
acks: 1 # leader收到消息后就返回ack
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group # 组内单播,组间广播
enable-auto-commit: false # 关闭消费自动提交
auto-offset-reset: earliest # 新消费组启动会从头信息消费
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500 # 每次长轮询拉取多少条消息
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
redis:
host: 172.16.253.21
8.3 编写消息生产者
package com.qf.kafka.spring.boot.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/msg")
public class MyKafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/send")
public String sendMessage(){
kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
return "send success!";
}
}
8.4 编写消费者
package com.qf.kafka.spring.boot.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手动提交offset
ack.acknowledge();
}
}
这里细心的同学可能会注意到,我们在手动进行poll消息的时候,是拿到一批消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)),但是上面的代码好像并没有去拿一批记录,这样是简写的写法,其实也可以拿一批记录
只是这样我们需要for循环去处理消息,不太优雅,所以可以交给spring给我们循环消息,我们专注处理一条消息即可
8.5 消费者中配置消费主题、分区和偏移量
@KafkaListener(groupId = "testGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
},concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议小于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手动提交offset
ack.acknowledge();
}
九、kafka集群中的controller、rebalance、HW
9.1 controller
什么是controller呢?其实就是集群中的一个broker,当集群中的leader挂掉时需要controller来组织进行选举
那么集群中谁来充当controller呢?
每个broker启动时会向zk创建一个临时序号节点,获得的序号最小的那个broker将会作为集群中的controller,负责这么几件事:
- 当集群中有一个副本的leader挂掉,需要在集群中选举出一个新的leader,选举的规则是从isr集合中最左边获得
- 当集群中有broker新增或减少,controller会同步信息给其他broker
- 当集群中有分区新增或减少,controller会同步信息给其他broker
9.2 rebalance机制
- 前提:消费组中的消费者没有指明分区来消费
- 触发的条件:当消费组中的消费者和分区的关系发生变化的时候
- 分区分配的策略:在rebalance之前,分区怎么分配会有这么三种策略
- range:根据公式计算得到每个消费者消费哪几个分区:第一个消费者是(分区总数 / 消费者数量 )+ 1,之后的消费者是分区总数/消费者数量(假设 n=分区数/消费者数量 = 2, m=分区数%消费者数量 = 1,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区)
- 轮询:大家轮着来
- sticky:粘合策略,如果需要rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要进行全部的重新分配。建议开启
9.3 HW和LEO
LEO是某个副本最后消息的消息位置(log-end-offset)
HW是已完成同步的位置。消息在写入broker时,且每个broker完成这条消息的同步后,hw才会变化。在这之前消费者是消费不到这条消息的。在同步完成之后,HW更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失。
十、Kafka中的优化问题
10.1 如何防止消息丢失
- 生产者:
1)使用同步发送
2)把ack设成1或者all,并且设置同步的分区数>=2
- 消费者:把自动提交改成手动提交
10.2 如何防止重复消费
在防止消息丢失的方案中,如果生产者发送完消息后,因为网络抖动,没有收到ack,但实际上broker已经收到了。
此时生产者会进行重试,于是broker就会收到多条相同的消息,而造成消费者的重复消费。
怎么解决:
- 生产者关闭重试:会造成丢消息(不建议)
- 消费者解决非幂等性消费问题:
所谓的幂等性:多次访问的结果是一样的。对于rest的请求(get(幂等)、post(非幂等)、put(幂等)、delete(幂等))
解决方案:
-
- 在数据库中创建联合主键,防止相同的主键 创建出多条记录
- 使用分布式锁,以业务id为锁。保证只有一条记录能够创建成功
10.3 如何做到消息的顺序消费
其实我们知道在发送消息的时候我们可以通过设置key来指定发送的分区,所以首先我们一定要指定key然后发到同一个分区
- 生产者:使用同步的发送,并且通过设置key指定路由策略,只发送到一个分区中;ack设置成非0的值。
- 消费者:主题只能设置一个分区,消费组中只能有一个消费者;不要设置异步线程防止异步导致的乱序,或者设置一个阻塞队列进行异步消费
kafka的顺序消费使用场景不多,因为牺牲掉了性能,但是比如rocketmq在这一块有专门的功能已设计好。
10.4 如何解决消息积压问题
1)消息积压问题的出现
消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致kafka中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个kafka对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩。
2)消息积压的解决方案
- 在这个消费者中,使用多线程,充分利用机器的性能进行消费消息。
- 通过业务的架构设计,提升业务层面消费的性能。
- 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度
- 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者。该消费者将poll下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。——不常用
10.5 实现延时队列的效果
1)应用场景
订单创建后,超过30分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现
2)具体方案
- kafka中创建创建相应的主题
- 消费者消费该主题的消息(轮询)
- 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没支付)
- 如果是:去数据库中修改订单状态为已取消
- 如果否:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次向kafka拉取该offset及之后的消息,继续进行判断,以此反复。
十一、Kafka-eagle监控平台
11.1 搭建
- 去kafka-eagle官网下载压缩包
http://download.kafka-eagle.org/
- 分配一台虚拟机
- 虚拟机中安装jdk
- 解压缩kafka-eagle的压缩包
- 给kafka-eagle配置环境变量
export KE_HOME=/usr/local/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
- 需要修改kafka-eagle内部的配置文件:vim system-config.properties
修改里面的zk的地址和mysql的地址
- 进入到bin中,通过命令来启动
./ke.sh start
11.2 平台的使用
文章出处:https://blog.csdn.net/qq_56265207/article/details/133970553
文章评论