Kafka 4.0: From Getting Started to Proficiency

1. Cluster Deployment

1.1 JDK

Write the playbook on the Ansible control node. The quoted 'EOF' stops the shell from expanding $JAVA_HOME and $PATH while the file is being written.

    cat > /etc/ansible/playbook/install-jdk.yml << 'EOF'
    - hosts: cluster
      remote_user: root
      tasks:
      - name: Distribute the JDK
        copy: src=/opt/software/jdk-21.0.5_linux-x64_bin.tar.gz  dest=/opt/software
      - name: Create the JDK install directory
        file: path=/usr/local/java state=directory
      - name: Unpack the JDK
        shell: tar -xvzf /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz -C /usr/local/java
      - name: Configure environment variables
        blockinfile:
          path: /etc/profile
          block: |
            export JAVA_HOME=/usr/local/java/jdk-21.0.5
            export PATH=$JAVA_HOME/bin:$PATH
          marker: "# {mark} JDK"
    EOF

Run it:

    ansible-playbook /etc/ansible/playbook/install-jdk.yml
1.2 Base Environment Setup

The {{hostname}} variable used below must be defined for each host in the Ansible inventory. The THP setting is written to rc.local, so it takes effect at the next boot.

    cat > /etc/ansible/playbook/modify_env.yml << 'EOF'
    - hosts: cluster
      remote_user: root
      tasks:
        # Set the hostname
        - name: set hostname
          shell: hostnamectl set-hostname {{hostname}}
        - name: distribute hosts to nodes
          copy:
            src: /etc/hosts
            dest: /etc
        # Stop and disable the firewall
        - name: stop firewalld
          service:
            name: firewalld
            state: stopped
            enabled: no
        # Disable SELinux
        - name: setenforce 0
          shell: "setenforce 0"
          failed_when: false
        - name: set selinux disabled
          replace:
            path: /etc/selinux/config
            regexp: '^SELINUX=enforcing'
            replace: 'SELINUX=disabled'
        - name: Raise the process, file handle and locked-memory limits
          lineinfile:
            path: /etc/security/limits.conf
            insertafter: '### AFTER THIS LINE'
            line: "{{ item }}"
            state: present
          with_items:
            - '*       soft   nproc   65536'
            - '*       hard   nproc   65536'
            - '*       soft   nofile  131072'
            - '*       hard   nofile  131072'
            - '*       hard memlock unlimited'
            - '*       soft memlock unlimited'
        - name: Disable THP at boot
          lineinfile:
            path: /etc/rc.local
            line: "{{ item }}"
          with_items:
            - 'echo never > /sys/kernel/mm/transparent_hugepage/enabled'
            - 'echo never > /sys/kernel/mm/transparent_hugepage/defrag'
        - name: Change permissions
          shell: chmod +x /etc/rc.d/rc.local
        - name: Comment out swap entries in fstab
          replace:
            path: /etc/fstab
            regexp: '^(\s*)([^#\n]+\s+)(\w+\s+)swap(\s+.*)$'
            replace: '#\1\2\3swap\4'
            backup: yes
        - name: Disable SWAP
          shell: |
            swapoff -a
    EOF

Run it:

    ansible-playbook /etc/ansible/playbook/modify_env.yml
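After the playbook has run, the settings can be spot-checked on a node. The following is a minimal sketch, not part of the original setup: `thp_mode` is a hypothetical helper name, and it assumes the kernel reports the active THP mode in square brackets (for example `always madvise [never]`).

```shell
# Hypothetical helper: extract the active THP mode from the kernel's status line.
# Input looks like "always madvise [never]"; the bracketed word is the active mode.
thp_mode() {
  sed -n 's/.*\[\(.*\)\].*/\1/p'
}

thp_file=/sys/kernel/mm/transparent_hugepage/enabled
if [ -r "$thp_file" ]; then
  # "never" is expected only after a reboot, because the playbook edits rc.local.
  echo "THP mode: $(thp_mode < "$thp_file")"
fi

# The new nofile limit applies to fresh login sessions.
echo "open file limit: $(ulimit -n)"
```

If the open file limit still shows the old value, log out and back in before rechecking, since limits.conf is read at session start.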
1.3 Passwordless SSH

    cat > /etc/ansible/playbook/ssh-pubkey.yml << 'EOF'
    - hosts: cluster
      gather_facts: no
      remote_user: root
      tasks:
      - name: Install the public key for passwordless login
        authorized_key:
          user: root
          key: "{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
          state: present
    EOF

Run it:

    ansible-playbook /etc/ansible/playbook/ssh-pubkey.yml
1.4 Deploy Kafka

The playbook assumes that /opt/module and the starrocks user already exist on every node. The quoted 'EOF' keeps $KAFKA_HOME and $PATH from being expanded while the file is written.

    cat > /etc/ansible/playbook/install-kafka.yml << 'EOF'
    - hosts: cluster
      remote_user: root
      tasks:
      - name: Distribute Kafka
        copy: src=/opt/software/kafka_2.13-4.0.0.tgz  dest=/opt/software
      - name: Unpack Kafka
        shell: tar -xvzf /opt/software/kafka_2.13-4.0.0.tgz -C /opt/module/
      - name: rename to kafka
        shell: mv /opt/module/kafka_2.13-4.0.0 /opt/module/kafka
      - name: Give ownership to the starrocks user
        shell: chown -R starrocks:starrocks /opt/module/kafka
      - name: Configure environment variables
        blockinfile:
          path: /home/starrocks/.bashrc
          block: |
            # kafka
            export KAFKA_HOME=/opt/module/kafka
            export PATH=$KAFKA_HOME/bin:$PATH
          marker: "# {mark} KAFKA"
    EOF

Run it:

    ansible-playbook /etc/ansible/playbook/install-kafka.yml
1.5 Edit the Kafka Configuration

Edit the configuration file on the first node (kylin-01):

    vi /opt/module/kafka/config/server.properties

    ############################# Server Basics #############################
    # The role of this server. Setting this puts us in KRaft mode
    process.roles=broker,controller
    # The node id associated with this instance's roles
    ## Give every Kafka node a different id; here the ids increase with the host IP
    node.id=1
    # List of controller endpoints used connect to the controller cluster
    controller.quorum.bootstrap.servers=kylin-01:9093
    ## id@host:port of every node in the controller quorum, using the controller port 9093
    controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
    listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093
    # Name of listener used for communication between brokers.
    inter.broker.listener.name=PLAINTEXT
    # Listener name, hostname and port the broker or the controller will advertise to clients.
    # If not set, it uses the value for "listeners".
    advertised.listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093
    # This is required if running in KRaft mode.
    controller.listener.names=CONTROLLER
    # listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    ############################# Log Basics #############################
    # A comma separated list of directories under which to store log files
    log.dirs=/data/kafka/data
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    ############################# Internal Topic Settings  #############################
    # A factor of 1 leaves the internal topics without redundancy; with three brokers,
    # 3 (and min.isr=2) is the usual choice outside of test setups.
    offsets.topic.replication.factor=1
    share.coordinator.state.topic.replication.factor=1
    share.coordinator.state.topic.min.isr=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    ############################# Log Flush Policy #############################
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    ############################# Log Retention Policy #############################
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    # Maximum message size: 512 MB.
    # Note: socket.request.max.bytes above (100 MB) still caps the size of a single request,
    # so it must be raised as well before messages this large are accepted.
    message.max.bytes=536870912
    replica.fetch.max.bytes=536870912
    # Maximum request size: 1 GB. max.request.size is a producer setting, not a broker
    # setting, so it belongs in the producer configuration rather than in this file.
    #max.request.size=1073741824
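Because this node also runs the controller role, its node.id has to be one of the ids listed in controller.quorum.voters. The following sketch checks that before the node is started. `get_prop` and `check_node_in_voters` are hypothetical helper names, not Kafka tools, and the parsing assumes plain `key=value` lines as in the file above.

```shell
# Hypothetical helper: print the value of the last uncommented "key=value" line.
# $1 = key, $2 = properties file
get_prop() {
  awk -F= -v key="$1" '$1 == key { sub(/^[^=]*=/, ""); val = $0 } END { print val }' "$2"
}

# Succeeds only if node.id is one of the ids listed in controller.quorum.voters.
# $1 = properties file
check_node_in_voters() {
  id="$(get_prop node.id "$1")"
  voters="$(get_prop controller.quorum.voters "$1")"
  # Treat a missing node.id as a failure instead of matching an empty line.
  [ -n "$id" ] || return 1
  # Voters look like "1@host:9093,2@host:9093": split on commas, keep the ids.
  printf '%s\n' "$voters" | tr ',' '\n' | cut -d@ -f1 | grep -qx "$id"
}
```

A possible use on each node is `check_node_in_voters /opt/module/kafka/config/server.properties || echo "node.id is not in controller.quorum.voters"`.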
1.6 Edit the Configuration on the Other Nodes

Only the following lines differ per node; controller.quorum.voters is identical everywhere.

On kylin-02:

    node.id=2
    controller.quorum.bootstrap.servers=kylin-02:9093
    controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
    listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
    advertised.listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093

On kylin-03:

    node.id=3
    controller.quorum.bootstrap.servers=kylin-03:9093
    controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
    listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
    advertised.listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
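The per-node lines follow a fixed pattern, so they can also be generated rather than typed by hand. This is a small sketch under that assumption. `render_node_overrides` is a hypothetical function name, and the ports 9092 and 9093 are the ones used in this guide.

```shell
# Hypothetical helper: print the settings that differ from node to node.
# $1 = node id, $2 = hostname
render_node_overrides() {
  cat << EOF
node.id=$1
controller.quorum.bootstrap.servers=$2:9093
listeners=PLAINTEXT://$2:9092,CONTROLLER://$2:9093
advertised.listeners=PLAINTEXT://$2:9092,CONTROLLER://$2:9093
EOF
}

# Print the overrides for the three hosts used in this guide.
i=1
for host in kylin-01 kylin-02 kylin-03; do
  echo "# $host"
  render_node_overrides "$i" "$host"
  i=$((i + 1))
done
```

The output is meant to be reviewed and then merged into each node's server.properties; the sketch does not modify any file itself.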
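1.7 Change the Log Directory

The relative paths from here on assume the working directory is /opt/module/kafka. Edit the launcher script and set LOG_DIR just above the existing check:

    vi bin/kafka-run-class.sh

    # Log directory to use
    # Changed: write the service logs to a fixed directory
    LOG_DIR=/data/kafka/log
    if [ "x$LOG_DIR" = "x" ]; then
      LOG_DIR="$base_dir/logs"
    fi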
1.8 Initialize the Cluster

Generate a unique cluster ID (run once):

    bin/kafka-storage.sh random-uuid

Format the Kafka storage directory with that same ID (this must be run on every node):

    bin/kafka-storage.sh format -t m6BZb8yRSzmdNvW9kzOoQg -c config/server.properties
1.9 Start the Cluster

Run the start command on every node:

    bin/kafka-server-start.sh -daemon config/server.properties

View the service log:

    tail -f /data/kafka/log/server.log

Check the Kafka node status:

    bin/kafka-broker-api-versions.sh --bootstrap-server kylin-01:9092

Check that the Kafka port is listening:

    netstat -tuln | grep 9092

Find the Kafka process with ps:

    ps aux | grep kafka

Monitor the Kafka process with top or htop:

    top -p <PID>
2. Common Commands

    ## List topics
    kafka-topics.sh --bootstrap-server kylin-01:9092 --list
    ## Describe a topic
    kafka-topics.sh --bootstrap-server kylin-01:9092 --describe --topic <topic-name>
    ## Create a topic with 5 partitions and a replication factor of 2. The number of brokers must be greater than or equal to the replication factor. (Brokers do not have to be specified explicitly when creating a topic.)
    kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic <topic-name> --partitions 5 --replication-factor 2
    ## Delete a topic
    kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic <topic-name>
    ## List consumer groups (--list)
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --list
    ## Describe a specific consumer group
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --group <group-id>
    ## Delete a specific group
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --delete --group <group-id>
    ## Start a console producer
    kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-name>
    ## Start a console consumer
    kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-name>  --consumer-property group.id=<group-id>  --from-beginning
    ## Describe all consumer groups (--all-groups)
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups
    ## Query consumer group members (--members)
    ## Members of all consumer groups
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups --members
    ## Members of a specific consumer group
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --members --group <group-id>
    ## Reset offsets to the latest offset (the group must have no active members)
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-name> --to-latest --execute
    ## Dry run of an offset reset
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-name> --to-offset 100 --dry-run
    ## Reset offsets to a specific position
    kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-name> --to-offset 100 --execute
    ## Get the offsets for a given timestamp (epoch milliseconds)
    kafka-get-offsets.sh --bootstrap-server kylin-01:9092 --topic <topic-name> --time 1740499200000
    ## Increase the number of partitions of a topic
    kafka-topics.sh  --bootstrap-server kylin-01:9092 --topic <topic-name> --alter --partitions 16
    ## Fetch messages within a time range (filters on the CreateTime field, in epoch milliseconds)
    kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-name> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '$2>=1742450425000 && $2<=1742464825000 {print $0}'
    ## Read partition 0 starting at offset 100 and search the output for a string
    kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-name> --partition 0 --offset 100 --property print.timestamp=true | grep '1027729757'
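The timestamp-based commands above take epoch milliseconds, and the time-range filter depends on awk reading the CreateTime value as its second field. The sketch below shows both steps on made-up sample lines rather than a live topic. `to_epoch_ms` and `filter_by_time` are hypothetical helper names, `to_epoch_ms` relies on the `-d` option of GNU date, and the input format assumes the console consumer prints `CreateTime:<epoch-ms>`, a tab, then the value.

```shell
# Hypothetical helper: convert a UTC date string to epoch milliseconds.
# Relies on GNU date's -d option.
to_epoch_ms() {
  echo "$(date -u -d "$1" +%s)000"
}

# Hypothetical helper: keep only lines whose CreateTime lies within [$1, $2].
# With the separator 'CreateTime:|\t', field 2 is the timestamp.
filter_by_time() {
  awk -F 'CreateTime:|\t' -v from="$1" -v to="$2" '$2 >= from && $2 <= to { print $0 }'
}

# Demo on made-up sample lines; only the middle line falls inside the range.
printf 'CreateTime:1742450000000\tbefore\nCreateTime:1742455000000\tinside\nCreateTime:1742470000000\tafter\n' |
  filter_by_time 1742450425000 1742464825000
```

Against a real topic, the output of kafka-console-consumer.sh with print.timestamp=true would be piped into `filter_by_time` in place of the sample lines, and `to_epoch_ms '2025-02-25 16:00:00'` yields the value used with --time above.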
3. API

