Apollo Cyber 学习条记

[复制链接]
发表于 昨天 02:45 | 显示全部楼层 |阅读模式
目次
0 Introduction
What
Why
Advantage
1 Example
2 Concept
3 Flow Chart
4 Module
4.1 Transport
4.1.1 Share Memory
4.1.1.1 Segment
4.1.1.1.1 State
4.1.1.1.2 Block
4.1.1.1.3 Common
4.1.1.2 Notifier
4.1.1.2.1 ConditionNotifier
4.1.1.2.2 MulticastNotifier
4.1.2 Transmitter
4.1.2.1 IntraTransmitter
4.1.2.2 ShmTransmitter
4.1.2.3 RtpsTransmitter
4.1.2.4 HybridTransmitter
4.1.3 Receiver
4.1.3.1 IntraReceiver
4.1.3.2 ShmReceiver
4.1.3.3 RtpsReceiver
4.1.3.4 HybridReceiver
4.1.4 Summary
4.2 Service Discovery
4.2.1 Partcipant
4.2.2 Container
4.2.2.1 SingleValueWarehouse
4.2.2.2 MultiValueWarehouse
4.2.2.3 Graph
4.2.3 NodeManager
4.2.4 ServiceManager
4.2.5 ChannelManager
4.2.6 Summary
4.3 Scheduler
4.3.1 CRoutine
4.3.1.1 State
4.3.1.2 Context
4.3.1.3 ContextPool
4.3.1.4 Atomic_flag
4.3.1.5 Function
4.3.2 Processor
4.3.3 SchedulerConf
4.3.4 Classic
4.3.4.1 ClassicContext
4.3.4.2 SchedulerClassic
4.3.5 Choreography
4.3.5.1 ChoreographyContext
4.3.5.2 SchedulerChoreography
4.3.6 CRoutine Pool
4.3.6.1 TaskManager
4.3.6.2 Task
4.3.7 Summary
4.4 Data
4.4.1 CacheBuffer
4.4.2 ChannelBuffer
4.4.3 DataNotifier
4.4.4 DataDispatcher
4.4.5 DataVisitor
4.4.6 Summary
4.5 Node
4.5.1 Writer
4.5.2 Reader
4.5.2.1 Blocker
4.5.2.2 ReceiverManager
4.5.3 Client
4.5.4 Service
4.5.5 Summary
5 Others
5.1 ClassLoaderModule
5.1.1 ClassLoaderManager
5.1.2 ClassLoader
5.1.3 SharedLibrary
5.1.4 Utility
5.2 Mainboard
5.2.1 ModuleArgument
5.2.2 ModuleController
5.2.3 Main
5.3 Base
5.3.1 Signal
5.3.1.1 Slot
5.3.1.2 Signal
5.3.1.3 Connection
5.3.2 BoundedQueue
5.4 Timer
5.4.1 TimerTask
5.4.2 TimerBucket
5.4.3 TimingWheel
References


0 Introduction

What

   cyber 是⼀个专⻔为⾃动驾驶场景操持的运⾏时框架。 它基于会合式盘算模子 ,在性能、延长和数据吞吐量⽅⾯进⾏了⾼度优化。 [1]
  以下简称框架
Why



  • 为什么要写这个⽂档

    • 对于认识cyber的⼈ ,可以通过⽂档查缺补漏
    • 对于不认识cyber的⼈ ,盼望可以通过⽂档学习框架团体视图与详细逻辑 ,是⼊⻔⼿册,⽆需随处搜索⽂档看
    • 作为组内分享提要使⽤

   

  • 为什么会有 cyber [1]

    • 在⾃动驾驶场景中,须要的是⼀个有效的会合式盘算模子,对⾼性能有要求,包罗⾼并发、低延长和⾼吞吐量
    • 已经从开辟转向产物化,随着在实际天下中的⼤规模摆设,我们看到了对最⾼鲁棒性和⾼性能的需求。这就是为什么我们花了数年时间构建框架,以满⾜⾃动驾驶办理⽅案的要求

  Advantage

   

  • 使⽤ cyber 的重要长处:[1]

    • 加速开辟

      • 具有数据融合功能的界阐明白的使命接⼝
      • ⼀系列开辟⼯具
      • ⼤量传感器驱动步调

    • 简化摆设

      • ⾼效⾃顺应的消息通讯
      • 具有资源意识的可设置⽤⼾级调治步调
      • 可移植,依赖更少

    • 赋能⾃动驾驶

      • 默认的运⾏时框架
      • 为⾃动驾驶搭建专⽤模块


  1 Example

就拿保举的 talker、listener ⼊⼿,在 apollo 的编译产出⽬录中分别启动以上两个⼯具
终端 1
  1. source cyber/setup.bash
  2. export GLOG_alsologtostderr=1
  3. ./bazel-bin/cyber/examples/talker
复制代码
终端 2
  1. source cyber/setup.bash
  2. export GLOG_alsologtostderr=1
  3. ./bazel-bin/cyber/examples/listener
复制代码
⽇志
Talker
  1. I0831 12:09:54.647799 2441916 talker.cc:43] [talker]talker sent a message! No. 0
  2. I0831 12:09:55.647987 2441916 talker.cc:43] [talker]talker sent a message! No. 1
  3. I0831 12:09:56.647899 2441916 talker.cc:43] [talker]talker sent a message! No. 2
  4. I0831 12:09:57.647931 2441916 talker.cc:43] [talker]talker sent a message! No. 3
  5. I0831 12:09:58.647884 2441916 talker.cc:43] [talker]talker sent a message! No. 4
  6. I0831 12:09:59.647895 2441916 talker.cc:43] [talker]talker sent a message! No. 5
  7. I0831 12:10:00.647893 2441916 talker.cc:43] [talker]talker sent a message! No. 6
  8. I0831 12:10:01.647935 2441916 talker.cc:43] [talker]talker sent a message! No. 7
  9. I0831 12:10:02.647927 2441916 talker.cc:43] [talker]talker sent a message! No. 8
  10. I0831 12:10:03.647889 2441916 talker.cc:43] [talker]talker sent a message! No. 9
  11. I0831 12:10:04.647902 2441916 talker.cc:43] [talker]talker sent a message! No. 10
  12. I0831 12:10:05.647905 2441916 talker.cc:43] [talker]talker sent a message! No. 11
复制代码

Listener
  1. I0831 12:09:55.648399 2441894 listener.cc:23] [listener]Received message seq-> 1
  2. I0831 12:09:55.648418 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
  3. I0831 12:09:56.647986 2441894 listener.cc:23] [listener]Received message seq-> 2
  4. I0831 12:09:56.648017 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
  5. I0831 12:09:57.648029 2441894 listener.cc:23] [listener]Received message seq-> 3
  6. I0831 12:09:57.648074 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
  7. I0831 12:09:58.648011 2441894 listener.cc:23] [listener]Received message seq-> 4
  8. I0831 12:09:58.648043 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
  9. I0831 12:09:59.648080 2441894 listener.cc:23] [listener]Received message seq-> 5
  10. I0831 12:09:59.648118 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
  11. I0831 12:10:00.647995 2441894 listener.cc:23] [listener]Received message seq-> 6
  12. I0831 12:10:00.648036 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
  13. I0831 12:10:01.648051 2441894 listener.cc:23] [listener]Received message seq-> 7
  14. I0831 12:10:01.648074 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
  15. I0831 12:10:02.648077 2441894 listener.cc:23] [listener]Received message seq-> 8
  16. I0831 12:10:02.648123 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
  17. I0831 12:10:03.647994 2441894 listener.cc:23] [listener]Received message seq-> 9
  18. I0831 12:10:03.648032 2441894 listener.cc:24] [listener]msgcontent->Hello, apollo
复制代码
出现以上⽇志即代表 demo 运⾏乐成,两个模块可以进⾏实时通讯。看起来很简朴的功能,实在 cyber 内部的流程相称复杂。流程后⾯会详述。
源码


  • Talker: https://github.com/ApolloAuto/apollo/blob/master/cyber/examples/talker.cc
  • Listener: https://github.com/ApolloAuto/apollo/blob/master/cyber/examples/listener.cc
如今你的视⻆大概是这个样⼦,只是⼀个宏观

2 Concept

