消息发布和消息消费
server.properties配置文件说明:
broker.id=0 #kafka实例节点唯一标识
num.network.threads=3 #broker处理消息的最大线程数,一般是cpu的核数
num.io.threads=8 #broker处理磁盘io的线程数
socket.send.buffer.bytes=102400 #socket的一个发送缓冲区大小
socket.receive.buffer.bytes=102400 #socket的一个接入缓冲区大小
socket.request.max.bytes=104857600 #socket请求的最大数值
log.dirs=/tem/kafka-logs #log文件地址
num.partitions=1 #指定一个新创建的topic会包含的那些分区的一个默认值
num.recovery.threads.per.data.dir=1 #配置kafka用于自身恢复的一些机制,默认情况下,每个文件夹下就是1个线程
#log.flush.interval.messages=1000 #当消息有1000条时就刷新到磁盘上
#log.flush.interval.ms=1000 #每1000秒时就把消息刷新到磁盘
log.retention.hours=168 #过期时间,日志数据保存的一个最大时间,默认是7天
#log.retention.bytes=1073741824 #日志保存的最大字节数
log.retention.check.interval.ms=300000 #日志片段检查周期
kafka存放在磁盘上,可以配置多个目录
socket发送缓冲区、socket接入缓冲区;socket请求最大数值,防止OOM溢出
broka处理消息的最大进程数和处理IO的最大进程数
kafka安装依赖Scala
启动kafka: bin/kafka-server-start.sh config/server.properties
kafka topic操作界面:bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看topic情况:bin/kafka-topics.sh --list --bootstrap-server localhost:9092
打开producer开始push消息:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
打开consumer开始消费消息: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
server.properties配置文件说明:
broker.id=0 #kafka实例节点唯一标识
num.network.threads=3 #broker处理消息的最大线程数,一般是cpu的核数
num.io.threads=8 #broker处理磁盘io的线程数
socket.send.buffer.bytes=102400 #socket的一个发送缓冲区大小
socket.receive.buffer.bytes=102400 #socket的一个接入缓冲区大小
socket.request.max.bytes=104857600 #socket请求的最大数值
log.dirs=/tem/kafka-logs #log文件地址
num.partitions=1 #指定一个新创建的topic会包含的那些分区的一个默认值
num.recovery.threads.per.data.dir=1 #配置kafka用于自身恢复的一些机制,默认情况下,每个文件夹下就是1个线程
#log.flush.interval.messages=1000 #当消息有1000条时就刷新到磁盘上
#log.flush.interval.ms=1000 #每1000秒时就把消息刷新到磁盘
log.retention.hours=168 #过期时间,日志数据保存的一个最大时间,默认是7天
#log.retention.bytes=1073741824 #日志保存的最大字节数
log.retention.check.interval.ms=300000 #日志片段检查周期