马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
1、集群摆设
1.1、JDK
- cat >> /etc/ansible/playbook/install-jdk.yml << EOF
- - hosts: cluster
- remote_user: root
- tasks:
- - name: 分发JDK
- copy: src=/opt/software/jdk-21.0.5_linux-x64_bin.tar.gz dest=/opt/software
- - name: 解压JDK
- shell: tar -xvzf /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz -C /usr/local/java
- - name: 配置环境变量
- blockinfile:
- path: /etc/profile
- block: |
- export JAVA_HOME=/usr/local/java/jdk-21.0.5
- export PATH=$JAVA_HOME/bin:$PATH
- marker: "# {mark} JDK"
- EOF
复制代码- ansible-playbook install-jdk.yml
复制代码 1.2、底子情况设置
- cat >> /etc/ansible/playbook/modify_env.yml << EOF
- - hosts: cluster
- remote_user: root
- tasks:
- #设置主机名
- - name: set hostname
- shell: hostnamectl set-hostname {{hostname}}
- - name: distribute hosts to nodes
- copy:
- src: /etc/hosts
- dest: /etc
- #关闭防火墙
- - name: stop firewalld
- service:
- name: firewalld
- state: stopped
- enabled: no
- #关闭selinux
- - name: setenforce 0
- shell: "setenforce 0"
- failed_when: false
- - name: set selinux disabled
- replace:
- path: /etc/selinux/config
- regexp: '^SELINUX=enforcing'
- replace: 'SELINUX=disabled'
- - name: 设置最大文件句柄数
- lineinfile:
- path: /etc/security/limits.conf
- insertafter: '### AFTER THIS LINE'
- line: "{{ item }}"
- state: present
- with_items:
- - '* soft noproc 65536'
- - '* hard noproc 65536'
- - '* soft nofile 131072'
- - '* hard nofile 131072'
- - '* hard memlock unlimited'
- - '* soft memlock unlimited'
- - name: 关闭 THP
- lineinfile:
- path: /etc/rc.local
- line: |
- echo never > /sys/kernel/mm/transparent_hugepage/enabled
- echo never > /sys/kernel/mm/transparent_hugepage/defrag
- - name: Change permissions
- shell: chmod +x /etc/rc.d/rc.local
- - name: 关闭swap
- replace:
- path: /etc/fstab
- regexp: '^(\s*)([^#\n]+\s+)(\w+\s+)swap(\s+.*)$'
- replace: '#\1\2\3swap\4'
- backup: yes
- - name: Disable SWAP
- shell: |
- swapoff -a
- EOF
复制代码- ansible-playbook modify_env.yml
复制代码 1.3、免密
- cat >> /etc/ansible/playbook/ssh-pubkey.yml << EOF
- - hosts: cluster
- gather_facts: no
- remote_user: root
- tasks:
- - name: 免密登录
- authorized_key:
- user: root
- key: "{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
- state: present
- EOF
复制代码- ansible-playbook ssh-pubkey.yml
复制代码 1.4、摆设Kafka
- cat >> /etc/ansible/playbook/install-kafka.yml << EOF
- - hosts: cluster
- remote_user: root
- tasks:
- - name: 分发 kafka
- copy: src=/opt/software/kafka_2.13-4.0.0.tgz dest=/opt/software
- - name: 解压 kafka
- shell: tar -xvzf /opt/software/kafka_2.13-4.0.0.tgz -C /opt/module/
- - name: rename to kafka
- shell: mv /opt/module/kafka_2.13-4.0.0 /opt/module/kafka
- - name: 赋权starrocks
- shell: chown -R starrocks:starrocks /opt/module/kafka
- - name: 配置环境变量
- blockinfile:
- path: /home/starrocks/.bashrc
- block: |
- # kafka
- export KAFKA_HOME=/opt/module/kafka
- export PATH=$KAFKA_HOME/bin:$PATH
- marker: "# {mark} KAFKA"
- EOF
复制代码- ansible-playbook install-kafka.yml
复制代码 1.5、修改Kafka设置
vi /opt/module/kafka/config/server.properties
- ############################# Server Basics #############################
- # The role of this server. Setting this puts us in KRaft mode
- process.roles=broker,controller
- # The node id associated with this instance's roles
- ## 为不同kafka 服务分配不同的 id值,其他服务id随ip的递增而增加
- node.id=1
- # List of controller endpoints used connect to the controller cluster
- controller.quorum.bootstrap.servers=kylin-01:9093
- ## 接入集群的kafka 服务的ip 和 编号,端口号使用监控和管理端口 9093
- controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
- listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093
- # Name of listener used for communication between brokers.
- inter.broker.listener.name=PLAINTEXT
- # Listener name, hostname and port the broker or the controller will advertise to clients.
- # If not set, it uses the value for "listeners".
- advertised.listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093
- # This is required if running in KRaft mode.
- controller.listener.names=CONTROLLER
- # listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
- listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
- # The number of threads that the server uses for receiving requests from the network and sending responses to the network
- num.network.threads=3
- # The number of threads that the server uses for processing requests, which may include disk I/O
- num.io.threads=8
- # The send buffer (SO_SNDBUF) used by the socket server
- socket.send.buffer.bytes=102400
- # The receive buffer (SO_RCVBUF) used by the socket server
- socket.receive.buffer.bytes=102400
- # The maximum size of a request that the socket server will accept (protection against OOM)
- socket.request.max.bytes=104857600
- ############################# Log Basics #############################
- # A comma separated list of directories under which to store log files
- log.dirs=/data/kafka/data
- # The default number of log partitions per topic. More partitions allow greater
- # parallelism for consumption, but this will also result in more files across
- # the brokers.
- num.partitions=1
- # This value is recommended to be increased for installations with data dirs located in RAID array.
- num.recovery.threads.per.data.dir=1
- ############################# Internal Topic Settings #############################
- offsets.topic.replication.factor=1
- share.coordinator.state.topic.replication.factor=1
- share.coordinator.state.topic.min.isr=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- ############################# Log Flush Policy #############################
- # The number of messages to accept before forcing a flush of data to disk
- #log.flush.interval.messages=10000
- # The maximum amount of time a message can sit in a log before we force a flush
- #log.flush.interval.ms=1000
- ############################# Log Retention Policy #############################
- # The minimum age of a log file to be eligible for deletion due to age
- log.retention.hours=168
- # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
- # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
- #log.retention.bytes=1073741824
- # The maximum size of a log segment file. When this size is reached a new log segment will be created.
- log.segment.bytes=1073741824
- # The interval at which log segments are checked to see if they can be deleted according
- # to the retention policies
- log.retention.check.interval.ms=300000
- # 最大消息大小512MB
- message.max.bytes=536870912
- replica.fetch.max.bytes=536870912
- # 最大请求字节大小1GB
- max.request.size=1073741824
复制代码 1.6、修改其它节点设置
- node.id=2
- controller.quorum.bootstrap.servers=kylin-02:9093
- controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
- listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
- advertised.listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
复制代码- node.id=3
- controller.quorum.bootstrap.servers=kylin-03:9093
- controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
- listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
- advertised.listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
复制代码 1.7、修改日志目录
vi bin/kafka-run-class.sh
- # Log directory to use
- # 修改日志目录
- LOG_DIR=/data/kafka/log
- if [ "x$LOG_DIR" = "x" ]; then
- LOG_DIR="$base_dir/logs"
- fi
复制代码 1.8、初始化集群
生成存储目录唯一ID
- bin/kafka-storage.sh random-uuid
复制代码 格式化 kafka 存储目录(每个节点都必要执行)
- bin/kafka-storage.sh format -t m6BZb8yRSzmdNvW9kzOoQg -c config/server.properties
复制代码 1.9、启动集群
每个节点都执行启动服务命令
- bin/kafka-server-start.sh -daemon config/server.properties
复制代码 检察服务日志
- tail -f /data/kafka/log/server.log
复制代码 检察 kafka 节点状态
- bin/kafka-broker-api-versions.sh --bootstrap-server kylin-01:9092
复制代码 检察 Kafka 的端口监听状态
- netstat -tuln | grep 9092
复制代码 使用 ps 命令检察 Kafka 历程
使用 top 或 htop 检察 Kafka 历程
2、操作命令
- ## 查看主题
- kafka-topics.sh --bootstrap-server kylin-01:9092 --list
-
- ## 查看主题明细
- kafka-topics.sh --bootstrap-server kylin-01:9092 --describe <topic-id>
-
- ## 创建主题,分区 partition 为5,副本 replication-factor 为2,broker 数应 大于等于 副本数。(broker 不必在创建 topic 时显示指定)
- kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic <topic-id> --partitions 5 --replication-factor 2
-
- ## 删除主题
- kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic <topic-id>
-
- ## 查看消费者列表--list
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --list
-
- ## 查看指定消费组详情
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --group <group-id>
-
- ## 删除特定group
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --delete --group <group-id>
-
- ## 打开一个生产者
- kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-id>
-
- ## 打开一个消费者
- kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --consumer-property group.id=<group-id> --from-beginning
-
- ## 查看所有消费组详情--all-groups
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups
-
- 查询消费者成员信息--members
-
- ## 所有消费组成员信息
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups --members
-
- ## 指定消费组成员信息
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --members --group <group-id>
-
- ## 修改到最新offset
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-latest --execute
- ## 预演-重设位移位置
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --dry-run
- ## 重设位移位置
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --execute
- ## 获取指定时间戳的offset
- kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server kylin-01:9092 --topic <topic-id> --time 1740499200000
- ## topic扩容
- kafka-topic.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --alter --partitions 16
- ## 指定时间范围获取数据
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '>=1742450425000 && <=1742464825000 {print bash}'
- #
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '>=1742450425000 && <=1742464825000 {print $0}'
- #
- kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --partition 0 --offset 100 --property print.timestamp=true | grep '1027729757'
复制代码 3、API
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |