羊蹓狼 发表于 2024-11-30 06:55:19

【在Linux世界中追寻巨大的One Piece】多线程(三)

目录
1 -> Linux线程同步
1.1 -> 条件变量
1.2 -> 同步概念与竞态条件
1.3 -> 条件变量函数
1.4 -> 为什么pthread_cond_wait需要互斥量
1.5 -> 条件变量使用规范
2 -> 生产者消耗者模型
2.1 -> 为什么要使用生产者消耗者模型
2.2 -> 生产者消耗者模型优点
2.3 -> 基于BlockingQueue的生产者消耗者模型
2.4 -> C++ queue模仿阻塞队列的生产消耗模型
2.5 -> POSIX信号量
2.6 -> 基于环形队列的生产消耗模型
3 -> 线程池
3.1 -> 线程池概念
3.2 -> 线程池应用场景
3.3 -> 线程池示例
4 -> 线程安全的单例模式
4.1 -> 什么是单例模式
4.2 -> 单例模式的特点
4.3 -> 饿汉方式实现单例模式
4.4 -> 懒汉方式实现单例模式
4.5 -> 懒汉方式实现单例模式(线程安全版本)
5 -> STL,智能指针和线程安全
5.1 -> STL中的容器是否是线程安全的
5.2 -> 智能指针是否是线程安全的
https://i-blog.csdnimg.cn/direct/60fba52dae4245d8b07a0dc75dcde229.png
1 -> Linux线程同步

1.1 -> 条件变量



[*]当一个线程互斥地访问某个变量时,它大概发现在其它线程改变状态之前,它什么也做不了。
[*]比方一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种环境就需要用到条件变量。
1.2 -> 同步概念与竞态条件



[*]同步:在包管数据安全的条件下,让线程可以或许按照某种特定的顺序访问临界资源,从而有用制止饥饿标题,叫做同步。
[*]竞态条件:因为时序标题,而导致步伐异常,我们称之为竞态条件。在线程场景下,这种标题不难理解。

1.3 -> 条件变量函数

初始化

int pthread_cond_init(pthread_cond_t* restrict cond, const pthread_condattr_t* restrict attr);

参数:
cond:要初始化的条件变量
attr:NULL
烧毁
int pthread_cond_destroy(pthread_cond_t* cond);
等待条件满意
int pthread_cond_wait(pthread_cond_t* restrict cond, pthread_mutex_t* restrict mutex);

参数:
cond:要在这个条件变量上等待
mutex:互斥量 叫醒等待
int pthread_cond_broadcast(pthread_cond_t* cond);
int pthread_cond_signal(pthread_cond_t* cond); 简朴案例
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

pthread_cond_t cond;
pthread_mutex_t mutex;

void* r1(void* arg)
{
        while (1)
        {
                pthread_cond_wait(&cond, &mutex);
                printf("活动\n");
        }
}

void* r2(void* arg)
{
        while (1)
        {
                pthread_cond_signal(&cond);
                sleep(1);
        }
}

int main(void)
{
        pthread_t t1, t2;

        pthread_cond_init(&cond, NULL);
        pthread_mutex_init(&mutex, NULL);

        pthread_create(&t1, NULL, r1, NULL);
        pthread_create(&t2, NULL, r2, NULL);

        pthread_join(t1, NULL);
        pthread_join(t2, NULL);

        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond);
}      # ./a.out        活动        活动        活动    1.4 -> 为什么pthread_cond_wait需要互斥量



[*]条件等待是线程间同步的一种手段,如果只有一个线程,条件不满意,一直等下去都不会满意,以是必须要有一个线程通过某些操纵,改变共享变量,使原先不满意的条件变得满意,而且友好的通知等待在条件变量上的线程。
[*]条件不会无缘无端的突然变得满意,必然会牵涉到共享数据的变化。以是一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。

https://i-blog.csdnimg.cn/direct/a13f587fd6e249f0826b830a7da979c1.png


[*]按照上面的说法,设计出如下的代码:先上锁,发现条件不满意,解锁,然后等待在条件变量上就行了。
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false)
{
        pthread_mutex_unlock(&mutex);
        //解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
        pthread_cond_wait(&cond);
        pthread_mutex_lock(&mutex);

}
pthread_mutex_unlock(&mutex);

[*]由于解锁和等待不是原子操纵。调用解锁之后,pthread_cond_wait之前,如果已经有其他线程获取到互斥量,摒弃条件满意,发送了信号,那么pthread_cond_wait将错过这个信号,大概会导致线程永远阻塞在这个pthread_cond_wait。以是解锁和等待必须是一个原子操纵。
[*]int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);进入该函数后,会去看条件量是否即是0,即是0,就把互斥量变成1,直到cond_wait返回,把条件量改成1,把互斥量恢复成原样。
1.5 -> 条件变量使用规范



[*]等待条件代码

pthread_mutex_lock(&mutex);
while (条件为假)
        pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);

[*]给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex); 2 -> 生产者消耗者模型

2.1 -> 为什么要使用生产者消耗者模型

生产者消耗者模式就是通过一个容器来解决生产者和消耗者的强耦合标题。生产者和消耗者相互之间不直接通讯,而通过阻塞队列来进行通讯,以是生产者生产完数据之后不消等待消耗者处理,直接扔给阻塞队列,消耗者不找生产者索要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,均衡了生产者和消耗者的处理能力。这个阻塞队列就是用来给生产者和消耗者解耦的。
2.2 -> 生产者消耗者模型优点



[*]解耦
[*]支持并发
[*]支持忙闲不均

https://i-blog.csdnimg.cn/direct/77b578c187604ab0b1ad5c3c1a4ad9f0.png
2.3 -> 基于BlockingQueue的生产者消耗者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消耗者模型的数据结构。其与普通的队列区别在于:当队列为空时,从队列获取元素的操纵将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操纵也会被阻塞,直到有元素被从队列中取出(以上的操纵都是基于不同的线程来说的,线程在对阻塞队列进程操纵时会被阻塞)。

https://i-blog.csdnimg.cn/direct/db2132b3193c41298f43ed9d18ee90a6.png
2.4 -> C++ queue模仿阻塞队列的生产消耗模型

代码:

#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>

#define NUM 8

class BlockQueue
{
private:
        std::queue<int> q;
        int cap;
        pthread_mutex_t lock;
        pthread_cond_t full;
        pthread_cond_t empty;

private:
        void LockQueue()
        {
                pthread_mutex_lock(&lock);
        }

        void UnLockQueue()
        {
                pthread_mutex_unlock(&lock);
        }

        void ProductWait()
        {
                pthread_cond_wait(&full, &lock);
        }

        void ConsumeWait()
        {
                pthread_cond_wait(&empty, &lock);
        }

        void NotifyProduct()
        {
                pthread_cond_signal(&full);
        }

        void NotifyConsume()
        {
                pthread_cond_signal(&empty);
        }

        bool IsEmpty()
        {
                return (q.size() == 0 ? true : false);
        }

        bool IsFull()
        {
                return (q.size() == cap ? true : false);
        }

public:
        BlockQueue(int _cap = NUM) :cap(_cap)
        {
                pthread_mutex_init(&lock, NULL);
                pthread_cond_init(&full, NULL);
                pthread_cond_init(&empty, NULL);
        }

        void PushData(const int& data)
        {
                LockQueue();
                while (IsFull())
                {
                        NotifyConsume();
                        std::cout << "queue full, notify
                                consume data, product stop." << std::endl;
                                ProductWait();
                }

                q.push(data);
                // NotifyConsume();
                UnLockQueue();
        }

        void PopData(int& data)
        {
                LockQueue();
                while (IsEmpty())
                {
                        NotifyProduct();
                        std::cout << "queue empty, notify
                                product data, consume stop." << std::endl;
                                ConsumeWait();
                }

                data = q.front();
                q.pop();
                // NotifyProduct();
                UnLockQueue();
        }

        ~BlockQueue()
        {
                pthread_mutex_destroy(&lock);
                pthread_cond_destroy(&full);
                pthread_cond_destroy(&empty);
        }
};

void* consumer(void* arg)
{
        BlockQueue* bqp = (BlockQueue*)arg;
        int data;
        for (; ; )
        {
                bqp->PopData(data);
                std::cout << "Consume data done : " << data <<
                        std::endl;
        }
}

//more faster
void* producter(void* arg)
{
        BlockQueue* bqp = (BlockQueue*)arg;
        srand((unsigned long)time(NULL));
        for (; ; )
        {
                int data = rand() % 1024;
                bqp->PushData(data);
                std::cout << "Prodoct data done: " << data <<
                        std::endl;
                // sleep(1);
        }
}

int main()
{
        BlockQueue bq;

        pthread_t c, p;

        pthread_create(&c, NULL, consumer, (void*)&bq);
        pthread_create(&p, NULL, producter, (void*)&bq);

        pthread_join(c, NULL);
        pthread_join(p, NULL);

        return 0;
} 2.5 -> POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操纵,达到无冲突的访问共享资源目的。但POSIX可以用于线程间同步。
初始化信号量

#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);

参数:
pshared : 0 表示线程间共享,非零表示进程间共享
value:信号量初始值 烧毁信号量
int sem_destroy(sem_t* sem);
等待信号量
功能:等待信号量,会将信号量的值减 1
int sem_wait(sem_t * sem); //P() 发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1。
int sem_post(sem_t * sem);//V() 生产者-消耗者的例子是基于queue的,其空间可以动态分配,现在基于固定巨细的环形队列重写这个步伐(POSIX信号量)。
2.6 -> 基于环形队列的生产消耗模型



[*]环形队列采用数组模仿,用模运算来模仿环状特性。

https://i-blog.csdnimg.cn/direct/419108dc18184bc09f29229b5392ff8a.png


[*]环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,以是可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。

https://i-blog.csdnimg.cn/direct/ad461c2df28641718f3230b66a4d3f26.png


[*]但是现有的信号量这个计数器,是简朴的进行多进程间的同步过程。
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>

#define NUM 16

class RingQueue
{
private:
        std::vector<int> q;
        int cap;
        sem_t data_sem;
        sem_t space_sem;
        int consume_step;
        int product_step;

public:
        RingQueue(int _cap = NUM) :q(_cap), cap(_cap)
        {
                sem_init(&data_sem, 0, 0);
                sem_init(&space_sem, 0, cap);
                consume_step = 0;
                product_step = 0;
        }

        void PutData(const int& data)
        {
                sem_wait(&space_sem); // P
                q = data;
                consume_step++;
                consume_step %= cap;
                sem_post(&data_sem); //V
        }

        void GetData(int& data)
        {
                sem_wait(&data_sem);
                data = q;
                product_step++;
                product_step %= cap;
                sem_post(&space_sem);
        }

        ~RingQueue()
        {
                sem_destroy(&data_sem);
                sem_destroy(&space_sem);
        }
};

void* consumer(void* arg)
{
        RingQueue* rqp = (RingQueue*)arg;
        int data;
        for (; ; )
        {
                rqp->GetData(data);
                std::cout << "Consume data done : " << data << std::endl;
                sleep(1);
        }
}

//more faster
void* producter(void* arg)
{
        RingQueue* rqp = (RingQueue*)arg;
        srand((unsigned long)time(NULL));
        for (; ; )
        {
                int data = rand() % 1024;
                rqp->PutData(data);
                std::cout << "Prodoct data done: " << data << std::endl;
                // sleep(1);
        }
}

int main()
{
        RingQueue rq;

        pthread_t c, p;

        pthread_create(&c, NULL, consumer, (void*)&rq);
        pthread_create(&p, NULL, producter, (void*)&rq);

        pthread_join(c, NULL);
        pthread_join(p, NULL);

} 3 -> 线程池

3.1 -> 线程池概念

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和团体性能。而线程池维护着多个线程,等待着监视管理者分配可并发实验的任务。这制止了在处理短时间任务时创建与烧毁线程的代价。线程池不仅可以或许包管内核的充分使用,还能防止太过调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络 sockets 等的数量。
3.2 -> 线程池应用场景


[*]需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求如许的任务,使用线程池技术黑白常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet毗连请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
[*]对性能要求苛刻的应用,比如要求服务器灵敏相应客户请求。
[*]担当突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池环境下,将产生大量线程,虽然理论上大部门操纵体系线程数量最大值不是标题,短时间内产生大量线程大概使内存到达极限,出现错误。
3.3 -> 线程池示例

