【计划模式】如何用C++实现观察者模式【发布订阅机制】 ...

张春  论坛元老 | 2024-12-19 02:40:17 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1033|帖子 1033|积分 3099

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
【计划模式】如何用C++实现观察者模式【发布订阅机制】

一、问题背景

代码质量影响生存质量。最近工作中频仍打仗各种计划模式,深刻领会到优秀的计划模式不光能明显降低后续维护的压力,还能提升开发服从。观察者模式作为一种降低耦合度、进步扩展性的利器,其计划模式思想让人受益匪浅。
已往曾读过一本《训练的心态》,书中提出了这样的问题:当我们专注于过程时,每每会偏离目标,越专注离目标也就越远;当我们关注目标时,要么无法专注于过程,要么因目标与实际的差距而产生放弃的念头。
如何把握过程与目标的关系,大概须要我们用一种观察者的聪明和心态,即同时使用两个视角来思索:

  • 第一种视角作为被观察者,允许自身专注,沉浸于过程,最好进入心流状态。
  • 第二种视角作为观察者,观察者观察沉浸于过程的执行者,时常检查执行者是否偏离目标,然后提示执行者调整策略。
当掌握这两种视角后,做到极致的情况下,自身始终存在一种理性的观察者视角:当自身愤怒时,观察者能提供一种理性的视角,以避免行为造成无法担当的结果。如果能做到这一点,仅仅是愤怒的话又有何不可呢。
观察者模式中这两种视角是低耦合的,而作为普通人,难免经常会将两种视角混淆在一起。
”破山中贼易,破心中贼难“,要让心中始终存在一种理性的聪明,大概还须要在事上学、心上练。把握人性的度量,从而实现“从心所欲而不逾矩”的理想品德。
二、什么是观察者模式?

观察者模式(Observer Pattern)是一种常见的计划模式,属于行为型模式。它界说了一种一对多的依赖关系,使适当一个对象的状态发生改变时,其全部依赖对象(观察者)都会自动收到关照并更新。这种模式常用于事件驱动的体系。
观察者模式的核心概念:


  • 被观察者(Subject):也称为发布者,它维护着一个观察者列表,当它的状态发生变化时,会关照全部观察者。
  • 观察者(Observer):也称为订阅者,它订阅了某个被观察者,当被观察者状态发生变化时,它会收到关照并执行相应的操作。
三、观察者模式工作原理


  • 注册观察者: 观察者向被观察者注册,把自己添加到被观察者的观察者列表中。
  • 状态改变: 被观察者的状态发生变化。
  • 关照观察者: 被观察者遍历观察者列表,依次关照全部注册的观察者。
  • 更新: 观察者收到关照后,根据自己的逻辑进行更新。
四、为什么使用观察者模式?


  • 低耦合: 被观察者和观察者之间是低耦合的,它们之间不须要知道对方的具体实现细节。
  • 可扩展性: 可以动态地增长或删除观察者,而不须要修改被观察者或其他观察者的代码。
  • 复用性: 观察者模式可以被应用于各种场景,进步代码的复用性。
五、实现步骤

以发布订阅为例,须要实现以下三个组件:

  • Publisher:用于发布订阅事件,承担了观察者模式中的Subject的状态改变功能。
  • TopicServer:用于中央化管理Topic订阅事件和Subscriber观察者列表,承担了中的Subject的关照功能。
  • Subscriber:用于订阅Topic,当发布者Publisher发布消息时,订阅服务器TopicServer会关照全部订阅该Topic的订阅者Subscriber。
1. 订阅者类Subscriber

./subscribe/Subscribe.h
  1. #ifndef SUBSCRIBE_H
  2. #define SUBSCRIBE_H
  3. #include <string>
  4. namespace Observer
  5. {
  6.     class Subscriber
  7.     {
  8.     public:
  9.         virtual ~Subscriber() = default;
  10.         virtual void Update(const std::string &topic, const std::string &message) = 0;
  11.     };
  12. }
  13. #endif
复制代码
./subscribe/SubscribeImpl.h
  1. #ifndef SUBSCRIBEIMPL_H
  2. #define SUBSCRIBEIMPL_H
  3. #include <Subscriber.h>
  4. namespace Observer
  5. {
  6.     class SubscriberImpl : public Subscriber
  7.     {
  8.     public:
  9.         explicit SubscriberImpl(const std::string& subscriberName);
  10.         ~SubscriberImpl() override;
  11.         void Update(const std::string &topic, const std::string &message) override;
  12.     private:
  13.         std::string m_name;
  14.     };
  15. }
  16. #endif
复制代码
./subscribe/SubscribeImpl.cpp
  1. #include <SubscriberImpl.h>
  2. #include <iostream>
  3. using namespace Observer;
  4. SubscriberImpl::SubscriberImpl(const std::string& subscriberName) : m_name(subscriberName) {}
  5. SubscriberImpl::~SubscriberImpl() {}
  6. void SubscriberImpl::Update(const std::string &topic, const std::string &message)
  7. {
  8.     std::cout << "Subscriber [" << m_name << "] received message on topic [" << topic << "]: " << message << std::endl;
  9. }
复制代码
2. 发布订阅中央类TopicServer

./topicServer/TopicServer.h
  1. #ifndef TOPICSERVER_H
  2. #define TOPICSERVER_H
  3. #include <string>
  4. #include <memory>
  5. #include <Subscriber.h>
  6. namespace Observer
  7. {
  8.     class TopicServer
  9.     {
  10.     public:
  11.         virtual ~TopicServer() = default;
  12.         virtual void Subscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber) = 0;
  13.         virtual void Unsubscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber) = 0;
  14.         virtual void Notify(const std::string& topic, const std::string& message) = 0;
  15.     };
  16. }
  17. #endif
复制代码
./topicServer/TopicServerImpl.h
  1. #ifndef TOPICSERVERIMPL_H
  2. #define TOPICSERVERIMPL_H
  3. #include <TopicServer.h>
  4. #include <unordered_map>
  5. #include <vector>
  6. namespace Observer
  7. {
  8.     class TopicServerImpl : public TopicServer
  9.     {
  10.     public:
  11.         explicit TopicServerImpl();
  12.         ~TopicServerImpl() override;
  13.         void Subscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber) override;
  14.         void Unsubscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber) override;
  15.         void Notify(const std::string& topic, const std::string& message) override;
  16.     private:
  17.         std::unordered_map<std::string, std::vector<std::shared_ptr<Subscriber>>> m_topicSubscriber;
  18.     };
  19. }
  20. #endif
复制代码
./topicServer/TopicServerImpl.cpp
  1. #include <TopicServerImpl.h>
  2. #include <algorithm>
  3. using namespace Observer;
  4. TopicServerImpl::TopicServerImpl() {}
  5. TopicServerImpl::~TopicServerImpl() {}
  6. void TopicServerImpl::Subscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber)
  7. {
  8.     m_topicSubscriber[topic].push_back(subscriber);
  9. }
  10. void TopicServerImpl::Unsubscribe(const std::string& topic, std::shared_ptr<Subscriber> subscriber)
  11. {
  12.     auto& subscribersList = m_topicSubscriber[topic];
  13.     auto itBegin = std::remove(subscribersList.begin(), subscribersList.end(), subscriber);
  14.     subscribersList.erase(itBegin, subscribersList.end());
  15.     if (subscribersList.size() == 0) {
  16.         m_topicSubscriber.erase(topic);
  17.     }
  18. }
  19. void TopicServerImpl::Notify(const std::string& topic, const std::string& message)
  20. {
  21.     if (m_topicSubscriber.contains(topic)) {
  22.         for (auto subscriber : m_topicSubscriber[topic]) {
  23.             subscriber->Update(topic, message);
  24.         }
  25.     }
  26. }
复制代码
3. 发布者类Publisher

./publisher/Publisher.h
  1. #ifndef PUBLISHER_H
  2. #define PUBLISHER_H
  3. #include <string>
  4. namespace Observer
  5. {
  6.     class Publisher
  7.     {
  8.     public:
  9.         virtual ~Publisher() = default;
  10.         virtual void PublishMessage(const std::string& topic, const std::string& message) = 0;
  11.     };
  12. }
  13. #endif
复制代码
/publisher/PublisherImpl.h
  1. #ifndef PUBLISHERIMPL_H
  2. #define PUBLISHERIMPL_H
  3. #include <Publisher.h>
  4. #include <TopicServer.h>
  5. #include <memory>
  6. namespace Observer
  7. {
  8.     class PublisherImpl : public Publisher
  9.     {
  10.     public:
  11.         explicit PublisherImpl(std::shared_ptr<TopicServer> topicServer);
  12.         ~PublisherImpl() override;
  13.         void PublishMessage(const std::string& topic, const std::string& message) override;
  14.     private:
  15.         std::shared_ptr<TopicServer> m_server;
  16.     };
  17. }
  18. #endif
复制代码
/publisher/PublisherImpl.cpp
  1. #include <PublisherImpl.h>
  2. using namespace Observer;
  3. PublisherImpl::PublisherImpl(std::shared_ptr<TopicServer> topicServer) : m_server(topicServer) {}
  4. PublisherImpl::~PublisherImpl() {}
  5. void PublisherImpl::PublishMessage(const std::string& topic, const std::string& message) {
  6.     m_server->Notify(topic, message);
  7. }
复制代码
4. main函数调用

./main.cpp
  1. #include <PublisherImpl.h>
  2. #include <TopicServerImpl.h>
  3. #include <SubscriberImpl.h>
  4. namespace {
  5.     constexpr std::string TOPICA {"TopicA"};
  6.     constexpr std::string TOPICB {"TopicB"};
  7. }
  8. using namespace Observer;
  9. int main()
  10. {
  11.     std::shared_ptr<TopicServer> server = std::make_shared<TopicServerImpl>();
  12.     // 创建观察者
  13.     std::shared_ptr<Subscriber> subscriber1 = std::make_shared<SubscriberImpl>("subscriber1");
  14.     std::shared_ptr<Subscriber> subscriber2 = std::make_shared<SubscriberImpl>("subscriber2");
  15.     std::shared_ptr<Subscriber> subscriber3 = std::make_shared<SubscriberImpl>("subscriber3");
  16.     std::shared_ptr<Subscriber> subscriber4 = std::make_shared<SubscriberImpl>("subscriber4");
  17.     // 订阅主题
  18.     server->Subscribe(TOPICA, subscriber1);
  19.     server->Subscribe(TOPICA, subscriber2);
  20.     server->Subscribe(TOPICA, subscriber4);
  21.     server->Subscribe(TOPICB, subscriber1);
  22.     server->Subscribe(TOPICB, subscriber3);
  23.     server->Subscribe(TOPICB, subscriber4);
  24.     std::shared_ptr<Publisher> publisher = std::make_shared<PublisherImpl>(server);
  25.     // 发布消息
  26.     publisher->PublishMessage(TOPICA, "Hello from TopicA!");
  27.     publisher->PublishMessage(TOPICB, "Greetings from TopicB!");
  28.     // 移除订阅者
  29.     server->Unsubscribe(TOPICA, subscriber4);
  30.     server->Unsubscribe(TOPICB, subscriber4);
  31.     // 发布消息
  32.     publisher->PublishMessage(TOPICA, "Update from TopicA after unsubscribe!");
  33.     publisher->PublishMessage(TOPICB, "Update from TopicB after unsubscribe!");
  34.     return 0;
  35. }
复制代码
5. 编写CMakeLists.txt

  1. cmake_minimum_required(VERSION 3.10)
  2. set(ProjectName Observer)
  3. project(${ProjectName})
  4. set(CMAKE_CXX_STANDARD 20)
  5. set(CMAKE_CXX_STANDARD_REQUIRED True)
  6. include_directories(
  7.     ./publisher
  8.     ./subscriber
  9.     ./topicServer
  10. )
  11. file(GLOB LIB_FILE
  12.     publisher/*
  13.     subscriber/*
  14.     topicServer/*)
  15. message(${LIB_FILE})
  16. add_executable(${ProjectName}
  17.     main.cpp
  18.     ${LIB_FILE})
复制代码
此时文件树布局如下:
  1. .
  2. ├── CMakeLists.txt
  3. ├── main.cpp
  4. ├── publisher
  5. │   ├── Publisher.h
  6. │   ├── PublisherImpl.cpp
  7. │   └── PublisherImpl.h
  8. ├── subscriber
  9. │   ├── Subscriber.h
  10. │   ├── SubscriberImpl.cpp
  11. │   └── SubscriberImpl.h
  12. └── topicServer
  13.     ├── TopicServer.h
  14.     ├── TopicServerImpl.cpp
  15.     └── TopicServerImpl.h
复制代码
6. 编译运行

  1. mkdir build
  2. cd build
  3. cmake ..
  4. make -j12
  5. ./Observer
复制代码
运行效果如下
  1. Subscriber [subscriber1] received message on topic [TopicA]: Hello from TopicA!
  2. Subscriber [subscriber2] received message on topic [TopicA]: Hello from TopicA!
  3. Subscriber [subscriber4] received message on topic [TopicA]: Hello from TopicA!
  4. Subscriber [subscriber1] received message on topic [TopicB]: Greetings from TopicB!
  5. Subscriber [subscriber3] received message on topic [TopicB]: Greetings from TopicB!
  6. Subscriber [subscriber4] received message on topic [TopicB]: Greetings from TopicB!
  7. Subscriber [subscriber1] received message on topic [TopicA]: Update from TopicA after unsubscribe!
  8. Subscriber [subscriber2] received message on topic [TopicA]: Update from TopicA after unsubscribe!
  9. Subscriber [subscriber1] received message on topic [TopicB]: Update from TopicB after unsubscribe!
  10. Subscriber [subscriber3] received message on topic [TopicB]: Update from TopicB after unsubscribe!
复制代码
运行效果说明:
订阅了TOPICA的subscriber1、subscriber2、subscriber4均收到了Publisher发布的Hello from TopicA!
订阅了TOPICB的subscriber1、subscriber3、subscriber4均收到了Publisher发布的Greetings from TopicB!
subscriber4对TOPICA和TOPICB去订阅后,再次发布消息则只有subscriber1、subscriber2、subscriber3能收到订阅信息
本文基于观察者模式,侧重于于阐述计划模式的核心思想,实现了一个简化的发布订阅体系。这种计划模式在实际生产环境中,每每须要更复杂的实现,比如涉及到不同进程之间的通信、负载均衡等,以满足高并发、高可用性的要求。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张春

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表