ToB企服应用市场:ToB评测及商务社交产业平台
标题:
kafka千万级数据挤压问题办理
[打印本页]
作者:
美食家大橙子
时间:
2024-11-19 18:25
标题:
kafka千万级数据挤压问题办理
1. 背景
之前碰到过这样一个项目需求,我们需要从kafka中拿到数据,之后将数据进行转化为第三方需要的格式,再将数据推送到第三方的kafka中。由于每天的数据量快达到的千万级别,且每条数据中包罗了多张图片的base64格式,这就导致数据量多的同时每条数据报文还很大,因此数据消费速度是根本无法跟得上数据生产速度,造成数据堆积。
2. 办理方案
2.1 单节点
首先需要从单节点开始优化,使得每个节点的性能达到最大。
2.1.1 消费者端
增加消费者线程数,使消费者线程数和订阅的kafka的topic的分区数一致。每一个消费者可以消费多个分区,但是每个分区只能被一个消费者组中的一个消费者所消费。因此,在同一个消费者组中,当消费者数量和topic的分区数相等时,消费速率最高;若消费者数量大于分区数时,多余的消费者并不会和其他消费者同时消费数据,会造成资源浪费。
因此,我先修改了kafka消费者的concurrency参数,将其值设置为分区巨细。
但是,之后我利用kafka下令检察分区消费情况时,却发现多个分区只被一个消费者线程消费(一个线程并发消费了多个分区),这与我们最开始想的一个线程消费一个分区不相符。于是我放弃了通过设置来实现多线程消费的方式,利用代码控制线程消费分区情况:
1)消费端由最开始的@KafkaListener注解监听的方式修改为主动通过程序poll消息的方式。
2)在设置文件中设置要监听的分区数,通过代码精准控制每个消费者仅仅消费那一个分区。
实现代码思路如下:
1)设置文件(其中,start.partition为要消费的开始分区,end.partition为要消费的结束分区):
2)消费者伪代码:
public class ConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
/**
* 消费者线程的构造方法,
* @param properties kafka的配置信息
* @param topic 消费者订阅的topic
* @param i 要消费的分区数
*/
public ConsumerThread(Properties properties, String topic, Integer i) {
this.kafkaConsumer = new KafkaConsumer(properties);
this.kafkaConsumer.assign(Arrays.asList(new TopicPartition(topic, i)));
}
@Override
public void run() {
try {
while (true) {
// 拉取kafka中的数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
logger.info("所属分区:" + record.partition());
//实现处理逻辑
……
}
}
} catch (Exception e) {
logger.error("线程执行异常", e);
} finally {
kafkaConsumer.close();
}
}
}
复制代码
3)创建消费者线程伪代码:
public static void initNormalTask() {
// 获取kafka配置信息
Properties properties = consumerProperty.initConfig();
// 获取配置文件中的开始分区,即start.partition
logger.info("start:" + ApplicationUtils.getStartPartition());
// 获取配置文件中的结束分区,即end.partition
logger.info("end:" + ApplicationUtils.getEndPartition());
// 根据分区数创建消费者线程,for循环中的i正好对应分区数
for (int i = ApplicationUtils.getStartPartition(); i <= ApplicationUtils.getEndPartition(); i++) {
new ConsumerThread(properties, normalTopic, i).start();
}
}
复制代码
通过以上三步,则将消费者线程和分区数进行了逐一对应。
2. 增加分区数量。
增加了分区数量,我们就可以相应地增加消费者的数量,从而进步了消费者的并行本领,也就进步了数据的消费本领(分区数量决定了消费者最大线程数)。
2.1.2 数据处理端
最开始,从kafka中获取到数据后,直接进行数据转化,然后推送至第三方,整个过程是一口吻完成的。试想,就算将数据直接推送至第三方,都未必能跟上数据的生产速度,更何况还需要对数据进行格式转化。
假如处理数据的地方对消息的处理非常迅速,那么poll拉取的频次也会更高,进而整体消费的性能也会提升;相反,假如在这里对消息的处理缓慢,比如进行一个事件性操作,大概等待一个RPC的同步相应,那么poll拉取的频次也会随之下降,进而造成整体消费性能的下降。因此,在数据处理时,利用多线程的方式,也是进步数据消费速率的一种方式。模型图如下:
优化战略:消费者从kafka监听到数据后,将数据放到阻塞队列中,然后定义数据处理线程,并创建多个数据处理线程从阻塞队列中获取数据、处理数据并将数据推送至第三方,这样就大大提升了一条数据从消费者获取到末了推送至第三方的速率。
2.2 多节点
由于将单个节点的性能调解到最大后,消费速度依然跟不上kafka生产数据的速度,还是有数据堆积,因此萌生了部署多个节点的想法。
最开始为了将集群的性能调到最大,部署了10个节点,每个节点消费一个分区,首先将数据处理线程设置为20个,在这种情况下,消费速度明显提升,已经不存在数据堆积的问题。
2.3 信号量
由于消费者监听到数据后,是存入了阻塞队列中,若节点重启,则会丢失阻塞队列中的数据,针对这一问题,准备尝试利用信号量办理。
信号量即Semaphore,是一种计数器,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源,通常用于那些资源有明确访问数量限制的场景,常用于限流 。通俗来说就是,如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于等于要获得的
复制代码
信号量数,信号量减相应数量,然后允许共享这个资源;否则,信号量将会把线程置入休眠直至计数器大于便是要获得的信号量数。当信号量利用完时,必须释放。
Semaphore是JUC(java.util.concurrent)包下的类,常用方法有:
1)Semaphore(int permits):构造方法,参数为初始信号量数;
2)void acquire():线程占用一个许可。可以传一个int参数,意为线程占用n个许可,没有许可时会阻塞住;
3)void release():线程释放一个许可。可以传一个int参数,意为线程释放n个许可;
4)int availablePermits(): 获取当前信号量可用的许可。
为了验证信号量是否可以解决阻塞队列导致数据丢失问题,我写了一个Demo:
复制代码
1)模拟kafka产生数据的代码:
public class KafkaThread implements Runnable {
private KafkaQueue queue = KafkaQueue.getInstance();
public void run() {
while (queue.size() < 10000000) {
queue.put(1);
}
}
}
复制代码
2)模拟从kafka中消费数据,获取到数据后,先执行Semaphore的acquire方法获取信号量,若还有信号量则向下执行,将数据存入MyQueue阻塞队列中,否则阻塞在这里:
public class ConsumerThread implements Runnable {
private KafkaQueue kafkaQueue = KafkaQueue.getInstance();
private MyQueue myQueue = MyQueue.getInstance();
public void run() {
while (kafkaQueue.size() > 0) {
try {
myQueue.acquire();
System.out.println(Thread.currentThread().getName() + "逻辑处理");
myQueue.put(kafkaQueue.get());
System.out.println("阻塞队列大小:" + myQueue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
3)模拟数据处理线程,从阻塞队列MyQueue中取出,进行数据处理,处理完毕后调用Semaphore的release方法,释放信号量:
public class DataHandleThread implements Runnable {
private MyQueue myQueue = MyQueue.getInstance();
public void run() {
while (true) {
Integer i = null;
if (myQueue.size() > 0) {
try {
i = myQueue.get();
System.out.println(Thread.currentThread().getName() + "处理逻辑");
myQueue.release();
System.out.println("信号量大小:" + myQueue.availablePermits());
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
复制代码
4)阻塞队列MyQueue,内置信号量作为属性,初始信号量值设置为10:
public class MyQueue {
private static ArrayBlockingQueue<Integer> queue;
private static Semaphore semaphore;
private MyQueue() {
queue = new ArrayBlockingQueue<Integer>(300);
semaphore = new Semaphore(10);
}
public void put(Integer i) throws InterruptedException {
queue.put(i);
}
public int get() throws InterruptedException {
return queue.take();
}
public int size() {
return queue.size();
}
private static class SingleTon {
private static MyQueue instance = new MyQueue();
}
public static MyQueue getInstance() {
return SingleTon.instance;
}
// 获取信号量
public void acquire() throws InterruptedException {
semaphore.acquire();
}
// 释放信号量
public void release() {
semaphore.release();
}
// 查看可用信号量数
public int availablePermits() {
return semaphore.availablePermits();
}
}
复制代码
5)主方法Main:
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 执行kafka线程,通过一直往KafkaQueue中存入数据来模拟kafka生产数据
executor.execute(new KafkaThread());
// 执行消费者线程,指从kafkaQueue中获取数据存入阻塞队列中,用来模拟从kafka中获取数据
executor.execute(new ConsumerThread());
// 执行生产者线程,指从阻塞队列中获取数据进行数据处理
for (int i = 0; i < 5; i++) {
executor.execute(new ProducerThread());
}
}
}
复制代码
6)执行效果(开启5个数据处理线程):
a. 开启一个消费者线程,若阻塞队列巨细大于信号量的初始值,则MyQueue队列中的数据个数最多与信号量巨细一致。阻塞队列巨细为300,信号量初始值为10,则队列中的数据最多有10个,执行效果如下:
pool-1-thread-2逻辑处理
阻塞队列大小:1
pool-1-thread-2逻辑处理
阻塞队列大小:2
pool-1-thread-2逻辑处理
阻塞队列大小:3
pool-1-thread-2逻辑处理
阻塞队列大小:4
pool-1-thread-2逻辑处理
阻塞队列大小:5
pool-1-thread-2逻辑处理
阻塞队列大小:6
pool-1-thread-2逻辑处理
阻塞队列大小:7
pool-1-thread-2逻辑处理
阻塞队列大小:8
pool-1-thread-2逻辑处理
阻塞队列大小:9
pool-1-thread-2逻辑处理
阻塞队列大小:10
pool-1-thread-3处理逻辑
信号量大小:1
pool-1-thread-2逻辑处理
阻塞队列大小:10
pool-1-thread-4处理逻辑
信号量大小:1
pool-1-thread-2逻辑处理
阻塞队列大小:10
......
复制代码
b. 开启一个消费者线程,若阻塞队列巨细小于信号量的初始值,则MyQueue队列中的数据个数最多与阻塞队列巨细一致。将阻塞队列巨细设置为5,信号量初始值为10,则队列中的数据最多有5个,执行效果如下:
pool-1-thread-2逻辑处理
阻塞队列大小:1
pool-1-thread-2逻辑处理
阻塞队列大小:2
pool-1-thread-2逻辑处理
阻塞队列大小:3
pool-1-thread-2逻辑处理
阻塞队列大小:4
pool-1-thread-2逻辑处理
阻塞队列大小:5
pool-1-thread-2逻辑处理
pool-1-thread-4处理逻辑
信号量大小:5
......
复制代码
因此,信号量在本项目中是适用的,可以将信号量的初始值设置成与数据处理线程每秒处理数据个数大致相同。
信号量也适用于同种线程对公共资源并发访问时,控制线程数的情况。如下Demo:
@Slf4j
public class TestMain {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
pool.execute(() -> {
try {
semaphore.acquire();
log.info("成功获取令牌");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.info("释放令牌");
semaphore.release();
}
});
}
}
}
复制代码
执行效果:
10:29:50.584 [pool-1-thread-2] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:50.584 [pool-1-thread-7] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:50.584 [pool-1-thread-8] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:50.585 [pool-1-thread-3] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:50.584 [pool-1-thread-1] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-3] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.596 [pool-1-thread-8] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.596 [pool-1-thread-7] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.596 [pool-1-thread-4] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-1] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.596 [pool-1-thread-6] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-5] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-9] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-2] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.597 [pool-1-thread-10] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:52.610 [pool-1-thread-6] INFO com.ljp.main.TestMain - 释放令牌
10:29:52.610 [pool-1-thread-10] INFO com.ljp.main.TestMain - 释放令牌
10:29:52.610 [pool-1-thread-4] INFO com.ljp.main.TestMain - 释放令牌
10:29:52.610 [pool-1-thread-9] INFO com.ljp.main.TestMain - 释放令牌
10:29:52.610 [pool-1-thread-5] INFO com.ljp.main.TestMain - 释放令牌
复制代码
由执行效果可以看出,10个线程并不是一次性都执行完的,根据打印的时间,看出前五个线程是同时进行的,因为我们将信号量的初始值设为了5,等有线程释放了信号量之后,其他线程再继承执行。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4