threadpool.hpp
#ifndef __M_TP_H__
#define __M_TP_H__

#include <iostream>
#include <queue>
#include <pthread.h>

#define MAX_THREAD 5

typedef bool (*handler_t)(int);

class ThreadTask
{
private:
        int _data;
        handler_t _handler;
public:
        ThreadTask() :_data(-1), _handler(NULL) {}

        ThreadTask(int data, handler_t handler)
        {
                _data = data;
                _handler = handler;
        }

        void SetTask(int data, handler_t handler)
        {
                _data = data;
                _handler = handler;
        }

        void Run()
        {
                _handler(_data);
        }
};

class ThreadPool
{
private:
        int _thread_max;
        int _thread_cur;
        bool _tp_quit;
        std::queue<ThreadTask*> _task_queue;
        pthread_mutex_t _lock;
        pthread_cond_t _cond;

private:
        void LockQueue()
        {
                pthread_mutex_lock(&_lock);
        }

        void UnLockQueue()
        {
                pthread_mutex_unlock(&_lock);
        }

        void WakeUpOne()
        {
                pthread_cond_signal(&_cond);
        }

        void WakeUpAll()
        {
                pthread_cond_broadcast(&_cond);
        }

        void ThreadQuit()
        {
                _thread_cur--;
                UnLockQueue();
                pthread_exit(NULL);
        }

        void ThreadWait()
        {
                if (_tp_quit)
                {
                        ThreadQuit();
                }
                pthread_cond_wait(&_cond, &_lock);
        }

        bool IsEmpty()
        {
                return _task_queue.empty();
        }

        static void* thr_start(void* arg)
        {
                ThreadPool* tp = (ThreadPool*)arg;
                while (1)
                {
                        tp->LockQueue();
                        while (tp->IsEmpty())
                        {
                                tp->ThreadWait();
                        }

                        ThreadTask* tt;
                        tp->PopTask(&tt);
                        tp->UnLockQueue();
                        tt->Run();

                        delete tt;
                }

                return NULL;
        }

public:
        ThreadPool(int max = MAX_THREAD) :_thread_max(max),
                _thread_cur(max),
                _tp_quit(false)
        {
                pthread_mutex_init(&_lock, NULL);
                pthread_cond_init(&_cond, NULL);
        }

        ~ThreadPool()
        {
                pthread_mutex_destroy(&_lock);
                pthread_cond_destroy(&_cond);
        }

        bool PoolInit()
        {
                pthread_t tid;
                for (int i = 0; i < _thread_max; i++)
                {
                        int ret = pthread_create(&tid, NULL, thr_start,
                                this);
                        if (ret != 0)
                        {
                                std::cout << "create pool thread error\n";

                                return false;
                        }
                }

                return true;
        }

        bool PushTask(ThreadTask* tt)
        {
                LockQueue();
                if (_tp_quit)
                {
                        UnLockQueue();
                        return false;
                }

                _task_queue.push(tt);
                WakeUpOne();
                UnLockQueue();

                return true;
        }

        bool PopTask(ThreadTask** tt)
        {
                *tt = _task_queue.front();
                _task_queue.pop();

                return true;
        }

        bool PoolQuit()
        {
                LockQueue();
                _tp_quit = true;
                UnLockQueue();
                while (_thread_cur > 0)
                {
                        WakeUpAll();
                        usleep(1000);
                }

                return true;
        }

};
#endif main.cpp
bool handler(int data)
{
        srand(time(NULL));
        int n = rand() % 5;
        printf("Thread: %p Run Tast: %d--sleep %d sec\n", pthread_self(), data, n);
        sleep(n);

        return true;
}

int main()
{
        int i;
        ThreadPool pool;
        pool.PoolInit();
        for (i = 0; i < 10; i++)
        {
                ThreadTask* tt = new ThreadTask(i, handler);
                pool.PushTask(tt);
        }

        pool.PoolQuit();

        return 0;
}      g++ -std=c++0x test.cpp -o test -pthread -lrt    4 -> 线程安全的单例模式

4.1 -> 什么是单例模式

