kafka常用命令

本文基于kafka0.10.1.1版本介绍常用命令。

1. 集群启停

1
2
3
4
5
6
7
8
9
# 前台启动broker
$ bin/kafka-server-start.sh <path>/server.properties
# ctrl C关闭

# 后台启动broker
$ bin/kafka-server-start.sh -daemon <path>/server.properties

# 关闭broker
$ bin/kafka-server-stop.sh

2. topic相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 查看topic列表
$ bin/kafka-topics.sh --zookeeper localhost:2181 --list 或者
$ bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list

# 查看topic详情
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicname

# 创建topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 3 --topic topicname

# 删除topic
$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicname
# 注:不能真正删除topic,只是把这个topic标记为删除(marked for deletion),要彻底把topic删除必须把kafka中与当前topic相关的数据目录和zookeeper中与当前topic相关的路径一并删除

# 修改topic
$ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 6 --topic topicname

3. group相关

注意:查看consumer group列表有新、旧两种命令,分别查看新版(0.10以后,信息保存在broker中)consumer列表和老版(0.9之前,信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap–server和zookeeper参数。

1
2
3
4
5
6
7
8
9
10
11
12
# 查看指定group.id 的消费者消费情况, 包括积压
$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group groupid --describe

# 重设消费者组位移
# 最早处
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-earliest --execute
# 最新处
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-latest --execute
# 某个offset位置
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-offset 2000 --execute
# 调整到某个时间之后得最早位移
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group groupname --reset-offsets --all-topics --to-datetime 2019-09-15T00:00:00.000

4. producer和consumer

producer

1
2
3
4
5
6
7
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicname 
参数含义:
--compression-codec lz4 压缩类型
--request-required-acks all acks的值
--timeout 3000 linger.ms的值
--message-send-max-retries 10 retries的值
--max-partition-memory-bytes batch.size值

consumer

1
2
3
4
5
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning
# 指定groupid
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --consumer-property group.id=old-consumer-group
# 指定分区
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicname --from-beginning --partition 0
坚持原创技术分享,您的支持将鼓励我继续创作!

------本文结束 感谢您的阅读------