注意:
新版Kafka(3.0以上版本)已经集成了zookeeper,不需要安装zookeeper了(当然也可以独立安装)
1.下载
https://kafka.apache.org/downloads
下载版本为:
kafka_2.13-3.6.0.tgz
2.解压
tar -zxvf kafka_2.13-3.6.0.tgz
3.创建自定义目录、日志目录及zookeeper数据目录
mkdir /opt/kafka
mkdir /opt/kafka/logs/kafka
mkdir /opt/kafka/logs/zookeeper
mkdir /opt/kafka/data/zookeeper
mv /home/install-package/kafka_2.13-3.6.0 /opt/kafka
4.编辑配置文件
cd /opt/kafka/kafka_2.13-3.6.0/config
vim zookeeper.properties
zookeeper.properties修改内容:
dataDir=/opt/kafka/data/zookeeper # 数据保存目录
dataLogDir=/opt/kafka/logs/zookeeper # 新增配置,日志保存目录
vim server.properties
server.properties修改内容:
listeners=PLAINTEXT://:9092 # 放开注释
log.dirs=/opt/kafka/logs/kafka # 日志保存目录
5.启动zookeeper
cd /opt/kafka/kafka_2.13-3.6.0/bin
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
6.启动kafka
cd /opt/kafka/kafka_2.13-3.6.0/bin
./kafka-server-start.sh -daemon ../config/server.properties
7.检查是否启动
命令:
jps
查看是否有QuorumPeerMain及Kafka,QuorumPeerMain为zookeeper
8.生产消息(创建名为testTopic的队列)
cd /opt/kafka/kafka_2.13-3.6.0/bin
./kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
9.消费消息(重开一个终端监听testTopic队列的消息,在生产消息的终端发消息,此终端收消息)
cd /opt/kafka/kafka_2.13-3.6.0/bin
./kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
10.注意事项
kafka配置文件server.properties属性描述:
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
listeners=PLAINTEXT://:9092 #当前kafka对外提供服务的端口默认是9092
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/logs/kafka #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多
个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
min.insync.replicas=1 #当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
delete.topic.enable=false #是否允许删除主题
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
zookeeper.connect=localhost:2181#设置zookeeper的连接端口
后台启动:
nohup /opt/kafka/kafka_2.13-3.6.0/bin/kafka-server-start.sh /opt/kafka/kafka_2.13-3.6.0/config/server.properties 1>/dev/null 2>&1 &
11.常用命令
创建名为test的topic,这个topic只有一个partition,并且备份因子也设置为1:
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1
查看当前kafka内有哪些topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list
发送消息
kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。
在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic。
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费消息
对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用kafka的消费者消息的客户端,从指定kafka服务器的指定topic中消费消息
方式一:从最后一条消息的偏移量+1开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
方式二:从头开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
删除topic
./kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
注意点:
消息会被存储
消息是顺序存储
消息是有偏移量的
消费时可以指明偏移量进行消费
12.常见问题
1.kafka日志kafkaServer.out报错KeeperErrorCode=NodeExists
解决办法:清空kafka安装目录下logs文件夹里的所有日志文件
文章评论