单例模式(Singleton Pattern)是一种设计模式,旨在确保一个类只有一个实例,并提供一个全局访问点来获取这个实例。这种模式常用于管理共享资源,如配置信息、线程池、缓存等。单例模式的核心头脑是控制对象的实例化过程,使得在整个应用步伐中只有一个实例存在,而且所有对该实例的访问都通过同一个访问点进行。
4.2 -> 单例模式的特点


[*]唯一性:单例模式确保一个类只有一个实例。这意味着在整个应用步伐的生命周期中,无论在何处调用该类的实例,都将返回同一个对象。
[*]全局访问点:单例模式提供了一个全局访问点,通常是一个静态方法,用于获取该类的唯一实例。这使得在步伐的任何地方都可以或许方便地访问该实例。
[*]延迟初始化:单例模式的实例通常在第一次被请求时才会创建,这有助于节流资源。这种延迟初始化的特性也被称为懒加载(Lazy Initialization)。
[*]线程安全:在多线程环境下,单例模式需要确保其唯一性和全局访问的正确性。这通常通过同步机制来实现,如使用锁或其他并发控制手段。
[*]控制实例化:单例模式的构造函数通常是私有的,这防止了外部代码通过通例的构造函数创建新的实例。
[*]可扩展性:单例模式可以通过继承或其他方式进行扩展,以满意不同的应用需求。
[*]资源管理:单例模式常用于管理共享资源,如数据库毗连、线程池、配置信息等,确保这些资源在整个应用步伐中只有一个实例,从而提高资源的使用效率和管理便利性。
[*]性能优化:由于单例模式只创建一个实例,因此可以减少内存开销和提高性能,特别是在处理大量数据或频仍访问的对象时。
[*]简化代码结构:单例模式可以简化代码结构,因为它提供了一个单一的、全局的访问点,使得代码的维护和理解更加容易。
[*]适用场景:单例模式适用于那些在整个应用步伐中只需要一个实例的场景,如日志记录器、配置管理器、数据库毗连池等。


4.3 -> 饿汉方式实现单例模式


template <typename T>
class Singleton
{
        static T data;
public:
        static T* GetInstance()
        {
                return &data;
        }
}; 只要通过Singleton这个包装类来使用T对象,则一个进程中只有一个T对象的实例。
4.4 -> 懒汉方式实现单例模式

template <typename T>
class Singleton
{
        static T* inst;
public:
        static T* GetInstance()
        {
                if (inst == NULL)
                {
                        inst = new T();
                }
                return inst;
        }
}; 存在一个严重的标题,线程不安全。
第一个调用GetInstance时,如果两个线程同时调用,大概会创建出两份T对象的实例。
但是后续再次调用,就没有标题了。

4.5 -> 懒汉方式实现单例模式(线程安全版本)


// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
        volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
        static std::mutex lock;

public:
        static T* GetInstance()
        {
                if (inst == NULL)
                { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
                        lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
                        if (inst == NULL)
                        {
                                inst = new T();
                        }
                        lock.unlock();
                }

                return inst;
        }
}; 注意:

[*]加锁解锁的位置
[*]双重if判断,制止不必要的锁竞争
[*]volatile关键字防止过分优化

5 -> STL,智能指针和线程安全

5.1 -> STL中的容器是否是线程安全的

不是。
缘故原由是,STL的设计初衷是将性能挖掘到极致,而一旦涉及到加锁包管线程安全,会对性能造成巨大的影响。
而且对于不同的容器,加锁方式的不同,性能大概也不同(比方hash表的锁表和锁桶)。
因此STL默认不是线程安全。如果需要在多线程环境下使用,往往需要调用者自行包管线程安全。
5.2 -> 智能指针是否是线程安全的

对于unique_ptr,由于只是在当前代码块范围内见效,因此不涉及线程安全标题。
对于shared_ptr,多个对象需要共用一个引用计数变量,以是会存在线程安全标题。但是标准库实现的时间考虑到了这个标题,基于原子操纵(CAS)的方式包管shared_ptr可以或许高效,原子的操纵引用计数。


感谢各位大佬支持!!!
互三啦!!!



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【在Linux世界中追寻巨大的One Piece】多线程(三)