cyber 中有很多的概念,要想弄懂 框架,就要先弄懂这些概念,下⾯枚举⼀部分常⽤的根本概念


  • Component

    • 基于 cyber 开辟的模块,⽐如感知、推测、规划、控制、监控监控均以 Component 的情势存在, Component 之间使⽤ Channel 大概 Service 进⾏通讯
    • 两个基于框架开辟组件的⼩例子

      • Component
      • TimerComponent


  • Channel

    • Component 之间进⾏发布/订阅之间的通道,雷同于 kafka 中的 topic 的概念

      • Writer

        • 负责向 Channel ⾥⾯发布数据

      • Reader

        • 负责从 Channel ⾥⾯订阅数据



  • Service

    • Component 之间进⾏双向通讯的载体,就是传统的 C/S 模式, 雷同于传统 RPC 中的概念

      • Server

        • 创建 Service 的地⽅即为 Server, 负责处置惩罚 Client 的哀求

      • Client

        • ⽤户对服务发出哀求,并得到相应



  • Node

    • 是 Writer/Reader 和 Server/Client 的容器,在 Componet 中利⽤ Node 来实现差异的通讯情势
    • 视⻆变革


  • Topology

    • 在⼀个由浩繁模块构成体系中,每个模块中的服务发现模块都会生存当前体系其他模块的信息,⽤有向图来表现。Writer、Reader、Server、Client 都是有向图中的极点,Channel 是 Writer 指向 Reader 的边,Service是 Server 指向 Client 的边。因此我们称 Writer 和 Server为上游,Reader 和 Client 为鄙俚。

⼀个随意的假设体系⽰意图如下

3 Flow Chart

⼀般性来讲 ,⼀个框架内部会有若⼲个流程 ,⾄少会有两个流程 ,⼀个初始化 ,⼀个数据流。 对于cyber 来讲 , 内部也会有相干流程


  • 初始化流程
  • Writer/Reader 模子的发布订阅流程
  • Client/Server 模子的交互流程
  • 调治流程
  • 服务发现流程
4 Module

框架 内部就是三个⼤模块 ,分别为


  • 通讯
  • 服务发现
  • 调治
三个⼤模块依赖或多或少依赖其他⼩模块来做的。
4.1 Transport



  • 代码⽬录

    • cyber/transport

  • 设置协议

    • cyber/proto/transport_conf.proto

  • 模块作⽤

    • 消息通过差异⽅式进⾏发送与吸取

      • Intra

        • 同历程内部通讯

      • SHM

        • 同呆板差异历程通过共享内存的⽅式通讯

      • Rtps

        • 差异呆板差异历程通过 DDS ⽅式通讯

      • Hybrid

        • ⾃顺应 Intra、SHM、 Rtps 功能进⾏通讯



  • 模块流程

    • 提交消息
    • 吸取消息

4.1.1 Share Memory

框架内部的共享内存实现的根本思绪是将消息的本⾝存在可读写的 Segment 中,然后将消息的 Meta 关照到监听器中。
   参考 Linux 共享内存形貌 [2]
  System V 共享内存是较旧的共享内存 API。POSIX 共享内存提供了⼀个更简朴、更好地可设
  计的接⼝。
  另⼀⽅⾯,相⽐于 System V 共享内存⽅式,POSIX 共享存储器不太⼴泛(尤其是在较旧的系
  统上)
  4.1.1.1 Segment



  • XsiSegment

    • System V shared memory

      • ShmXXX
      • 不可写回⽂件体系


  • PosixSegment

    • POSIX shared memory

      • mmap
      • 可写回⽂件体系


   一个共享内存的例子[3]
  每个 Channel 都独占一个 Segment,每个 Segment 在内存中的表现图如下 (v8.0)

v10.0排布发生了变革
State --> Block --> ArenaBlock --> Buffer_x --> ArenaBuffer_x

新增一个ArenaSegment,布局图如下

4.1.1.1.1 State

状态类中就有四个原子变量以及相应的 Get、Set、Add 利用。


  • need_remap_: 标记当前的 Segment 是否要重新初始化

    • 当消息的长度大于 Block 中 buf 的长度时间,设置这个标记为 true

  • seq_: 当前的 block 索引,也可以明白为RingBuffer的索引,只会⾃增,外层会做取余利用
  • reference_count_:当前 Segment 的引用计数,为0时会真正析构
  • ceiling_msg_size_:消息长度上限
4.1.1.1.2 Block

Block 内部很简朴,就三个成员变量


  • lock_num_:原子变量,用于带次数的读自旋,写入互斥,浅易的读写锁
  • msg_size_: 消息的长度
  • msg_info_size_: 消息 Meta 的长度
内部实际的消息 buffer地点是在 Segment 类中持有
4.1.1.1.3 Common

无论是基于哪种方式实现的共享内存,实际的 buffer 的获取去读写的流程是一样的。
获取一个可写 buffer 逻辑


  • 判定当前参数合法性(WritableBlock)
  • 判定当前 Segment 是否初始化,如果没有初始化须要去初始化(OpenOrCreate)
  • 根据消息的长度判定是否须要重新初始化
  • 不停的找到下一个可以写的块,找到为止

    • 原子索引State::seq(uint32_t)不停增大,并与块的数目取余,实现了一个 RingBuffer

  • 添补 WritableBlock,并返回乐成
获取一个可读 buffer 逻辑


  • 判定当前参数合法性(ReadableBlock)
  • 判定当前 Segment 是否初始化,如果没有初始化须要去初始化(OpenOnly)
  • 判定当前可读块的索引是否合法
  • 根据 state 的标记位是否须要重新初始化
  • 根据对应的可读索引获取相干的读锁
  • 获取相干的 buffer
4.1.1.2 Notifier

被共享的数据在写到 Segment 中后,应该有某种方式去关照监听者,框架内部实现了两种方式
4.1.1.2.1 ConditionNotifier


通过 System V 共享内存的方式,每个呆板上开辟一块内存。
读者通过不停查抄内存中数据是否变革,其内部数据布局也一个固定巨细(4096)的 RingBuffer,buffer 中的每个元素是可读信息的 Meta(ReadableBlock)。
4.1.1.2.2 MulticastNotifier

无非是利用 UDP 的方式举行组播关照,不再赘述。


  • 关照方发送消息到指定的 fd

    • sendto

  • 通过监听指定的 fd 是否读停当,停当后再读取消息

    • poll
    • recvfrom

4.1.2 Transmitter

通讯模块利用数据提交器(Transmitter)来发送数据
留意:Transmitter 与 Channel 是逐一对应的关系,即一个 Transmitter 只能对一个 Channel 举行提交数据
4.1.2.1 IntraTransmitter

同历程内部通讯相比力跨历程方式非常简朴,省去的中心的媒介,直接调用 IntraDispatcher 的回调就可以。但是利用场景也受限定,由于很少有同历程的 Writer/Reader 的需求,一样平常都是历程级别的通讯。
4.1.2.2 ShmTransmitter

共享内存方式提交数据流程 (平常方式)


  • 查抄共享内存提交器是否被启用
  • 根据待提交消息的长度去申请相应巨细的可写块
  • 将待提交消息和 Meta 序列化到可写块中
  • 根据可写块、Channel、Host 创建一个可读信息(ReadableBlock)
  • 将可读信息关照给监听器
零拷贝提交数据流程


  • 调用Writer申请一个Message指针

    • 调用链

      • Writer --> HybirdTransmitter --> ShmTransmitter --> ProtobufArenaManager::AcquireArenaMessage

    • 申请逻辑

      • 对ArenaBuffer上写锁
      • 基于ArenaBuffer创建Arena
      • 基于Arena创建一个自界说析构的Message指针

        • 析构中判定解写锁

      • 留意: ArenaBuffer并没有解写锁


  • 用户添补Message
  • 调用Writer的Write接口将添补好的Message写出去

    • 申请一个ArenaBlock
    • 创建一个MessageWrapper
    • 把消息序列化到MessageWrapper中,并取得新消息的指针
    • 将MessageWrapper中的数据拷贝(memcpy)到ArenaBuffer (Segment)中, 1K

      • ExtendedStruct数据

        • addr_offset
        • channel_id


    • 序列化MessageInfo到ArenaBuffer末了
    • 根据可写块、Channel、Host 创建一个可读信息(ReadableBlock)
    • 将可读信息关照给监听器

4.1.2.3 RtpsTransmitter

流程


  • 查抄 Rtps 提交器是否被启用
  • 序列化消息成字符串到 UnderlayMessage 信息内里去

    • 留意: Rtps 方式通报的消息是 UnderlayMessage 布局

  • 根据消息 Meta 来构建 rtps 的写参数
  • 调用 rtps 的 publisher 写消息

    • eprosima::fastrtps 库实现 Write 函数天然支持关照,以是无需利用者关心

4.1.2.4 HybridTransmitter

HybridTransmitter 可以明白为根据拓扑变革动态开启差异方式通讯的 Transmitter。
内部在创建&&初始化的时间,初始化了三种通讯方式的实例,只不外没有 enable。
现在框架中的 Writer 默认创建的是 HybridTransmitter,在 Writer 到场拓扑前,在 ChannelManager 注册了一个回调,当有 Channel 级别的拓扑变革消息到临时,会实行这个回调,回调内部会根据拓扑信息(是否同 Channel,是否同呆板,是否同历程)举行判定,如果属于当前 Writer 的 Channel,会动态的调用对应 Transmitter 的 enable 以此来做真正的开启对应 Transmitter 的功能。
   这也就是为什么 talker/listener 模子中 listener 纵然比 talker 先启动却还是收不到第一个(序号为 0)包的缘故原由,那就是 talker 内部的 Writer 写第一个消息时拓扑中还没有 listener 的 Reader,以是 Writer 在写消息前应该判定一下是否有对应的 Reader 在读。固然,纵然没有提前判定,实际上也没有真正的写入消息,只不外从语义上更加完备而已。
  issue链接: https://github.com/ApolloAuto/apollo/issues/14989
  4.1.3 Receiver

通讯模块利用数据吸取器(Receiver)来吸取数据,内部的Dispatcher临时忽略即可。
4.1.3.1 IntraReceiver

同历程内部的 Receiver 无需启动线程举行监听,只须要初始化长处理差异 Channel 的 handler 即可。IntraTransmitter 写入即触发。
Why: 只不外 IntraDispatcher 内部有封装了一层名为 ChannelChain 的布局来做实际存储回调&&关照的地方,差异于一样平常的映射,它多了一层 message_type 的判定
4.1.3.2 ShmReceiver

共享内存监听方启动了一个单独的线程,这个线程还可以单独设置与 cpu 亲和性设置
流程


  • notifier 监听到有可读信息
  • 判定当前可读信息是否合法

    • 是否是本呆板信息
    • 当前 channel 是否有可读段

  • 从当前 channel 的可读段中申请可读块
  • 实行回调(非用户)

    • 从可读块中分析成 pb
    • 将消息写到指定的 ChannelBuffer 内里并关照对应协程工作
    • v10.0版本留意读锁解锁位置,存疑?

  • 从当前 channel 的可读段中开释可读块
4.1.3.3 RtpsReceiver

Rtps 吸取器借助 eprosima::fastrtps::Subscriber 实现非常简朴,创建 subscriber 的时间注册一个 listener 即可,消息监听机制在 rtps 内部就已经做好了。
4.1.3.4 HybridReceiver

对应 HybridTransmitter,HybridReceiver 也会根据拓扑变革来动态监听来自差异通讯方式的消息。
框架内部的 Reader 默认创建的是 HybridReceiver,内部包罗 Intra、Shm、Rtps receiver。Reader 初始化时也在 ChannelManager 中注册了一个回调,会根据拓扑变革来动态的开启和关闭对应的 Receiver。
4.1.4 Summary

平常共享内存的过程v8.0

零拷贝共享内存的过程v10.0

学习完通讯模块,如今的视角应该是这个样子。

4.2 Service Discovery

   在传统互联网大型后端微服务架构中,服务与服务之间经常以 RPC(Remote Procedure Call)情势举行调用,那么就要知道对方的服务在那里,有多少,是否康健,服务发现需求就应运而生。以 BNS(Baidu Naming Service)为例,Client 只须要知道鄙俚服务的名字(可以明白为一个字符串,实际上是一个服从某种规则的字符串),就可以通过 Baidu-RPC 举行访问,非常简朴。BNS 负责提供服务注册、服务获取和康健查抄的功能,Baidu-RPC 基于 BNS 给的 IP 列表举行做负载均衡。BNS 就可以明白为一种服务发现的实现,只不外是中心化的,全部的服务都必须要注册才可以。
  而 cyber 框架差异于互联网大型后端架构,是运行在车载 os(终端)上,以是就不大概引入一个中心化的组件来做这件事,究竟上也不须要,由于在算力有限定的情况下,服务数目也是有限定的,因此须要借助广播等情势举行去中心化的服务发现机制,实际上 DDS 协议非常得当此场景,框架层也接纳 eProsima 实现的 Fast-DDS 来实现服务发现功能。
框架层的服务发实际现位于 cyber/service_discovery 目次中,可以看到,一个节点(历程)有一个完备的拓扑管理单例 TopologyManager,内部由 NodeManager、ChannelManager 和 ServiceManager 三部分构成。
4.2.1 Partcipant

一个 TopologyManager 内里还包罗一个 DDS 中 Participant 的实例,名字是由 hostname+pid 格式构成。
Participant 在被创建时注册了一个回调,用来监听实体离开拓扑的消息并关照全部的 Manager。
三个子 Manager 在初始化时各自基于共同的 Partcipant 上创建差异 Topic 的发布者和订阅者。
4.2.2 Container

每一个 Manager 都须要特别的数据布局来存储拓扑信息,下面逐一先容
4.2.2.1 SingleValueWarehouse

std::unordered_map的一个 Wrapper,并提供了相应的 Add、Remove、Search、GetAllRoles 接口。
4.2.2.2 MultiValueWarehouse

std::unordered_multimap的一个 Wrapper,并提供了相应的 Add、Remove、Search、GetAllRoles 接口。
4.2.2.3 Graph

Graph 用来存储有向图, 和讲义上的连接矩阵和连接表差异,一样平常学习的时间,极点的编号一样平常是从 0 到 n-1,以是可以接纳一个二维向量大概数组+链表形貌。实际上利用的时间,编号不大概这么整齐大概都是数字。框架中利用 NodeName 作为极点的名字,利用两层无序映射容器来实现极点之间的对应关系。
   std::unordered_map<std::string, std::unordered_map<std::string, Vertice>>
  形貌为第⼀层 key 是有向边的尾,第⼆层 value 为有向边的头,边(第⼆层key)⼀般指的是
  Channel(channel_name+dst.name)
  究竟上这个已经⾜以形貌⼀个 有向图了
  别的实现了一个数据布局来形貌边的聚集,这个比力特别,理论上一个有向边只能有两个极点,但是在框架内里实际上一个 Channel 可以有多个读者和写者,以是形貌时接纳同 Channel 的边放在一个聚集内里去。在图中做插入和删除时是以边为单位的。只有一个完备的边才会真正的放在极点与极点之间的关系中去,也就是说,边存在中心状态和完全状态。
   struct RelatedVertices {
    std::unordered_map<std::string, Vertice> src; // key: node_name
    std::unordered_map<std::string, Vertice> dst;
  };
  std::unordered_map<std::string, RelatedVertices>; // key: channel_name
  4.2.3 NodeManager

通过 NodeManager 可以找到这个拓扑中的全部的 Node,以是 NodeManager 只能管理 node。
   其在 Participant 上注册的 Topic 为 node_change_broadcast。
  内部须要借助 SingleValueWarehouse 来生存拓扑信息,每一个 node_id 对应一个 Node Meta
值得留意的是,如果拓扑中有两个名字雷同的 Node,那么后创建的 Node 地点的历程(通常是新启动的历程)会退出。
   通过调试Monitor知道,如果让被监控监控组件暴力退出(意味着没有析构的机遇),那么NodeManager将收不到对方离开拓扑的消息,只能依赖Partcipant自动探活机制来更新拓扑。
  4.2.4 ServiceManager

通过 ServiceManager 可以找到全部的 Server 和 Client。
   其在 Participant 上注册 Topic 为 service_change_broadcast。
  和传统差异,框架中的创建 Service 的地方就被称为 Server,有且只有一个,而 Client 和传同一样,可以有多个。以是 ServiceManager 接纳 SingleValueWarehouse 来存储 Server,用 MultiValueWarehouse 来存储 Client,key 同一为 ServiceID,从布局储存上也可以看出来一个 ServiceID 对应一个 Server,一个 ServiceID 对应多个 Client。
4.2.5 ChannelManager

通过 ChannelManager 可以找到全部的 Writer 和 Reader。
   其在 Participant 注册 Topic 为 channel_change_broadcast。
  ChannelManager 提供了三种维度的数据存储和查询


  • 利用 Graph 来存储 Node 之间的关系
  • 利用 MultiValueWarehouse 来存储以下四种数据

    • channel_writer

      • Key: Channel ID

    • channel_reader

      • Key: Channel ID

    • node_writer

      • Key: Node ID

    • node_reader

      • Key: Node ID


通过以上的数据布局可以实现查询 Node 与 Node 之间的关系,可以查询对应的 Key(node_id or channel_id)上有没有 Writer 大概 Reader。
一个利用场景,我们在创建一个 Writer 并预备些数据前,可以通过 ChannelManager 实例来判定一下当前的 channel 是否有 reader,如果没有,我们可以先不写数据。
4.2.6 Summary

如今的视角大概是如许

4.3 Scheduler

   所谓的调治,肯定是体系资源和运利用命的抵牾,如果体系资源富足多,那么就不须要调治了,也没有调治的须要。调治的作用就是在资源有限的情况下,公道利用体系资源,使体系的服从最高。[6]
  借用教科书上的一句话,历程是体系资源分配的单位,线程是 CPU 调治的单位。
那么在框架里,协程就是调治器调治的单位。
4.3.1 CRoutine

一些着名的协程库,好比腾讯的 libco[7],百度的 bthread[8](也被称为 M:N 线程库),另有 Go 语言原生自带的 Goroutine[9]等都是开放给用户在业务模块中利用。框架内部实现的协程库不但仅在框架内部利用,通过 Reader、Component 等封装在给用户利用,也可以封装成Task提供给用户利用。
一点差异的是,框架内部的生命周期是跟随创建他的主体(Reader、Component 等)的,除非主体不存在,协程才会真正的从调治器中移除。
从它的默认栈巨细也能看出来这个库是否是轻量级别的。
   CRoutine 默认栈巨细 2MB, bthread 默认(最小)栈巨细是两个内存页巨细(通常是 8KB), libco 默认的栈巨细是 128KB,Goroutine 最小的栈巨细是 2KB。固然,协程栈巨细是可以动态增长的,通常也会有一个上限,这里不详细叙述。
  4.3.1.1 State

一共五种状态


  • READY

    • 协程停当

  • FINISHED

    • 协程竣事

  • SLEEP

    • 休眠
    • 现在 Sleep 状态框架内险些不会用,业务层利用也会包管休眠时间控制

  • IO_WAIT

    • 期待 IO
    • 现在只有 io 模块在用(io 模块现在框架内没有启用,提供给业务层用)

  • DATA_WAIT

    • 期待数据

READY 状态可以和 SLEEP、IO_WAIT、DATA_WAIT 在特定的条件下相互转化。在协程生命周期竣事时(Reader 或 Component 析构)状态切换成 FINISHED 并退出。
4.3.1.2 Context

协程实例中持有协程上下文的指针,而栈空间是在上下文中保管的,框架内部利用汇编语言实现了协程栈在寄存器上的上下文交换来实现协程停止和规复。
4.3.1.3 ContextPool

框架中也规定了要有协程上下文的上限,同时要节流创建&&烧毁上下文的开销,以是接纳一个池化的概念,借助 base::CCObjectPool(一个 lock-free 的单链表)来实现,上下文池巨细由历程内的 Component 数目和设置中的 routine_num 的最大值决定。固然,真的到最大值后,框架也没有做硬限,而是单独的创建新的上下文来利用,直至比及池子内里有空闲的上下文。
一个 shared_ptr 带 Deleter 的用法:在上下文池创建实例时,本身塞了一个 Deleter 给了实例的 shared_ptr,如许再协程退出时能做到自动开释池中上下文
4.3.1.4 Atomic_flag

协程内部有两个 atomic_flag,都是利用 test_and_set 机制来实现相干功能。
lock_用来做协程的获取和开释的。
updated_用来做数据关照的,如果有数据到临,updated_会被 clear,如果当前协程正处于 DATA_WAIT 大概 IO_WAIT 状态,就把状态置为 READY。
4.3.1.5 Function

有两种,第一种是针对框架内部的利用,好比Reader
直接用只有一个消息的RoutineFactory举例子,多参数雷同。意思就是不停的利用DataVisitor取数据,取到了实行用户的回调(拜见4.5.2 Reader),实行完继承让出实行权,期待下一次调治。
  1. template <typename M0, typename F>
  2. RoutineFactory CreateRoutineFactory(
  3.     F&& f, const std::shared_ptr<data::DataVisitor<M0>>& dv) {
  4.   RoutineFactory factory;
  5.   factory.SetDataVisitor(dv);
  6.   factory.create_routine = = {
  7.     return = {
  8.       std::shared_ptr<M0> msg;
  9.       for (;;) {
  10.         CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
  11.         if (dv->TryFetch(msg)) {
  12.           f(msg);
  13.           CRoutine::Yield(RoutineState::READY);
  14.         } else {
  15.           CRoutine::Yield();
  16.         }
  17.       }
  18.     };
  19.   };
  20.   return factory;
  21. }
复制代码
第二种是针对业务模块开放的协程池,不停的从BoundedQueue中取使命实行
  1. auto func = this {
  2.     while (!stop_) {
  3.       std::function<void()> task;
  4.       if (!task_queue_->Dequeue(&task)) {
  5.         auto routine = croutine::CRoutine::GetCurrentRoutine();
  6.         routine->HangUp();
  7.         continue;
  8.       }
  9.       task();
  10.     }
  11. };
复制代码
4.3.2 Processor

对于协程来说,Processor 相称于逻辑上的 CPU,每个 Processor 内部会有一个线程,这个线程会根据设置和指定的 CPU 设置亲和性,包管线程的局部调治,提升 CPU 缓存的掷中率。
处置惩罚器内部的线程就干一件事,不停地从 ProcessContext 中获取下一个停当的协程,获取后对协程举行规复(Resume)。获取不到数据停当协程时间,会调用上下文的 Wait 函数举行期待。
只不外根据 ProcessContext 内部的获取停当协程计谋逻辑差异,分为了 ClassicContext 和 ChoreographyContext。
真正创建和生存处置惩罚器的地方就是调治器。而调治器根据派发使命和关照机制又被分成两种计谋,也恰恰对应了两种 Context。
4.3.3 SchedulerConf

调治器设置协议: cyber/proto/scheduler_conf.proto,由于调治器实现与设置强相干,且现在没有一份针对框架调治器设置详细的阐明文档,业务层想要设置本身的调治计谋难上加难,根本都用的默认的 SchedulerClassic 计谋,参数还都是默认的,并不能发挥出框架的全部上风。
SchedulerConf 是在 cyber_conf.proto 中的第一个元素,我们只讨论他的自身。
针对字段逐一表明,可以对比着 cyber/conf/example_sched_classic.conf 或 cyber/conf/example_sched_choreography.conf 看,可以更加直观形象。
SchedulerConf
  1. import "classic_conf.proto";
  2. import "choreography_conf.proto";
  3. message InnerThread {
  4.   optional string name = 1;
  5.   optional string cpuset = 2;
  6.   optional string policy = 3;
  7.   optional uint32 prio = 4 [default = 1];
  8. }
  9. message SchedulerConf {
  10.   optional string policy = 1;
  11.   optional uint32 routine_num = 2;
  12.   optional uint32 default_proc_num = 3;
  13.   optional string process_level_cpuset = 4;
  14.   repeated InnerThread threads = 5;
  15.   optional ClassicConf classic_conf = 6;
  16.   optional ChoreographyConf choreography_conf = 7;
  17. }
复制代码

字段表明


  • Schedulerconf

    • policy

      • 指定当进步程接纳何种调治计谋
      • 取值范围

        • classic
        • choreography


    • routine_num

      • 当进步程协程数目,4.3.1.3 中协程上下文池的巨细参考值之一

    • default_proc_num

      • 逻辑处置惩罚器的数目,可以明白为 4.3.2 中 Processor 的数目

    • process_level_cpuset

      • 当进步程的主线程可以利用的 cpuset,并和这些 cpu 设置亲和性,关于亲和性,参考引用 10
      • 格式

        • 0-7,16-23 : 代表利用 0~7 号和 16~23 号 cpu
        • 0,2,4,6,8: 代表利用第 0、2、4、6、8 号 cpu

      • 疑问

        • 调治器被初始化应该是在框架的 Init 函数中,Init 函数在 main 函数中调用,那为什么把主线程和这些 cpu 设置亲和性?主线程须要到场调治?


    • threads

      • 框架内部线程的设置,好比 async_log、shm 等线程
      • name

        • 内部线程的名字,好比上述的 async_log、shm

      • cpuset

        • 内部线程可利用的 cpuset,格式参考 process_level_cpuset

      • policy

        • Linux 体系 POSIX 线程调治的计谋,参考引用 11
        • 取值范围, 寄义参考引用 12

          • SCHED_FIFO
          • SCHED_RR
          • SCHED_OTHER

            • 接纳 Linux 体系默认时分调治,框架只会设置线程的优先级。参考引用 13



      • prio

        •                    Processes scheduled under one of the real-time policies (SCHED_FIFO, SCHED_RR) have a sched_priority value in the range 1(low) to 99 (high)
              

        • 如果policy设置成SCHED_FIFO或SCHED_RR,那么此线程就是体系的实时线程。这就对应Linux 体系线程调治的优先级, sched_param 中的 sched_priority 参数,和 nice value 差异,这个数字越高优先级越高
        • 取值范围⼀般是 1 到 99,参考引⽤ 12



ClassicConf
  1. message ClassicTask {
  2.   optional string name = 1;
  3.   optional uint32 prio = 2 [default = 1];
  4.   optional string group_name = 3;
  5. }
  6. message SchedGroup {
  7.   required string name = 1 [default = "default_grp"];
  8.   optional uint32 processor_num = 2;
  9.   optional string affinity = 3;
  10.   optional string cpuset = 4;
  11.   optional string processor_policy = 5;
  12.   optional int32 processor_prio = 6 [default = 0];
  13.   repeated ClassicTask tasks = 7;
  14. }
  15. message ClassicConf {
  16.   repeated SchedGroup groups = 1;
  17. }
复制代码

字段表明


  • name

    • 调治计谋组名字

  • processor_num

    • 调治计谋组的逻辑 CPU 数目(Processor 数目),同时也是 task_pool_size_的值
    • 留意,这个没有默认值,须要填故意义的数字

  • affinity

    • 亲和性的范例
    • 取值

      • 1to1

        • 一个线程对应一个 cpu

      • range

        • 一个线程对应一组 cpu



  • cpuset

    • 与 InnerThread::cpuset 划一,不再赘述

  • processor_policy

    • 与 InnerThread::policy 划一,不再赘述

  • processor_prio

    • 与 InnerThread::prio 划一,不再赘述

  • ClassicTask

    • name

      • 协程的名字

    • prio

      • 协程优先级
      • 数字越大,优先级越高
      • 范围

        • 0 ~ 19


    • group_name

      • 调治计谋组名字


ChoreographyConf
  1. message ChoreographyTask {
  2.   optional string name = 1;
  3.   optional int32 processor = 2;
  4.   optional uint32 prio = 3 [default = 1];
  5. }
  6. message ChoreographyConf {
  7.   optional uint32 choreography_processor_num = 1;
  8.   optional string choreography_affinity = 2;
  9.   optional string choreography_processor_policy = 3;
  10.   optional int32 choreography_processor_prio = 4;
  11.   optional string choreography_cpuset = 5;
  12.   optional uint32 pool_processor_num = 6;
  13.   optional string pool_affinity = 7;
  14.   optional string pool_processor_policy = 8;
  15.   optional int32 pool_processor_prio = 9;
  16.   optional string pool_cpuset = 10;
  17.   repeated ChoreographyTask tasks = 11;
  18. }
复制代码

字段表明


  • choreography_processor_num

    • 可编排逻辑 CPU 数目

  • choreography_affinity

    • 可编排 Processor 的亲和性
    • 取值

      • range
      • 1to1


  • choreography_processor_policy

    • 可编排 Processor 的线程调治计谋

      • 与 InnerThread::policy 作用&&取值划一


  • choreography_processor_prio

    • 可编排 Processor 的线程调治优先级

      • 与 InnerThread::prio 作用&&取值划一


  • choreography_cpuset

    • 可编排 Processor 的可利用 cpu 聚集

      • 与 InnerThread::cpuset 作用&&取值划一


  • pool_processor_num

    • 使命池 ClassicProcessor 数目, 也是协程池的巨细

  • pool_affinity

    • 使命池 ClassicProcessor 亲和性

  • pool_processor_policy

    • 使命池 ClassicProcessor 调治计谋

  • pool_cpuset

    • 使命池 ClassicProcessor 利用的 cpu 聚集

  • ChoreographyTask

    • name

      • 协程名字

    • processor

      • 处置惩罚器 ID

    • prio

      • 协程优先级
      • 数字越大,优先级越高


4.3.4 Classic

Classic 调治模子有点雷同于 Golang 语言中最早的 GMP 调治模子,多个逻辑 CPU 共同竞争一个全局队列,为了克制队列的 race-condition,以是要加互斥锁,影响了一部分性能。
框架中的 Classic 调治模子由 ClassicContext、Processor、SchedulerClassic 三个类来实现。
4.3.4.1 ClassicContext

ClassicContext 是真正持有全局队列的地方,根据差异的 GroupName 分别成差异的全局队列,全局队列与全局队列之间相互独立,互不影响。ClassicContext 被创建时间就会指定相应的组名字,如果没有指定,会利用默认的全局队列。
一个 ClassicContext 对象持有一个全局队列的指针,一个 Processor 对应一个 ClassicContext,也就是说,多个 Processor 固然对应多个 ClassicContext,但实际的队列只有一个。
获取下一个协程逻辑比力简朴,每次都从最大优先级的队列开始遍历,找到一个状态停当的协程举行返回。这个函数也是真正被多个 Processor 竞争的。这种调治方式从理论上讲是有大概造成低优先级的协程饿死。

4.3.4.2 SchedulerClassic

初始化逻辑


  • 读指定的设置文件

    • 设置文件名字一样平常是历程启动-p 参数指定的名字,背面加上conf 后缀,且在 conf 目次下的

  • 把设置文件内容序列化成 pb,如果序列化乐成

    • 生存内部线程设置
    • 生存 cpuset 信息,并设置和主线程的亲和性
    • 生存自界说协程调治设置信息

  • 查抄设置协程组长度,如果没有设置则天生一个默认的

    • 默认逻辑 CPU 数目为 2
    • 默认协程组就一个(全局队列就一个)

  • 创建每个协程组的的逻辑 CPU(Processor),对于每一个 Processor

    • 创建 ClassicContext 并绑定到 Processor 上
    • 根据设置设置 Processor 内部线程的亲和性

派发使命逻辑


  • 获取指定协程的互斥锁
  • 按照协程 ID 生存协程的映射
  • 按照协程的设置来设置协程组和优先级信息,如果没有设置则进入默认协程组
  • 按照协程组和优先级来将写成推入 ClassicContext 中的队列
  • 通过 ClassicContext 关照指定协程组有新的停当使命须要处置惩罚
关照 Processor 使命逻辑(被绑定到 DataVisitor 的回调中)


  • 查抄调治器是否克制(unlikely)
  • 根据协程 ID 找到对应协程实例
  • 如果协程处于 DATA_WAIT 大概 IO_WAIT 状态,就设置更新标记(4.3.1.4 中的 update_标记位)
  • 通过 ClassicContext 关照指定协程组有新的停当使命须要处置惩罚
下图为亲和性为 1to1 时 SchedulerClassic 浅易模子图

4.3.5 Choreography

Choreography 调治模子有点雷同于 Golang 语言中如今的 GMP 调治模子,通过当地队列+全局队列的方式来淘汰锁竞争。只不外没有实现 work-stealing 机制,究竟上在终端上也用不上这个机制,由于协程数目有限定。
框架中的 Choreography 调治模子由 ChoreographyContext、Processor、SchedulerChoreography 三个类实现。
4.3.5.1 ChoreographyContext

ChoreographyContext 中的队列由 std::multimap 来实现,Key 为优先级,Value 是协程,并将 std::greater 作为 Compare 类,意味着遍历的时间会根据 Key 从大到小来举行,以此来实现从优先级高到优先级低的次序举行调治。
一个 ChoreographyContext 内里一个队列,一个 Processor 绑定一个 ChoreographyContext,也就是说,Processor 不必与其他 Processor 去竞争,这就是所谓的当地队列。
获取下一个协程的逻辑就是遍历 std::multimap,找到第一个停当的协程举行返回,理论上还是存在低优先级协程饿死题目。

4.3.5.2 SchedulerChoreography

初始化逻辑


  • 读指定的设置文件

    • 设置文件名字一样平常是历程启动-p 参数指定的名字,背面加上。conf 后缀,且在 conf 目次下的

  • 把设置文件内容序列化成 pb,如果序列化乐成

    • 生存内部线程设置
    • 生存 cpuset 信息,并设置和主线程的亲和性
    • 获取 Choreography 相干设置
    • 获取 Classic 相干设置
    • 生存自界说协程调治设置信息

  • 查抄 Processor 数目,如果没有设置则设置成默认的数目,为 2
  • 根据设置创建 Processor,分为两部分

    • Choreography

      • 创建 ChoreographyContext,并绑定到 Processor 上
      • 根据设置设置 Processor 内部线程与 cpu 的亲和性

    • Classic

      • 创建 ClassicContext,并绑定到 Processor 上
      • 根据设置设置 Processor 内部线程与 cpu 的亲和性


使命派发逻辑


  • 获取指定协程的互斥锁
  • 按照协程 ID 生存协程的映射
  • 根据设置来设置相干协程信息

    • Processor
    • Priority

  • 如果当前协程的 ProcessorID 小于 Choreography Processor 的数目,把协程入当地队列,否则,入全局队列
关照 Processor 使命逻辑(被绑定到 DataVisitor 的回调中)


  • 查抄调治器是否克制(unlikely)
  • 根据协程 ID 找到对应协程实例
  • 如果协程处于 DATA_WAIT 大概 IO_WAIT 状态,就设置更新标记(4.3.1.4 中的 update_标记位)
  • 如果当前协程是当地队列的使命,则通过指定 Processor 的 ChoreographyContext 去关照,否则通过 ClassicContext 关照指定协程组有新的停当使命须要处置惩罚。

4.3.6 CRoutine Pool

协程对于用户是透明的,但是用户还是想用怎么办?
框架提供了协程池,协程池的巨细取决于4.3.3中ChoreographyConf::pool_processor_num。
4.3.6.1 TaskManager

内部持有一个巨细为1000的BoundedQueue用于做生产者斲丧者模子的队列。
TaskManager在初始化时,会通过无DataVisitor的RoutineFactory来创建协程池中的多少协程。
每个协程一旦从队列中取数据,就可以实行相干的使命。拜见4.3.1.5中第二种方式。
4.3.6.2 Task

入口代码: cyber/task/task.h的Async函数。
这个函数判定是否是实际模式,是的话向TaskManager推使命,反之则用std::async举行模仿。
在这个文件中,还封装了协程大概线程的Yield、Sleep等利用,算是可以间接的利用协程。
4.3.7 Summary


4.4 Data

数据模块重要是针对框架内部的数据流转做一个派发、存储、关照等机制。
我们按照存储、关照、派发的次序举行先容。
4.4.1 CacheBuffer

就是一个线程安全的RingBuffer,用来真正存储消息指针的地方。
内部buffer由std::vector实现,由两个uint64的头尾指针举行控制。
4.4.2 ChannelBuffer

ChannelBuffer就是CacheBuffer的一个Wrapper,内部额外存储一个uint64值当做channel_id。
这也就意味着一个ChannelBuffer只能处置惩罚一个channel的数据。
4.4.3 DataNotifier

DataNotifier是一个单例类,内部持有channel_id对应一系列callback的映射。
布局如图所示

按照channel_id的维度举行关照,理论上大概会实行多个回调。
4.4.4 DataDispatcher

DataDispatcher是只有一个泛型参数的单例类,内部管理多少个ChannelBuffer的映射。
别的有一个存有DataNotifier的单例指针来做数据关照。
对外提供两个对外接口


  • AddBuffer

    • 将ChannelBuffer和channel_id做映射

  • Dispatch

    • 将对应channel的数据写入到多少个ChannelBuffer内里去,并关照对应的协程干活(更新协程update_状态,拜见4.4.5)


4.4.5 DataVisitor

DataVisitor是一个1~4个泛型参数的模板类,泛型的数目决定内部持有的ChannelBuffer的数目。
拿最常用的1个泛型的数据访问器举例。
内部持有一个ChannelBuffer来存储对应channel的数据,这个buffer交由DataDispatcher来管理。
用一个Notifier来做数据关照,这个Notifier交由DataNotifier管理。
对外就提供一个TryFetch函数供获取数据,这个函数是在协程中实行。
在创建协程时,DataVisitor被传入了一个回调放在了Notifier内里。
  1. if (visitor != nullptr) {
  2.     visitor->RegisterNotifyCallback([this, task_id]{
  3.       if (cyber_unlikely(stop_.load())) {
  4.         return;
  5.       }
  6.       // NotifyProcessor里面实际更改了协程的update_标志,剩下的交由调度器进行调度
  7.       this->NotifyProcessor(task_id);
  8.     });
  9. }
复制代码

4.4.6 Summary


4.5 Node

Node是cyber的底子构件,每个模块都有Node且都通过Node举行通讯。
一个模块可以有差异的通讯范例通过Node界说reader/writer 大概 server/client。
也可以说,Node相称于Writer、Reader、Client和Server的容器
内部含有两个实现类


  • NodeChannelImpl

    • 用于创建Writer和Reader,在实例化时会调用NodeManager的Join函数来到场到当前拓扑中。

  • NodeServiceImpl

    • 用于创建Service和Client

4.5.1 Writer

真正提供给用户利用的类,创建Writer后,用户可以通过Write函数来向Channel内里写数据。
Writer初始化流程


  • 判定是否初始化,初始化过直接返回
  • 创建Transmitter
  • 在ChannelManager中注册回调函数,以此来监听拓扑变革
  • 通过ChannelManager获取Channel上全部Reader
  • 对Channel上全部Reader开启Transmitter的写入功能
  • 调用ChannelManager的Join函数以Writer的身份到场当前拓扑中
拓扑监听回调流程


  • 判定当前ChangeMsg的role是否是Reader,不是则过滤
  • 判定当前Reader的Channel是否和当前Channel相称,不相称则过滤
  • 根据Reader的利用(到场 or 离开)来打开大概关闭transmitter的功能。
别的,Writer通过封装ChannelManager提供GetReaders函数来获取对应的Reader聚集,不必非要直接调用ChannelManager
4.5.2 Reader

Writer写入的数据,必须由Reader来读。同样的,Reader也是提供给用户利用的类。
Reader有两种利用方式,一种是读缓存,另一种是在创建时传入回调,在有数据时自动实行。
方式一: 回调
  1. auto reader = node->CreateReader<Chatter>(
  2.     channel_name, [&](const std::shared_ptr<Chatter>& msg) {
  3.     // your logic
  4. });
复制代码

方式二:
  1. auto reader = node->CreateReader<Chatter>(channel_name);
  2. // 1.获取最新的数据,单条
  3. auto message = reader->GetLatestObserved();
  4. // 2.获取最新的数据,多条
  5. reader->SetHistoryDepth(100);
  6. reader->Observe();
  7. for (auto it = reader->Begin(); it != reader->End(); ++it) {
  8. // your logic for *it
  9. }
复制代码
回调的方式很好明白,数据来的时间趁便实行一下。
同步读的方式,实在是读Reader内部的缓存,缓存由Blocker实现。
Reader的初始化流程


  • 判定是否初始化过
  • 对用户传的回调函数举行封装,如果用户没有传回调,则利用默认调用缓存入队函数

    • 末了的回调落入协程之中实行,拜见4.3.1.5代码 f(msg)

  • 创建DataVisitor
  • 根据封装好的回调函数和DataVisitor创建RoutineFactory
  • 用scheduler实例来创建新的协程使命
  • 通过ReceiverManager来创建receiver
  • 获取ChannelManager实例并注册拓扑变革监听函数
  • 通过ChannelManager实例获取Channel上的Writer,并打开receiver的对应功能
  • 调用ChannelManager的Join函数将当前的Reader到场到全局拓扑中
Reader的拓扑监听函数逻辑


  • 判定当前ChangeMsg的role是否是Writer,不是则过滤
  • 判定当前Writer的Channel是否和当前Channel相称,不相称则过滤
  • 根据Writer的利用(到场 or 离开)来打开大概关闭receiver的功能
4.5.2.1 Blocker

代码位于cyber/blocker/blocker.h
内部含有两个用std::list实现的容量有限定的队列


  • published_msg_queue_

    • 用于记载已经发布的数据,理论上应该比CacheBuffer的数据落后一点点

  • observed_msg_queue_

    • 用于Reader的临时观察,利用时会对published_msg_queue_举行浅拷贝

Reader的SetHistoryDepth函数就是用来设置这两个队列的长度的
4.5.2.2 ReceiverManager

ReceiverManager代码位于cyber/node/reader_base.h中,我们只关心的此中的回调函数的逻辑。
  1. [](const std::shared_ptr<MessageT>& msg,
  2.   const transport::MessageInfo& msg_info,
  3.   const proto::RoleAttributes& reader_attr) {
  4.   (void)msg_info;
  5.   (void)reader_attr;
  6.   PerfEventCache::Instance()->AddTransportEvent(
  7.       TransPerf::DISPATCH, reader_attr.channel_id(),
  8.       msg_info.seq_num());
  9.   data::DataDispatcher<MessageT>::Instance()->Dispatch(
  10.       reader_attr.channel_id(), msg);
  11.   PerfEventCache::Instance()->AddTransportEvent(
  12.       TransPerf::NOTIFY, reader_attr.channel_id(),
  13.       msg_info.seq_num());
  14. }
复制代码

只有一行代码有效,就是调用了DataDispatcher的Dispatch函数(作用拜见4.4.4)
这个回调应该是框架中最复杂的回调,复杂的缘故原由并不是内部逻辑,而是他颠末了层层的Wrapper,中心另有一次动态的注册。Transport::CreateReceiver默认创建的HybridReceiver,以是这个回调先是散落在三种Receiver中,在根据拓扑变革动态注册到对应的XXXDispatcher中。
留意: Transport中的XXXDispatcher和data:ataDispatcher不要搞混
4.5.3 Client

Client内部实际上用两个Channel和C++异步编程(Promise&&shared_future)实现的。
内部通过4.1.2.3形貌的RtpsTransmitter写哀求的Channel,并通过future举行超时期待。
通过4.1.3.3形貌的RtpsReceiver来吸取相应的Channel,在回调处set_value即可。
Client初始化函数就是创建RtpsTransmitter和RtpsReceiver。
通过一个std::unordered_map存储pending request。
4.5.4 Service

同Client一样,内部还是有两个Channel和RtpsTransmitter&&RtpsReceiver。
Service在内部利用std::list作为队列来实现生产者斲丧者模子。
RtpsReceiver在被创建时塞入的回调是将带有处置惩罚客户端哀求函数的lambda表达式入队列,
Service有启动时有单独的线程斲丧这个队列,处置惩罚哀求(实行用户回调)后通过RtpsTransmitter写入到相应Channel内里。
4.5.5 Summary

那么如今的视角应该是这个样子

5 Others

撤除重要模块 ,框架层还包罗其他辅助模块来实现差异的功能 ,同⼤模块⼀样告急。
5.1 ClassLoaderModule

在cyber中 ,基于框架开辟的应⽤模块满是以动态链接库的情势存在 ,以是就须要有辅助类将这些动 态链接库中的类加载出来并可以大概根据类的名字创建相应的对象 ,这些类都是Component大概
TimerComponent的派⽣类。 从调⽤栈次序上⼀⼀先容
5.1.1 ClassLoaderManager

内部生存着动态链接库路径和对应加载后ClassLoader的实例的映射(std::map) ,对外提供 LoadLibrary函数 ,函数内部简朴判重 ,不存在即创建后返回。
根据类名字创建类对象


  • 获取当前全部的ClassLoader实例
  • 遍历全部的ClassLoader ,如果ClassLoader⾥有这个类
  • 调⽤ClassLoader的创建Object接⼝并返回创建好对象的shared_ptr
5.1.2 ClassLoader

ClassLoader类内部并没有持有动态链接库的句柄 ,⽽是将句柄放到了Utility的static变量中。 内部只生存当前动态链接库的路径和对应的引⽤计数(加载库 和类对象)。
各种判定类函数也是依赖 Utility中的函数 ,⽐如ClassLoader构造函数内调⽤ utility:oadLibrary来加 载指定路径下的动态链接库,可以说ClassLoader就是utility中函数的OOP的Wrapper。
判定类是否在这个ClassLoader内


  • 调⽤ utility::GetValidClassNames获取这个ClassLoader全部的类名字
  • 通过std::find来探求指定的类名字 根据名字创建指定类对象
  • 判定当前路径的库是否被加载 ,如果没有 ,⽴刻加载
  • 通过utility::CreateClassObj来创建类的对象
  • 创建乐成对象的引⽤计数+1
  • 创建shared_ptr管理创建乐成的对象 ,并传⼊Deleter在指针析构时⾃动引⽤计数-1和删除对象指 针
  • 返回这个shared_ptr
5.1.3 SharedLibrary

SharedLibrary是动态链接库的实体 ,⼀个SharedLibrary对应⼀个动态链接库 ,在构造时通过dlopen 打开动态链接库并生存相应的handle, 通过dlsym函数和handle可知道当前库是否有对应的符号。
构造时大概会抛非常 ,须要外层函数进⾏捕捉。
5.1.4 Utility

这⾥⾯是若⼲个C风格函数 ,有⼏个全局的数据布局来生存当前状态。


  • ClassClassFactoryMap

    • std::map<std::string, std::map<std::string, AbstractClassFactoryBase*>>

      • 第⼀层key: 基类名字
      • 第⼆层key: 派⽣类名字


  • LibPathSharedLibVector

    • std::vector<std::pair<std::string, std::shared_ptr<SharedLibrary>>>

  • ClassFactoryVector

    • std::vector<AbstractClassFactoryBase*>

重要先容⼏个重要的函数
加载动态链接库(LoadLibrary)


  • 判定这个动态链接库是否被加载过 ,如果被加载过 ,则将当前的ClassLoader和全部有关系的 ClassFactory做关联
  • 获取当前动态链接库全部的ClassFactory而且每⼀个ClassFactory做映射利用
  • 加递归锁
  • 设置当前生动的ClassLoader
  • 设置当前加载的库名字
  • 创建SharedLibrary的shared_ptr
  • 设置当前加载的库名字为空
  • 设置当前生动的ClassLoader为空
  • 解锁
  • 如果SharedLibrary对象为空 ,返回失败
  • 将库名字和SharedLibrary对象的Pair添加到LibPathSharedLibVector⾥⾯去
根据类的名字创建对象


  • 获取递归锁
  • 根据基类名字获取派⽣类Map (ClassClassFactoryMap第⼆层)
  • 在派⽣类Map中找到对应派⽣类的ClassFactory
  • 开释递归锁

    • 如果ClassFactory不为空且包罗当前的ClassLoader 。 调⽤ClassFactory的CreateObj函数创建对象

  • 返回创建好的对象
注册类函数


  • 根据class_name和base_class_name创建utility::ClassFactory的对象cf
  • 通过cf来设置ClassLoader和其关联的库名字

    • 疑问:为什么获取到的ClassLoader和库名字不为空???
    • 答复: RegisterClass函数可以明白为是在shared_library = SharedLibraryPtr(new SharedLibrary(library_path));这个语句后执⾏的 ,以是取到的数据不为空。深层缘故原由是动态链 接库⼀旦被打开 ,优先执⾏宏下令!

  • 获取ClassClassFactoryMap互斥锁并加锁
  • 将cf按照映射关系放到ClassClassFactoryMap中
  • 获取ClassClassFactoryMap互斥锁并解锁
