目录
1. 项目先容
2. 内存池概念
2.1 池化技能
2.2 内存池和内存碎片
2.3 细看malloc
3. 定长内存池的实现
ObjectPool.hpp
4. 高并发内存池框架
5. thread cache测试
5.1 thread cache框架
5.2 ConcurrentAlloc.hpp
6. central cache测试
6.1 central cache框架
6.2 重要接口函数
7. page cache测试
7.1 page cache框架
7.2 重要接口函数
1. 项目先容
此项目实现的是一个高并发的内存池,它的原型是Google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替换系统的内存分配相关函数malloc和free。
tcmalloc的着名度黑白常高的,不少公司都在用它,比如Go语言就直接用它做了本身的内存分配器。后此项目就是把tcmalloc中最核心的框架简化拿出来,模拟实现出一个mini版的高并发内存池,目标就是学习tcmalloc的精华。
涉及知识点:此项目主要涉及C/C++、数据布局(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等方面的技能。
2. 内存池概念
在说内存池之前,得先相识一下“池化技能”。
2.1 池化技能
“池化技能”,就是程序先向系统申请过量的资源,然后本身进行管理,以备不时之需。
之所以要申请过量的资源,是因为申请和释放资源都有较大的开销,不如提前申请一些资源放入“池”中,当必要资源时直接从“池”中获取,不必要时就将该资源重新放回“池”中即可。如许使用时就会变得非常快捷,可以大大提高程序的运行效率。
在计算机中,有很多使用“池”这种技能的地方,除了内存池之外,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想就是:先启动若干数量的线程,让它们处于睡眠状态,当吸收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求后,线程又进入睡眠状态。
2.2 内存池和内存碎片
内存池概念:
内存池是指程序预先向操作系统申请一块足够大的内存,此后,当程序中必要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当释放内存的时候,并不是真正将内存返回给操作系统,而是将内存返回给内存池。当程序退出时(或某个特定时间),内存池才将之前申请的内存真正释放。
内存池主要办理的标题:
内存池主要办理的就是效率的标题,它能够避免让程序频繁的向系统申请和释放内存。其次,内存池作为系统的内存分配器,还必要尝试办理内存碎片的标题。
内存碎片分为内碎片和外碎片:
- 外碎片是一些空闲的小块内存区域,由于这些内存空间不一连,以至于合计的内存足够,但是不能满意一些内存分配申请需求。
- 内碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被使用。
内存池尝试办理的是外部碎片的标题,同时也尽可能的镌汰内部碎片的产生。
2.3 细看malloc
malloc和C语言内存管理复习:
C语言进阶⑰(动态内存管理)四个动态内存函数+动态通讯录+柔性数组-CSDN博客
C/C++中我们要动态申请内存并不是直接去堆申请的,而是通过malloc函数去申请的,包括C++中的new实际上也是封装了malloc函数的。
申请内存块时是先调用malloc,malloc再去向操作系统申请内存。malloc实际就是一个内存池,malloc相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程序用,当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。
malloc的实现方式有很多种,一样平常不同编译器平台用的都是不同的。比如Windows的VS系列中的malloc就是微软自行实现的,而Linux下的gcc用的是glibc中的ptmalloc。
3. 定长内存池的实现
定长内存池的实现可以用malloc也可以直接向堆申请页为单位的大块内存。
windows和Linux下怎样直接向堆申请页为单位的大块内存:
百度百科-验证百度百科是一部内容开放、自由的网络百科全书,旨在创造一个涵盖全部范畴知识,服务全部互联网用户的中文知识性百科全书。在这里你可以参与词条编辑,分享贡献你的知识。https://baike.baidu.com/item/VirtualAlloc/1606859?fr=aladdinhttps://baike.baidu.com/item/VirtualAlloc/1606859?fr=aladdinhttps://baike.baidu.com/item/VirtualAlloc/1606859?fr=aladdin
https://www.cnblogs.com/vinozly/p/5489138.htmlhttps://www.cnblogs.com/vinozly/p/5489138.htmlhttps://www.cnblogs.com/vinozly/p/5489138.htmlhttps://www.cnblogs.com/vinozly/p/5489138.html
这里用下Windows下的VirtualAlloc:
定长内存池中应该包含哪些成员变量?
对于向堆申请到的大块内存,可以用一个指针来对其进行管理,但仅用一个指针肯定是不够的,还必要用一个变量来纪录这块内存的长度。
由于此后必要将这块内存进行切分,为了方便切分操作,指向这块内存的指针最好是字符指针,因为指针的范例决定了指针向前或向后走一步有多大距离,对于字符指针来说,当必要向后移动 n 个字节时,直接对字符指针进行加 n 操作即可。
释放返来的定长内存块也必要被管理,可以将这些释放返来的定长内存块链接成一个链表,这里将管理释放返来的内存块的链表叫做自由链表,为了能找到这个自由链表,还必要一个指向自由链表的指针。
因此,定长内存池当中包含三个成员变量:
- _memory:指向大块内存的指针。
- _remainBytes:大块内存切分过程中剩余字节数。
- _freeList:还返来过程中链接的自由链表的头指针。
直接放个带注释的完整代码:
ObjectPool.hpp
UnitTest.cc:
- #include "ObjectPool.hpp"
- int main()
- {
- TestObjectPool();
- return 0;
- }
复制代码
多试频频也是差不多,可以看到Release下面差距还是挺大的,将近20倍。
4. 高并发内存池框架
高并发内存池办理的标题:
现代很多的开发环境都是多核多线程,因此在申请内存的时,必然存在激烈的锁竞争标题。malloc本身其实已经很优秀了,但是在并发场景下可能会因为频繁的加锁息争锁导致效率有所低落,而该项目标原型tcmalloc实现的就是一种在多线程高并发场景下更胜一筹的高并发内存池。
在实现内存池时我们一样平常必要考虑到效率标题和内存碎片的标题,但对于高并发内存池来说,我们还必要考虑在多线程环境下的锁竞争标题。
高并发内存池主要由以下三个部门构成:
- thread cache: 线程缓存是每个线程独有的,用于小于便是256KB的内存分配,每个线程独享一个thread cache。
- central cache: 中心缓存是全部线程所共享的,当thread cache必要内存时会按需从central cache中获取内存,而当thread cache中的内存满意一定条件时,central cache也会在合适的时机对其进行回收。
- page cache: 页缓存中存储的内存是以页为单位进行存储及分配的,当central cache必要内存时,page cache会分配出一定数量的页分配给central cache,而当central cache中的内存满意一定条件时,page cache也会在合适的时机对其进行回收,并将回收的内存尽可能的进行合并,组成更大的一连内存块,缓解内存碎片的标题。
- 每个线程都有一个属于本身的thread cache,也就意味着线程在thread cache申请内存时是不必要加锁的,而一次性申请大于256KB内存的环境是很少的,因此大部门环境下申请内存时都是无锁的,这也就是这个高并发内存池高效的地方。
- 每个线程的thread cache会根据本身的环境向central cache申请或归还内存,这就避免了出现单个线程的thread cache占用太多内存,而其余thread cache出现内存吃紧的标题。
- 多线程的thread cache可能会同时找central cache申请内存,此时就会涉及线程安全的标题,因此在访问central cache时是必要加锁的,但central cache实际上是一个哈希桶的布局,只有当多个线程同时访问同一个桶时才必要加锁,所以这里的锁竞争也不会很激烈。
各个部门的主要作用:
thread cache主要办理锁竞争的标题,每个线程独享本身的thread cache,当本身的thread cache中有内存时该线程不会去和其他线程进行竞争,每个线程只要在本身的thread cache申请内存就行了。
central cache主要起到一个居中调理的作用,每个线程的thread cache必要内存时从central cache获取,而当thread cache的内存多了就会将内存还给central cache,其作用雷同于一个中枢,因此取名为中心缓存。
page cache负责提供以页为单位的大块内存,当central cache必要内存时就会去向page cache申请,而当page cache没有内存了就会直接去找系统要,也就是直接去堆上按页申请内存块。
5. thread cache测试
5.1 thread cache框架
定长内存池只支持固定巨细内存块的申请释放,因此定长内存池中只必要一个自由链表管理释放返来的内存块。现在要支持申请和释放不同巨细的内存块,那么就必要多个自由链表来管理释放返来的内存块,因此thread cache实际上是一个哈希桶布局,每个桶中存放的都是一个自由链表。
基于前面的定长内存池,对自由链表这个布局进行封装成一个Common.hpp,此后其它部门共用的头文件和函数等都放Common.hpp:
- #pragma once
- #include <iostream>
- #include <vector>
- #include <algorithm>
- #include <ctime>
- #include <cassert>
- #include <thread>
- using std::cout;
- using std::endl;
- #ifdef _WIN32
- #include <windows.h>
- #else
- // ...
- #endif
- // 能不用宏就不用宏
- static const size_t MAX_BYTES = 256 * 1024; // max_bytes 256kb
- static const size_t NFREELIST = 208; // nfreelist 哈希桶总的桶数
- inline static void* SystemAlloc(size_t kpage) // 去堆上按页申请空间
- {
- #ifdef _WIN32
- void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
- #else
- // linux下brk mmap进程分配内存的系统调用等
- #endif
- if (ptr == nullptr)
- throw std::bad_alloc();
- return ptr;
- }
- static void*& NextObj(void* obj) // 取头上4/8字节
- {
- return *(void**)obj;
- }
- class FreeList // 管理切分好的小对象的自由链表
- {
- public:
- void Push(void* obj) // 头插
- {
- assert(obj);
- //*(void**)obj = _freeList; // 赋值给头4/8字节
- NextObj(obj) = _freeList; // 即上一行的注释
- _freeList = obj;
- }
- void* Pop() // 头删
- {
- assert(_freeList);
- void* obj = _freeList;
- _freeList = NextObj(obj); // _freeList指向下一个
- return obj;
- }
- bool Empty()
- {
- return _freeList == nullptr;
- }
- size_t& MaxSize()
- {
- return _maxSize;
- }
- private:
- void* _freeList = nullptr;
- size_t _maxSize = 1;
- };
复制代码 thread cache实际就是一个数组(哈希桶),数组中存储的就是一个个的自由链表,不可能每一个字节都安排一个自由链表(申请的字节小的时候可以给大一点),至于这个数组中到底存储了多少个自由链表,就必要看在进行字节数对齐时具体用的是什么映射对齐规则了。
怎样进行对齐?
这些内存块是会被链接到自由链表上的,因此一开始肯定是按8字节进行对齐是最合适的,因为必须包管这些内存块,无论是在32位平台下还是64位平台下,都至少能够存储得下一个指针。
但如果全部的字节数都按照8字节进行对齐的话,那么我们就必要建立256 × 1024 ÷ 8 = 32768个桶,这个数量还是比较多的,实际上可以让不同范围的字节数按照不同的对齐数进行对齐。
有了字节数的对齐规则后,就必要提供两个对应的函数,分别用于获取某一字节数对齐后的字节数,以及该字节数对应的哈希桶下标。关于处理对齐和映射的函数,可以将其封装到一个类当中。
看一个计算对象巨细的对齐映射规则的类,放到Common.hpp:
- class SizeClass // 计算对象大小的对齐映射规则的类
- {
- public:
- // 整体控制在最多10%左右的内碎片浪费
- // [1,128] 8byte对齐 freelist[0,16)
- // [128+1,1024] 16byte对齐 freelist[16,72)
- // [1024+1,8*1024] 128byte对齐 freelist[72,128)
- // [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
- // [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
- static inline size_t _RoundUp(size_t bytes, size_t alignNum) // align对齐
- {
- //size_t alignSize;
- //if (bytes % alignNum != 0)
- //{
- // alignSize = (bytes / alignNum + 1) * alignNum;
- //}
- //else
- //{
- // alignSize = bytes;
- //}
- //return alignSize;
- return ((bytes + alignNum - 1) & ~(alignNum - 1)); // 即上面注释的代码
- // &上(alignNum-1)的取反,二进制后面的几个数都&成0了
- // 带入1-128,要与&的就是7的取反,就是后3位都成0,对齐到8的倍数了 -> 26 -> 26+8-1=33 -> 33 &(~7) = 32
- // 带入129-1024,要&的就是15的取反,就是后4位都成0,对齐到16的倍数了...
- }
- static inline size_t RoundUp(size_t size) // 返回size对齐以后的值
- {
- if (size <= 128) {
- return _RoundUp(size, 8);
- }
- else if (size <= 1024) {
- return _RoundUp(size, 16);
- }
- else if (size <= 8 * 1024) {
- return _RoundUp(size, 128);
- }
- else if (size <= 64 * 1024) {
- return _RoundUp(size, 1024);
- }
- else if (size <= 256 * 1024) {
- return _RoundUp(size, 8 * 1024);
- }
- else {
- assert(false); // 理论不会走到这,断言一下
- return -1; // 有些编译器会检查返回值,顺便返回一个值
- }
- }
- static inline size_t _Index(size_t bytes, size_t align_shift) // 看自己是哪个桶
- {
- //if (bytes % alignNum == 0)
- //{
- // return bytes / alignNum - 1;
- //}
- //else
- //{
- // return bytes / alignNum;
- //}
- return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
- // 1 + 7 8 -> 1左移3位是8, 8-1是7, 1到8 加7 = 8到15
- // 2 9
- // ... -> 8到15 -> 右移三位相当于除8 -> 变为1 -> 再减1,即0号桶
- // 8 15
- // 9 + 7 16
- // 10
- // ...
- // 16 23
- }
- static inline size_t Index(size_t bytes) // 计算映射的哪一个自由链表桶
- {
- assert(bytes <= MAX_BYTES);
- // 每个区间有多少个链
- static int group_array[4] = { 16, 56, 56, 56 };
- if (bytes <= 128) {
- return _Index(bytes, 3);
- }
- else if (bytes <= 1024) {
- return _Index(bytes - 128, 4) + group_array[0]; // 加上0号桶里的数量
- }
- else if (bytes <= 8 * 1024) {
- return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
- }
- else if (bytes <= 64 * 1024) {
- return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
- }
- else if (bytes <= 256 * 1024) {
- return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
- }
- else {
- assert(false);
- return -1;
- }
- }
- };
复制代码 每个线程都有一个本身独享的thread cache,那应该怎样创建这个thread cache呢?不能将这个thread cache创建为全局的,因为全局变量是全部线程共享的,如许就不可避免的必要锁来控制,增加了控制本钱和代码复杂度。
要实现每个线程无锁的访问属于本身的thread cache,必要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它地点的线程是全局可访问的,但是不能被其他线程访问到,如许就保持了数据的线程独立性。
- static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
复制代码 但不是每个线程被创建时就立马有了属于本身的thread cache,而是当该线程调用相关申请内存的接口时才会创建本身的thread cache,因此在申请内存的函数中会包含以下逻辑。
- // 通过TLS 每个线程无锁的获取自己专属的ThreadCache对象
- if (pTLSThreadCache == nullptr)
- {
- pTLSThreadCache = new ThreadCache;
- }
复制代码 5.2 ConcurrentAlloc.hpp
- #pragma once#include "ThreadCache.hpp"static void* ConcurrentAlloc(size_t size){ // 通过TLS 每个线程无锁的获取自己专属的ThreadCache对象
- if (pTLSThreadCache == nullptr)
- {
- pTLSThreadCache = new ThreadCache;
- } cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl; return pTLSThreadCache->Allocate(size);}static void ConcurrentFree(void* ptr, size_t size){ assert(pTLSThreadCache); pTLSThreadCache->Deallocate(ptr, size);}
复制代码 少写一个函数的ThreadCache.hpp:
- #pragma once#include "Common.hpp"#include "CentralCache.hpp"class ThreadCache{public: void* FetchFromCentralCache(size_t index, size_t size); // 从中心内存获取对象 void* Allocate(size_t size); // 申请内存对象 void Deallocate(void* ptr, size_t size); // 释放内存对象private: FreeList _freeLists[NFREELIST];};// TLS thread local storage 线程本地存储static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
- // 声明pTLSThreadCache为一个线程本地存储的静态变量,并初始化为nullptr/下面是函数实现void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) // 从中心内存获取对象{ return nullptr; // debug}void* ThreadCache::Allocate(size_t size) // 申请内存对象{ assert(size <= MAX_BYTES); size_t alignSize = SizeClass::RoundUp(size); size_t index = SizeClass::Index(size); if (!_freeLists[index].Empty()) { return _freeLists[index].Pop(); } else { return FetchFromCentralCache(index, alignSize); // 从下一层获取对象 }}void ThreadCache::Deallocate(void* ptr, size_t size) // 释放内存对象{ assert(ptr); assert(size <= MAX_BYTES); // 找对映射的自由链表桶,对象插入进入 size_t index = SizeClass::Index(size); _freeLists[index].Push(ptr);}
复制代码 UnitTest.cc:
- #include "ConcurrentAlloc.hpp"
- void Alloc1()
- {
- for (size_t i = 0; i < 5; ++i)
- {
- void* ptr = ConcurrentAlloc(6);
- }
- }
- void Alloc2()
- {
- for (size_t i = 0; i < 5; ++i)
- {
- void* ptr = ConcurrentAlloc(7);
- }
- }
- void TLSTest()
- {
- std::thread t1(Alloc1);
- t1.join();
- std::thread t2(Alloc2);
- t2.join();
- }
- int main()
- {
- //TestObjectPool();
- TLSTest();
- return 0;
- }
复制代码
此时每个线程都通过TLS无锁地获取了本身专属的对象。
6. central cache测试
6.1 central cache框架
thread cache中:当线程申请某一巨细的内存时,如果thread cache中对应的自由链表不为空,那么直接取出一个内存块进行返回即可,但如果此时该自由链表为空,那么这时thread cache就必要向central cache申请内存了。
central cache的布局与thread cache是一样的,它们都是哈希桶的布局,而且它们遵循的对齐映射规则都是一样的。如许做的好处就是,当thread cache的某个桶中没有内存了,就可以直接到central cache中对应的哈希桶里去取内存就行了。
central cache与thread cache的不同之处:
central cache与thread cache有两个显着不同的地方,首先,thread cache是每个线程独享的,而central cache是全部线程共享的,因为每个线程的thread cache没有内存了都会去找central cache,因此在访问central cache时是必要加锁的。
但central cache在加锁时并不是将整个central cache全部锁上了,central cache在加锁时用的是桶锁,也就是说每个桶都有一个锁。此时只有当多个线程同时访问central cache的同一个桶时才会存在锁竞争,如果是多个线程同时访问central cache的不同桶就不会存在锁竞争。
central cache与thread cache的第二个不同之处就是,thread cache的每个桶中挂的是一个个切好的内存块,而central cache的每个桶中挂的是一个个的span。
每个span管理的都是一个以页为单位的大块内存,每个桶里面的若干span是按照双链表的情势链接起来的,而且每个span里面还有一个自由链表,这个自由链表里面挂的就是一个个切好了的内存块,根据其地点的哈希桶这些内存块被切成了对应的巨细。
怎么看页号?页号本质与地址是一样的,它们都是一个编号,只不外地址是以一个字节为一个单位,而页号是以多个字节为一个单位。
每个程序运行起来后都有本身的进程地址空间,在32位平台下,进程地址空间的巨细是2^32;而在64位平台下,进程地址空间的巨细就是2^64。
页的巨细一样平常是4K或者8K,以8K为例。在32位平台下,进程地址空间就可以被分成2^32 / 2^13 = 2^19个页;在64位平台下,进程地址空间就可以被分成2^64 / 2^13 = 2^41个页,此时不能用一个无符号整型来存储页号,这必要借助条件编译来办理这个标题:
在Common.hpp:加上下面的条件编译:
- // 定义大块内存起始页的页号的类型
- #ifdef _WIN64 // 64位下两个都有定义,32位下只有_WIN32被定义了,所以_WIN64放前面
- typedef unsigned long long PAGE_ID; // 八字节
- #elif _WIN32
- typedef size_t PAGE_ID; // 四字节
- #else
- // linux下其它的宏
- #endif
复制代码 再在common.hpp加上span的界说和管理span的双向循环链表(page cache也要用):
- struct Span // 管理多个连续页大块内存跨度的结构
- {
- PAGE_ID _pageId = 0; // 大块内存起始页的页号
- size_t _n = 0; // 页的数量
- Span* _next = nullptr; // 双向链表的结构
- Span* _prev = nullptr;
- size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数
- void* _freeList = nullptr; // 切好的小块内存的自由链表
- };
- class SpanList // 双向带头循环链表
- {
- public:
- SpanList()
- {
- _head = new Span;
- _head->_next = _head;
- _head->_prev = _head;
- }
- Span* Begin()
- {
- return _head->_next;
- }
- Span* End()
- {
- return _head;
- }
- bool Empty()
- {
- return _head->_next == _head;
- }
- void PushFront(Span* span)
- {
- Insert(Begin(), span);
- }
- Span* PopFront()
- {
- Span* front = _head->_next;
- Erase(front);
- return front;
- }
- void Insert(Span* pos, Span* newSpan)
- {
- assert(pos);
- assert(newSpan);
- Span* prev = pos->_prev;
- // prev newspan pos
- prev->_next = newSpan;
- newSpan->_prev = prev;
- newSpan->_next = pos;
- pos->_prev = newSpan;
- }
- void Erase(Span* pos)
- {
- assert(pos);
- assert(pos != _head);
- Span* prev = pos->_prev;
- Span* next = pos->_next;
- prev->_next = next;
- next->_prev = prev;
- }
- private:
- Span* _head;
- public:
- std::mutex _mtx; // 桶锁
- };
复制代码 central cache的映射规则和thread cache是一样的,因此central cache里面哈希桶的个数也是208,但central cache每个哈希桶中存储就是我们上面界说的双链表布局。
每个线程都有一个属于本身的thread cache,这里是用TLS来实现每个线程无锁的访问属于本身的thread cache的。而central cache和page cache在整个进程中只有一个,对于这种只能创建一个对象的类,可以将其设置为单例模式。
单例模式可以包管系统中该类只有一个实例,并提供一个访问它的全局访问点,该实例被全部程序模块共享。单例模式又分为饿汉模式和懒汉模式,懒汉模式相对较复杂,这里使用饿汉模式就足够了。小部门CentralCache.hpp:
- #pragma once
- #include "Common.hpp"
- #include "PageCache.hpp"
- class CentralCache // 单例的饿汉模式
- {
- public:
- static CentralCache* GetInstance()
- {
- return &_sInst;
- }
- Span* GetOneSpan(SpanList& list, size_t byte_size); // // 从page cache获取一个非空的span
- // 从中心缓存获batchNum个size大小的对象给thread cache
- size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
- private:
- SpanList _spanLists[NFREELIST];
- private:
- CentralCache()
- {}
- CentralCache(const CentralCache&) = delete;
- static CentralCache _sInst;
- };
复制代码 当thread cache向central cache申请内存时,central cache应该给出多少个对象呢?这是一个值得思考的标题,如果central cache给的太少,那么thread cache在短时间内用完了又会来申请;但如果一次性给的太多了,可能thread cache用不完也就浪费了。
我们这里接纳了一个慢开始反馈调治算法。当thread cache向central cache申请内存时,如果申请的是较小的对象,那么可以多给一点,但如果申请的是较大的对象,就可以少给一点。
看个慢开始反馈调治算法的函数:(放在Common.hpp的SizeClass类里,MAX_BYTES放上面)
- static const size_t MAX_BYTES = 256 * 1024; // max_bytes 256kb
- static size_t NumMoveSize(size_t size) // 一次thread cache从中心缓存获取多少个
- {
- assert(size > 0);
- // [2, 512],一次批量移动多少个对象的(慢启动)上限值
- // 小对象一次批量上限高
- // 小对象一次批量上限低
- int num = MAX_BYTES / size;
- if (num < 2)
- {
- num = 2;
- }
- if (num > 512)
- {
- num = 512;
- }
- return num;
- }
复制代码 通过上面这个函数,就可以根据所需申请的对象的巨细计算出具体给出的对象个数,而且可以将给出的对象个数控制到2~512个之间。也就是说,就算thread cache要申请的对象再小,最多一次性给出512个对象,就算thread cache要申请的对象再大,至少一次性给出2个对象。
6.2 重要接口函数
ThreadCache.hpp里的FetchFromCentralCache:
- void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) // 从中心内存获取对象
- {
- // 慢开始反馈调节算法
- // 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
- // 2、如果你不断要这个size大小内存需求,那么batchNum就会不断增长,直到上限
- // 3、size越大,一次向central cache要的batchNum就越小
- // 4、size越小,一次向central cache要的batchNum就越大
- size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
- // windows.h里有一个min的宏,min前不能加std::了
- if (_freeLists[index].MaxSize() == batchNum)
- {
- _freeLists[index].MaxSize() += 1;
- }
- void* start = nullptr;
- void* end = nullptr; // 找CentralCache批量获取batchNum个size大小的空间
- size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
- assert(actualNum > 0); // actualNum实际得到的(不一定得到batchNum个)
- if (actualNum == 1)
- {
- assert(start == end);
- }
- else // 大于1,其余的挂到自由链表
- {
- _freeLists[index].PushRange(NextObj(start), end);
- }
- return start;
- }
复制代码 CentralCache.hpp里的FetchRangeObj:
- CentralCache CentralCache::_sInst;
- Span* CentralCache::GetOneSpan(SpanList& list, size_t size) // 从page cache获取一个非空的span
- {
- return nullptr; // debug
- }
- // 从中心缓存获batchNum个size大小的对象给thread cache
- size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
- {
- size_t index = SizeClass::Index(size);
- _spanLists[index]._mtx.lock(); // 加桶锁
- Span* span = GetOneSpan(_spanLists[index], size);
- assert(span);
- assert(span->_freeList);
- // 从span中获取batchNum个对象,如果不够batchNum个,有多少拿多少
- start = span->_freeList;
- end = start;
- size_t i = 0;
- size_t actualNum = 1; // 要返回的实际拿到的数量
- while (i < batchNum - 1 && NextObj(end) != nullptr)
- {
- end = NextObj(end); // 往后走,有多少拿多少
- ++i;
- ++actualNum;
- }
- span->_freeList = NextObj(end);
- NextObj(end) = nullptr;
- span->_useCount += actualNum;
- _spanLists[index]._mtx.unlock();
- return actualNum;
- }
复制代码
在这GetOneSpan还没写,能正常编译就行了。
7. page cache测试
7.1 page cache框架
page cache与central cache一样,它们都是哈希桶的布局,而且page cache的每个哈希桶中里挂的也是一个个的span,这些span也是按照双链表的布局链接起来的。
central cache的映射规则与thread cache保持一致,而page cache的映射规则与它们都不雷同。page cache的哈希桶映射规则接纳的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span,以此类推。
central cache每个桶中的span被切成了一个个对应巨细的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,怎样切分申请到的这个span就应该由central cache本身来决定。
page cache当中究竟有多少个桶,这就要看你最大想挂几页的span了,这里最大挂128页的span,为了让桶号与页号对应起来,可以将第0号桶空出来不消,因此必要将哈希桶的个数设置为129。
- static const size_t NPAGES = 129; // PageCache里的桶数(0号空出)
复制代码 为什么这里最大挂128页的span呢?因为线程申请单个对象最大是256KB,而128页可以被切成4个256KB的对象,因此是足够的。当然,如果你想在page cache中挂更大的span也是可以的,根据具体的需求进行设置就行了。
在page cache获取一个n页的span的过程:
在page cache的第n号桶中取出一个span返回给central cache即可,但如果第n号桶中没有span了,这时并不是直接转而向堆申请一个n页的span,而是要继续在后面的桶当中寻找span。
直接向堆申请以页为单位的内存时,应该尽量申请大块一点的内存块,因为此时申请到的内存是一连的,当线程必要内存时可以将其切小后分配给线程,而当线程将内存释放后又可以将其合并成大块的一连内存。如果我们向堆申请内存时是小块小块的申请的,那么申请到的内存就不一定是一连的了。
因此,当第n号桶中没有span时,可以继续找第n+1号桶,因为可以将n+1页的span切分成一个n页的span和一个1页的span,这时就可以将n页的span返回,而将切分后1页的span挂到1号桶中。但如果后面的桶当中都没有span,这时就只能向堆申请一个128页的内存块,并将其用一个span布局管理起来,然后将128页的span切分成n页的span和128-n页的span,此中n页的span返回给central cache,而128-n页的span就挂到第128-n号桶中。
page cache的实现梳理:
当每个线程的thread cache没有内存时都会向central cache申请,此时多个线程的thread cache如果访问的不是central cache的同一个桶,那么这些线程是可以同时进行访问的。这时central cache的多个桶就可能同时向page cache申请内存的,所以page cache也是存在线程安全标题的,因此在访问page cache时也必须要加锁。
但是在page cache这里不使用桶锁,因为当central cache向page cache申请内存时,page cache可能会将其他桶当中大页的span切小后再给central cache。此外,当central cache将某个span归还给page cache时,page cache也会尝试将该span与其他桶当中的span进行合并。
也就是说,在访问page cache时,可能必要访问page cache中的多个桶,如果page cache用桶锁就会出现大量频繁的加锁息争锁,导致程序的效率低下。因此在访问page cache时使用没有使用桶锁,而是用一个锁将整个page cache给锁住。
而thread cache在访问central cache时,只必要访问central cache中对应的哈希桶就行了,因为central cache的每个哈希桶中的span都被切分成了对应巨细,thread cache只必要根据本身所需对象的巨细访问central cache中对应的哈希桶即可,不会访问其他哈希桶,因此central cache可以用桶锁。
page cache在整个进程中也是只能存在一个的,因此这里也设计为饿汉模式:
- #pragma once
- #include "Common.hpp"
- class PageCache
- {
- public:
- static PageCache* GetInstance() // 单例模式
- {
- return &_sInst;
- }
- Span* NewSpan(size_t k); // 获取一个K页的span给CentralCache
- std::mutex _pageMtx;
- private:
- SpanList _spanLists[NPAGES];
- PageCache()
- {}
- PageCache(const PageCache&) = delete;
- static PageCache _sInst;
- };
复制代码 7.2 重要接口函数
CentralCache里的GetOneSpan(获取一个非空的span):
thread cache向central cache申请对象时,central cache必要先从对应的哈希桶中获取到一个非空的span,然后从这个非空的span中取出若干对象返回给thread cache。
首先是先遍历central cache对应哈希桶当中的双链表,如果该双链表中有非空的span,那么直接将该span进行返回即可。
如果遍历双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就必要向page cache申请内存块了。
向page cache申请多大的内存块呢?可以根据具体所需对象的巨细来决定,就像之前根据对象的巨细计算出,thread cache一次向central cache申请对象的个数上限,现在我们是根据对象的巨细计算出,central cache一次应该向page cache申请几页的内存块。
可以先根据对象的巨细计算出,thread cache一次向central cache申请对象的个数上限,然后将这个上限值乘以单个对象的巨细,就算出了具体必要多少字节,最后再将这个算出来的字节数转换为页数,如果转换后不够一页,那么我们就申请一页,否则转换出来是几页就申请几页。也就是说,central cache向page cache申请内存时,要求申请到的内存尽量能够满意thread cache向central cache申请时的上限。
SizeClass里加个NumMovePage:
- static size_t NumMovePage(size_t size) // 计算一次向系统获取几个页
- {
- size_t num = NumMoveSize(size); // thread cache一次向central cache申请对象的个数上限
- size_t npage = num * size; // num个size大小的对象所需的字节数
- npage >>= PAGE_SHIFT; // 将字节数转换为页数,npage 除8再除1024
- if (npage == 0)
- {
- npage = 1;
- }
- return npage;
- }
复制代码 PAGE_SHIFT代表页巨细转换偏移,这里以页的巨细为8K为例,PAGE_SHIFT的值就是13。
当central cache申请到若干页的span后,还必要将这个span切成一个个对应巨细的对象挂到该span的自由链表当中,首先必要计算出该span的起始地址,可以用这个span的起始页号乘以一页的巨细即可得到这个span的起始地址,然后用这个span的页数乘以一页的巨细就可以得到这个span所管理的内存块的巨细,用起始地址加上内存块的巨细即可得到这块内存块的结束位置:
- Span* CentralCache::GetOneSpan(SpanList& list, size_t size) // 从page cache获取一个非空的span
- {
- // 查看当前的spanlist中是否有还有未分配对象的span
- Span* it = list.Begin();
- while (it != list.End())
- {
- if (it->_freeList != nullptr)
- {
- return it;
- }
- else // 没有未分配的对象就往后走
- {
- it = it->_next;
- }
- }
- // 找后面要空间时先把central cache的锁解掉,别的线程释放内存就能访问这个桶
- list._mtx.unlock();
- // 走到这里说没有空闲span了,只能找page cache要
- PageCache::GetInstance()->_pageMtx.lock(); // 对page cache一整个加锁
- Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
- PageCache::GetInstance()->_pageMtx.unlock();
- // 对获取span进行切分,不需要加锁,因为此时别的线程访问不到这个span
- // 计算span的大块内存的起始地址 和大块内存的大小(字节数)
- char* start = (char*)(span->_pageId << PAGE_SHIFT); // 起始地址,如_pageId是100->100 * 8*1024
- size_t bytes = span->_n << PAGE_SHIFT; // 页数乘8K,和上一行一样
- char* end = start + bytes;
- // 把大块内存切成自由链表 链接起来
- span->_freeList = start; // 先切一块下来去做头,方便尾插
- start += size; // 加等到下一块
- void* tail = span->_freeList;
- int i = 1;
- while (start < end)
- {
- ++i;
- NextObj(tail) = start; // tail指向start
- tail = start; // tail = NextObj(tail; // 更新tail
- start += size; // start往后走
- }
- NextObj(tail) = nullptr;
- // 切好span以后,需要把span挂到桶里面去的时候,再加锁
- list._mtx.lock();
- list.PushFront(span); // span头插到list
- return span;
- }
复制代码 为什么是尾插呢?因为如果是将切好的对象尾插到自由链表,这些对象看起来是按照链式布局链接起来的,而实际它们在物理上是一连的,这时当我们把这些一连内存分配给某个线程使用时,可以提高该线程的CPU缓存使用率。
Span* PageCache::NewSpan(size_t k)获取一个k页的span:
调用上述的GetOneSpan从central cache的某个哈希桶获取一个非空的span时,如果遍历哈希桶中的双链表后发现双链表中没有span,或该双链表中的span都为空,那么此时central cache就必要向page cache申请若干页的span了,下面就来说说怎样从page cache获取一个k页的span。
因为page cache是直接按照页数进行映射的,因此要从page cache获取一个k页的span,就应该直接先去找page cache的第k号桶,如果第k号桶中有span,直讨论删一个span返回给central cache就行。所以这里必要给SpanList类添加对应的Empty和Pop函数。
如果page cache的第k号桶中没有span,就应该继续找后面的桶,只要后面恣意一个桶中有一个n页span,就可以将其切分成一个k页的span和一个n-k页的span,然后将切出来k页的span返回给central cache,再将n-k页的span挂到page cache的第n-k号桶即可。
但如果后面的桶中也都没有span,此时就必要向堆申请一个128页的span了,在向堆申请内存时,直接调用我们封装的SystemAlloc函数即可。
必要留意的是,向堆申请内存后得到的是这块内存的起始地址,此时必要将该地址转换为页号。由于向堆申请内存时都是按页进行申请的,因此直接将该地址除以一页的巨细即可得到对应的页号。
- PageCache PageCache::_sInst;
- Span* PageCache::NewSpan(size_t k) // 获取一个K页的span
- {
- assert(k > 0 && k < NPAGES);
- if (!_spanLists[k].Empty()) // 先检查第k个桶里面有没有span
- {
- return _spanLists[k].PopFront(); // 有就直接返回
- }
- // 检查一下后面的桶里面有没有span,如果有->进行切分
- for (size_t i = k + 1; i < NPAGES; ++i)
- {
- if (!_spanLists[i].Empty())
- {
- Span* nSpan = _spanLists[i].PopFront();
- Span* kSpan = new Span;
- // 在nSpan的头部切一个k页span下来返回,nSpan再挂到对应映射的位置
- kSpan->_pageId = nSpan->_pageId; // _pageId类似地址
- kSpan->_n = k;
- nSpan->_pageId += k; // 起始页的页号往后走
- nSpan->_n -= k; // 页数减等k
- _spanLists[nSpan->_n].PushFront(nSpan);
- return kSpan;
- }
- }
- // 走到这说明后面没有大页的span了->去找堆要一个128页的span
- Span* bigSpan = new Span;
- void* ptr = SystemAlloc(NPAGES - 1);
- bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; // 转换为页号
- bigSpan->_n = NPAGES - 1;
- _spanLists[bigSpan->_n].PushFront(bigSpan);
- return NewSpan(k); // 把新申请的span插入后再递归调用一次自己(避免代码重复)
- }
复制代码 向堆申请到128页的span后,必要将其切分成k页的span和128-k页的span,但是为了尽量避免出现重复的代码,最好不要再编写对应的切分代码。可以先将申请到的128页的span挂到page cache对应的哈希桶中,然后再递归调用该函数就行了,此时在往后找span时就一定会在第128号桶中找到该span,然后进行切分。
这里有一个标题:当central cache向page cache申请内存时,central cache对应的哈希桶是处于加锁的状态的,那在访问page cache之前我们应不应该把central cache对应的桶锁解掉呢?
这里建议在访问page cache前,先把central cache对应的桶锁解掉。虽然此时central cache的这个桶当中是没有内存供其他thread cache申请的,但thread cache除了申请内存还会释放内存,如果在访问page cache前将central cache对应的桶锁解掉,那么此时当其他thread cache想要归还内存到central cache的这个桶时就不会被壅闭。
因此在调用NewSpan函数之前,必要先将central cache对应的桶锁解掉,然后再将page cache的大锁加上,当申请到k页的span后,必要将page cache的大锁解掉,但此时不必要立刻获取到central cache中对应的桶锁。因为central cache拿到k页的span后还会对其进行切分操作,因此可以在span切好后必要将其挂到central cache对应的桶上时,再获取对应的桶锁。
改一下GetOneSpan函数伪代码就是如许的:
- // 找后面要空间时先把central cache的锁解掉,别的线程释放内存就能访问这个桶
- list._mtx.unlock();
- // 走到这里说没有空闲span了,只能找page cache要
- PageCache::GetInstance()->_pageMtx.lock(); // 对page cache一整个加锁
- Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
- PageCache::GetInstance()->_pageMtx.unlock();
- // 对获取span进行切分,不需要加锁,因为此时别的线程访问不到这个span
- // 切好span以后,需要把span挂到桶里面去的时候,再加锁
- list._mtx.lock();
- list.PushFront(span); // span头插到list
- return span;
复制代码 到这就可以系统的调试一下程序了,看看转换成页号的:
下一步就是释放内存了,此博客篇幅太长就放在下篇博客了。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |