Kafka Beginner Tutorial


I. Kafka Principles

1. Kafka is a high-performance message queue system. It can handle large-scale data streams with low-latency delivery, and it can read and write hundreds of thousands of messages per second.
II. Advantages of Kafka

1. Service decoupling

(1) Improved maintainability
   By decoupling services, the system is split into independent parts. When a service needs to be updated or fixed, the work can be done in isolation without affecting the normal operation of other services, which greatly reduces the difficulty and time required for maintenance.
(2) Better extensibility
      A decoupled system is easier to extend. Adding new features or services usually does not affect other parts of the existing system, so the system can respond quickly to changes in market and user demand.
2. High throughput and low latency

    Kafka can process hundreds of thousands of messages per second, with latency as low as a few milliseconds. Each topic can be split into multiple partitions, and consumer groups consume from those partitions.
3. Scalability

    The cluster supports online scaling: partitions can be reassigned and migrated across brokers with kafka-reassign-partitions.sh, as in the sketch below.
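A minimal sketch of what such a reassignment looks like (the topic name demo-topic, the target broker ids, and the broker address are placeholder assumptions, not part of the original setup): a plan is written to a JSON file and handed to kafka-reassign-partitions.sh.

  # Describe the desired placement in a JSON file (topic and broker ids are examples)
  printf '{"version":1,"partitions":[{"topic":"demo-topic","partition":0,"replicas":[1,2]}]}\n' > reassign.json

  # Apply the plan, then check its progress
  ./bin/kafka-reassign-partitions.sh --bootstrap-server 10.11.22.122:9092 \
    --reassignment-json-file reassign.json --execute
  ./bin/kafka-reassign-partitions.sh --bootstrap-server 10.11.22.122:9092 \
    --reassignment-json-file reassign.json --verify
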
4. Durability and reliability


Messages are persisted to local disk, and replication is supported to prevent data loss.
5. Fault tolerance


Node failures within the cluster are tolerated (with a replication factor of n, up to n-1 nodes may fail).
6. High concurrency


Thousands of clients can read and write at the same time.
III. Key Concepts


1. Topic



Messages in Kafka are organized by topic. Each topic represents a message stream: producers send messages to a topic, and consumers read messages from it.
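A minimal sketch of working with topics via the bundled command-line tools (the topic name test-topic is a placeholder; the broker address matches the listener configured later in this tutorial):

  # Create a topic named "test-topic" with 3 partitions and a single replica
  ./bin/kafka-topics.sh --create --topic test-topic \
    --bootstrap-server 10.11.22.122:9092 --partitions 3 --replication-factor 1

  # List all topics known to the cluster
  ./bin/kafka-topics.sh --list --bootstrap-server 10.11.22.122:9092
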
2. Partition


Each topic can be divided into multiple partitions, and each partition is an ordered, immutable sequence of messages. Partitions are what allow Kafka to scale horizontally, handle large volumes of data, and deliver high throughput.
3. Replica


To guarantee high availability, Kafka allows each partition to have multiple replicas stored on different servers, so the data remains available even if one server fails.
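Assuming the test-topic created above, the partition layout and replica placement can be inspected with the same tool:

  # For each partition, print its leader broker, replica set and in-sync replicas (ISR)
  ./bin/kafka-topics.sh --describe --topic test-topic \
    --bootstrap-server 10.11.22.122:9092
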
4. Producer


A producer is a client that sends messages to a Kafka topic. It can send each message to a specific partition, or let Kafka choose the partition according to a strategy (such as round-robin).
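The console producer that ships with Kafka is a quick way to act as a producer without writing any code (broker address and topic name are the same examples as above):

  # Every line typed on stdin is sent to the topic as one message; Ctrl+C to exit
  ./bin/kafka-console-producer.sh --bootstrap-server 10.11.22.122:9092 \
    --topic test-topic
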
5. Consumer


A consumer is a client that reads messages from a Kafka topic. Consumers usually belong to a consumer group; the consumers in a group read different partitions of the same topic in parallel, which improves consumption speed and efficiency.
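Similarly, the console consumer can read the topic as a member of a consumer group (the group name demo-group is a placeholder); starting two copies with the same --group value shows the partitions being shared between them:

  # Read the topic from the beginning as part of consumer group "demo-group"
  ./bin/kafka-console-consumer.sh --bootstrap-server 10.11.22.122:9092 \
    --topic test-topic --group demo-group --from-beginning
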
6. Broker


A Kafka cluster consists of multiple brokers, each of which is a Kafka instance. Brokers store messages and handle read and write requests.
7. ZooKeeper


ZooKeeper is a distributed coordination service. Kafka uses ZooKeeper to manage cluster metadata such as topics, partitions, and brokers.
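When Kafka runs in ZooKeeper-based mode, as in the installation below, this metadata can be browsed with the zkCli.sh client. A small sketch, assuming the cluster from the installation section is running (the znode paths are the standard ones Kafka creates; run from the ZooKeeper bin directory):

  ./zkCli.sh -server 10.11.22.122:2181 ls /brokers/ids       # ids of the registered brokers
  ./zkCli.sh -server 10.11.22.122:2181 get /brokers/ids/1    # listener details of broker 1
  ./zkCli.sh -server 10.11.22.122:2181 ls /brokers/topics    # topics known to the cluster
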
IV. Kafka Installation Guide

   1. Follow this link to the official Kafka download page.
   2. Click the DOWNLOAD KAFKA button in the red box in Figure 1.

                            Figure 1
   3. Click the link selected in Figure 2 to download the release.

Figure 2
4. Extract Kafka on the server, then edit the configuration file kafka_2.13-4.0.0\config\zookeeper.properties:
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements.  See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License.  You may obtain a copy of the License at
  #
  #    http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  # the directory where the snapshot is stored.
  dataDir=/tmp/zookeeper
  # the port at which the clients will connect
  clientPort=2181
  # disable the per-ip limit on the number of connections since this is a non-production config
  maxClientCnxns=0
  # Disable the adminserver by default to avoid port conflicts.
  # Set the port to something non-conflicting if choosing to enable this
  admin.enableServer=true
  admin.enable=true
  audit.enable=true
  # admin.serverPort=8080
5. Edit the configuration file kafka_2.13-4.0.0\config\server.properties:
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements.  See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License.  You may obtain a copy of the License at
  #
  #    http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  #
  # This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
  # See kafka.server.KafkaConfig for additional details and defaults
  #
  ############################# Server Basics #############################
  # The id of the broker. This must be set to a unique integer for each broker.
  broker.id=1
  ############################# Socket Server Settings #############################
  # The address the socket server listens on. If not configured, the host name will be equal to the value of
  # java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
  #   FORMAT:
  #     listeners = listener_name://host_name:port
  #   EXAMPLE:
  #     listeners = PLAINTEXT://your.host.name:9092
  listeners=PLAINTEXT://10.11.22.122:9092
  # Listener name, hostname and port the broker will advertise to clients.
  # If not set, it uses the value for "listeners".
  #advertised.listeners=PLAINTEXT://your.host.name:9092
  # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
  #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
  # The number of threads that the server uses for receiving requests from the network and sending responses to the network
  num.network.threads=3
  # The number of threads that the server uses for processing requests, which may include disk I/O
  num.io.threads=8
  # The send buffer (SO_SNDBUF) used by the socket server
  socket.send.buffer.bytes=102400
  # The receive buffer (SO_RCVBUF) used by the socket server
  socket.receive.buffer.bytes=102400
  # The maximum size of a request that the socket server will accept (protection against OOM)
  socket.request.max.bytes=104857600
  ############################# Log Basics #############################
  # A comma separated list of directories under which to store log files
  log.dirs=/app/test/kafka/kafka_2.13-3.2.3/kafka-logs
  # The default number of log partitions per topic. More partitions allow greater
  # parallelism for consumption, but this will also result in more files across
  # the brokers.
  num.partitions=1
  # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
  # This value is recommended to be increased for installations with data dirs located in RAID array.
  num.recovery.threads.per.data.dir=1
  ############################# Internal Topic Settings  #############################
  # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
  # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
  offsets.topic.replication.factor=1
  transaction.state.log.replication.factor=1
  transaction.state.log.min.isr=1
  ############################# Log Flush Policy #############################
  # Messages are immediately written to the filesystem but by default we only fsync() to sync
  # the OS cache lazily. The following configurations control the flush of data to disk.
  # There are a few important trade-offs here:
  #    1. Durability: Unflushed data may be lost if you are not using replication.
  #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
  #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
  # The settings below allow one to configure the flush policy to flush data after a period of time or
  # every N messages (or both). This can be done globally and overridden on a per-topic basis.
  # The number of messages to accept before forcing a flush of data to disk
  #log.flush.interval.messages=10000
  # The maximum amount of time a message can sit in a log before we force a flush
  #log.flush.interval.ms=1000
  ############################# Log Retention Policy #############################
  # The following configurations control the disposal of log segments. The policy can
  # be set to delete segments after a period of time, or after a given size has accumulated.
  # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
  # from the end of the log.
  # The minimum age of a log file to be eligible for deletion due to age
  log.retention.hours=168
  # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
  # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
  #log.retention.bytes=1073741824
  # The maximum size of a log segment file. When this size is reached a new log segment will be created.
  log.segment.bytes=1073741824
  # The interval at which log segments are checked to see if they can be deleted according
  # to the retention policies
  log.retention.check.interval.ms=300000
  ############################# Zookeeper #############################
  # Zookeeper connection string (see zookeeper docs for details).
  # This is a comma separated host:port pairs, each corresponding to a zk
  # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
  # You can also append an optional chroot string to the urls to specify the
  # root directory for all kafka znodes.
  #zookeeper.connect=10.11.22.121:2181,10.11.22.122:2181,10.11.22.124:2181
  zookeeper.connect=10.11.22.122:2181
  # Timeout in ms for connecting to zookeeper
  zookeeper.connection.timeout.ms=180000
  ############################# Group Coordinator Settings #############################
  # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
  # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
  # The default value for this is 3 seconds.
  # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
  # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
  group.initial.rebalance.delay.ms=0
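One point worth spelling out about the file above: listeners is the address the broker binds to, while advertised.listeners is the address it hands back to clients in metadata responses. When producers and consumers connect from other machines (or through NAT), you will usually want to set both explicitly; a sketch using this tutorial's example address:

  # Address the broker socket binds to
  listeners=PLAINTEXT://10.11.22.122:9092
  # Address advertised to clients; must be reachable from where producers/consumers run
  advertised.listeners=PLAINTEXT://10.11.22.122:9092
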
6. Download ZooKeeper
Click the website link shown in Figure 3 to download ZooKeeper.
(1) Click the link to go to the ZooKeeper download page.
           Figure 3
Extract it on the server, go to the apache-zookeeper-3.9.3-bin\conf directory, and create a new file named zoo.cfg:
  # The number of milliseconds of each tick
  tickTime=2000
  # The number of ticks that the initial
  # synchronization phase can take
  initLimit=10
  # The number of ticks that can pass between
  # sending a request and getting an acknowledgement
  syncLimit=5
  # the directory where the snapshot is stored.
  # do not use /tmp for storage, /tmp here is just
  # example sakes.
  dataDir=/app/install-test/zk/zookeeper-3.4.6/zkdata
  dataLogDir=/app/install-test/zk/zookeeper-3.4.6/logs
  # the port at which the clients will connect
  clientPort=2181
  # the maximum number of client connections.
  # increase this if you need to handle more clients
  #maxClientCnxns=60
  #
  # Be sure to read the maintenance section of the
  # administrator guide before turning on autopurge.
  #
  # https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
  #
  # The number of snapshots to retain in dataDir
  #autopurge.snapRetainCount=3
  # Purge task interval in hours
  # Set to "0" to disable auto purge feature
  #autopurge.purgeInterval=1
  audit.enable=true
  ## Metrics Providers
  #
  # https://prometheus.io Metrics Exporter
  #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
  #metricsProvider.httpHost=0.0.0.0
  #metricsProvider.httpPort=7000
  #metricsProvider.exportJvmInfo=true
  server.1=10.11.22.122:2888:2889
  #server.2=10.11.22.123:2888:2889
  #server.3=10.11.22.124:2888:2889
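A note on the zoo.cfg above: if you later uncomment the extra server.N lines to run a multi-node ensemble, each ZooKeeper node also needs a myid file in its dataDir containing that node's id. This step is an assumption about a standard ZooKeeper ensemble setup, not something the original post covers; for example, on the first node:

  # Mark this ZooKeeper instance as server.1 (path taken from dataDir in zoo.cfg)
  echo 1 > /app/install-test/zk/zookeeper-3.4.6/zkdata/myid
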
7. After updating all configuration items, start the ZooKeeper service first. Go to the kafka_2.13-4.0.0 directory and run the following command:
  ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
After it starts, check the process:
  ps -ef | grep zookeeper
If it started successfully, you should see something like Figure 4.

Figure 4
8. Switch to the apache-zookeeper-3.9.3-bin\bin directory and run the following command to start the ZooKeeper client:
  ./zkCli.sh
If it started successfully, you should see something like Figure 5.

Figure 5

9. Switch back to the kafka_2.13-4.0.0 directory and run the Kafka startup command:
  ./bin/kafka-server-start.sh -daemon ./config/server.properties
After startup, check the process; the result should look like Figure 6:
  ps -ef | grep kafka

     Figure 6
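With ZooKeeper and Kafka both running, a quick end-to-end check pulls together the command-line tools shown in Section III (the topic name smoke-test is a placeholder; run the producer and consumer in separate terminals):

  # Create a test topic
  ./bin/kafka-topics.sh --create --topic smoke-test \
    --bootstrap-server 10.11.22.122:9092 --partitions 1 --replication-factor 1

  # Terminal 1: type a few lines, each becomes a message (Ctrl+C to exit)
  ./bin/kafka-console-producer.sh --bootstrap-server 10.11.22.122:9092 --topic smoke-test

  # Terminal 2: the messages typed above should appear here
  ./bin/kafka-console-consumer.sh --bootstrap-server 10.11.22.122:9092 \
    --topic smoke-test --from-beginning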