5.2 Mainboard

这个模块会被编译成⼀个可执⾏的⼆进制⽂件mainboard ,全部基于cyber开辟的模块 会被编译成动态链接库 ,通过mainboard进⾏启动。
5.2.1 ModuleArgument

就是把主函数的argc和argv翻译成std::string的过程 ,通过getopt_long来实现 ,总体过程就是通例的 分析 ,没什么意思。
选项
寄义
C++数据
备注
-d
dag⽂件
std::vector<std::string>
-d 后⾯可以指定多个dag⽂件
-p
历程名字
std::string
框架设置⽂件名字根据这个选项决定
-h
资助信息
-
-
5.2.2 ModuleController

负责创建全部的dag⽂件中的Component和TimerComponent对象并缓存起来。 加载过程


  • 根据设置初始化log
  • 将全部的dag⽂件路径全部转为绝对路径
  • 对于每⼀个dag⽂件

    • 将dag⽂件内的格式化信息转为proto buf
    • 对于每⼀个动态链接库

      • 将动态链接库的路径转为绝对路径并查抄路径是否存在 ,不在则终⽌加载流程
      • 利⽤ClassLoaderManager加动态链接库加载进来

    • 对于每⼀个Component

      • 利⽤ClassLoaderManager创建对象并初始化
      • 缓存初始化好的对象

    • 对于每⼀个TimerComponent

      • 利⽤ClassLoaderManager创建对象并初始化
      • 缓存初始化好的对象


5.2.3 Main

这⾥是main函数地点的地⽅ ,可以明白为框架的⼊⼝。
重要叙述⼀下游程


  • ⽤ModuleArgument进⾏分析下令⾏参数
  • 初始化框架
  • 初始化ModuleController
  • 调⽤框架层的WaitForShutdown不绝地期待竣事
  • 清空ModuleController
5.3 Base

base模块都是封装好的底子数据布局 ,供其他模块进⾏使⽤。
5.3.1 Signal

框架中通讯模块&&服务发现模块均使⽤Signal来存储&&触发回调。 ⼀共由三个类来实现


  • Signal
  • Connection
  • Slot
三个类均是担当变⻓泛型参数的模板类
回调界说: using Callback = std::function<void(Args...)> 即返回值为空的不定⻓参数的std::function
5.3.1.1 Slot

Slot内部就⼀个Callback和⼀个标记位。 标记位⽤来标记当前毗连是否可⽤。
重载利用符()来实现执⾏回调。
5.3.1.2 Signal

内部⽤std::list来存储Slot的shared_ptr ,对外提供Connect和Disconnect函数来注册和删除Slot。 重载利用符()实现批量关照 ,须要学习的是 ,他是先浅拷贝信号集⾥⾯的全部Slot ,然后串⾏执⾏。
5.3.1.3 Connection

Signal注册回调乐成会给⽤⼾返回⼀个Connection对象 ,⽤⼾可以通过这个对象进⾏断连。 构造时被传⼊Slot和Signal指针 ,通过Slot可以知道毗连状态 ,通过Signal可以删除Slot。
5.3.2 BoundedQueue

有界队列是通过池化技能+原⼦头尾指针控制+差异的期待计谋来实现的。
内部池⼦在初始化时已经申请好内存 ,包罗头尾指针的内存 ,以是池⼦⼤⼩=指定⼤⼩+2( head and tail)。
⼊队时间利⽤原⼦变量(tail)的CAS机制来获取⼀个元素进⾏写⼊。 出队时也利⽤原⼦变量(head)的CAS机制来获取头部元素进⾏返回。
同样提供带期待的⼊队&&出队接⼝ ,是利⽤WaitStrategy实现 ,默认是SleepWaitStrategy。
   CAS机制均⽤的weak版本 ,weak版本据官⽅先容说有偶发性的的⽐较乐成也会返回失败的情 况 ,但是性能⽐strong版本的⾼ ,详细参考引⽤14
  5.4 Timer

如果有⼀个使命须要周期性来执⾏ ,那么就要使⽤定时器。
框架层中的TimerComponent通过把Proc注册到定时器中进⾏实现。
从操持角度上看 ,⼀个使命应该对应⼀个Timer对象 ,以是Timer对象中持有对应使命的指针⽤于插⼊ 和开释。
团体上定时器内部是实现了时间轮模子的变种 ,有着O(1)时间复杂度去维护多个定时器 ,感爱好的同 学可以直接参考原始论⽂[15]。
定时器提供了三个设置参数


  • Period : 执⾏的隔断时间 ,单位是ms
  • Callback: 须要执⾏的回调
  • Oneshot

    • True: 执⾏⼀次
    • False: 周期性执⾏

启动函数


  • 判定当前是否为真实模子 ,不是的直接返回
  • 判定当前定时器是否启动过 ,如果没有启动过

    • 初始化定时器使命 ,如果初始化乐成 ,将当前定时器使命插⼊到时间轮中

停⽌函数


  • 判定是否停⽌过&&定时器使命是否为空 ,如果是第⼀次停⽌ ,那么将使命重置为空
值得⼀提的是初始化周期性运⾏的定时器使命时 ,会把⽤户的使命(callback)封装⼀层 ,内部会根据预
期触发隔断&&实际触发隔断&&使命单次执⾏时间来盘算出相干时间赔偿参数 ,并调⽤时间轮的 AddTask接⼝将使命添加到下次执⾏的Bucket中。
5.4.1 TimerTask

TimerTask是⼀个全部成员都开放的布局体 ,内部包罗⼀个函数包装器和若⼲个uint64_t变量。
• timer_id_: Timer的id ,由⼀个全局uint64_t原⼦变量累加形成。
• callback: ⽤户回调的Wrapper
• interval_ms: ⽤户指定的执⾏的时间隔断
• remainder_interval_ms:
• next_fire_duration_ms:
• accumulated_error_ns:
• last_execute_time_ns: 前次执⾏的开始时间
5.4.2 TimerBucket

是⼀个std::list ,每⼀个元素是TimerTask的weak_ptr ,并对外提供线程安全的AddTask接⼝。
5.4.3 TimingWheel

时间轮是⼀个单例,意味着整个历程就⼀份 ,和定时器是1:N的关系。
这个和以往的单时间轮+round模子差异 ,框架层实现了个⼆级的时间轮。 ⼀级时间轮为512个bucket ,⼆级时间轮有64个bucket ,精度为2ms。
时间轮单独启动⼀个线程 ,指针2ms从⼀级时间轮⾛⼀个bucket ,如果bucket不为空 ,会把其上⾯的
使命全部都推⼊框架层中的协程池(线程池) 中执⾏。 ⾛完⼀圈之后指针为0时⼆级指针⾛⼀个 bucket ,并把对应⼆级bucket的task添加到⼀级时间轮上。

  • 时间精度:2ms一个tick,工作轮处置惩罚1024ms内的使命
  • 溢出处置惩罚:高出1024ms的使命进入辅助轮,按1024ms/槽分级
  • 级联机制:工作轮转完一圈(512 ticks)后,辅助轮推进一槽,并将该槽使命重新分配到工作轮
  • 使命实行:每次tick处置惩罚当前工作轮槽内的全部使命,异步实行回调
References


  • https://github.com/ApolloAuto/apollo/blob/master/cyber/doxy-docs/source/CyberRT_FAQs.md
  • https://man7.org/linux/man-pages/man7/shm_overview.7.html
  • https://users.cs.cf.ac.uk/Dave.Marshall/C/node27.html
  • https://github.com/eProsima/Fast-DDS/blob/master/README.md
  • RTI Connext DDS
  • https://github.com/daohu527/dig-into-apollo/tree/main/cyber/source
  • https://github.com/Tencent/libco/blob/master/co_routine.h
  • https://github.com/apache/brpc/tree/master/src/bthread
  • https://github.com/golang/go
  • https://man7.org/linux/man-pages/man3/pthread_setaffinity_np.3.html
  • https://man7.org/linux/man-pages/man3/pthread_setschedparam.3.html
  • https://man7.org/linux/man-pages/man7/sched.7.html
  • https://linux.die.net/man/2/setpriority
  • https://zh.cppreference.com/w/cpp/atomic/atomic/compare_exchange
  • http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表