ToB企服应用市场:ToB评测及商务社交产业平台

标题: Windows10配置C++版本的Kafka,并进行发布和订阅测试 [打印本页]

作者: 络腮胡菲菲    时间: 7 天前
标题: Windows10配置C++版本的Kafka,并进行发布和订阅测试
配置的情况为:Release x64下的情况
完整项目:https://gitee.com/jiajingong/kafka-publisher
1、首先下载相应的库文件(.lib,.dll)

参考链接:
GitHub - eStreamSoftware/delphi-kafka
GitHub - cloader/KafkaCPP-win32-dll: KafkaCpp-win32-dll
2、新建一个新的下令行C++工程

建完工程后,选择Release x64,并在天生中执行重新天生解决方案,这样会在项目目次下天生x64/Release文件夹
3、通过VS2017配置附加库目次和附加依赖项

所有的.lib、.dll等库文件均在下图x64/Release目次下

附加依赖项到场:librdkafka.lib;librdkafkacpp.lib,如下图:

4、发布端:

将主函数的CPP文件改为:
  1. #include <iostream>
  2. #include <thread>
  3. #include "rdkafkacpp.h"
  4. int main()
  5. {
  6.         std::string brokers = "172.18.4.96:9092";
  7.         std::string errorStr;
  8.         RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  9.         RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
  10.         if (!conf) {
  11.                 std::cout << "Create RdKafka Conf failed" << std::endl;
  12.                 return -1;
  13.         }
  14.         conf->set("message.max.bytes", "10240000", errorStr); //最大字节数
  15.         conf->set("replica.fetch.max.bytes", "20485760", errorStr);
  16.         conf->set("bootstrap.servers", brokers, errorStr);
  17.         RdKafka::Producer *producer = RdKafka::Producer::create(conf, errorStr);
  18.         if (!producer) {
  19.                 std::cout << "Create Producer failed" << std::endl;
  20.                 return -1;
  21.         }
  22.         //创建Topic
  23.         RdKafka::Topic *topic = RdKafka::Topic::create(producer, "koala-stqf-03", tconf, errorStr);
  24.         if (!topic) {
  25.                 std::cout << "Create Topic failed" << std::endl;
  26.         }
  27.         int count = 0;
  28.         while (true)
  29.         {   //发送消息
  30.                 RdKafka::ErrorCode resCode = producer->produce(topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, (char *)"123456789", 10, nullptr, nullptr);
  31.                 std::cout << "Count:" << count << ",has publish:" << (char *)"123456789" << std::endl;
  32.                 if (resCode != RdKafka::ERR_NO_ERROR) {
  33.                         std::cerr << "Produce failed: " << RdKafka::err2str(resCode) << std::endl;
  34.                 }
  35.                 count += 1;
  36.                 std::this_thread::sleep_for(std::chrono::seconds(1));
  37.         }
  38.         delete conf;
  39.         delete tconf;
  40.         delete topic;
  41.         delete producer;
  42.         RdKafka::wait_destroyed(5000);
  43.         return 0;
  44. }
复制代码
5、订阅端

新建一个同样的订阅端工程,同样将主函数的代码改为:
  1. #include "rdkafkacpp.h"
  2. #include <chrono>
  3. #include <time.h>
  4. #include <sstream>
  5. #include <iomanip>
  6. #include <iostream>
  7. #include <algorithm>
  8. #include <iterator>
  9. void consume_cb(RdKafka::Message &message, void *opaque)
  10. {
  11.         switch (message.err()) {
  12.         case RdKafka::ERR__TIMED_OUT:
  13.                 std::cout << "RdKafka::ERR__TIMED_OUT" << std::endl;
  14.                 break;
  15.         case RdKafka::ERR_NO_ERROR:
  16.                 /* Real message */
  17.                 RdKafka::MessageTimestamp ts;
  18.                 ts = message.timestamp();
  19.                 if (ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {
  20.                         std::string timeprefix;
  21.                         if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME) {
  22.                                 timeprefix = "created time";
  23.                         }
  24.                         else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME) {
  25.                                 timeprefix = "log append time";
  26.                         }
  27.                         unsigned long long milli = ts.timestamp + (unsigned long long)8 * 60 * 60 * 1000;//此处转化为东八区北京时间,如果是其它时区需要按需求修改
  28.                         auto mTime = std::chrono::milliseconds(milli);
  29.                         auto tp = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>(mTime);
  30.                         auto tt = std::chrono::system_clock::to_time_t(tp);
  31.                         tm timeinfo;
  32.                         ::gmtime_s(&timeinfo, &tt);
  33.                         //char s[60]{ 0 };
  34.                         //::sprintf(s, "%04d-%02d-%02d %02d:%02d:%02d", timeinfo.tm_year + 1900, timeinfo.tm_mon + 1, timeinfo.tm_mday, timeinfo.tm_hour, timeinfo.tm_min, timeinfo.tm_sec);
  35.                         // std::cout << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
  36. #if 0
  37.                         std::stringstream ss;
  38.                         std::string dateStr;
  39.                         ss << timeinfo.tm_year + 1900 << "-"
  40.                                 << timeinfo.tm_mon + 1 << "-"
  41.                                 << timeinfo.tm_mday;
  42.                         ss >> dateStr;
  43.                         ss.clear();
  44.                         ss << timeinfo.tm_hour << ":"
  45.                                 << timeinfo.tm_min << ":"
  46.                                 << timeinfo.tm_sec;
  47.                         std::string timeStr;
  48.                         ss >> timeStr;
  49.                         std::string dateTimeStr;
  50.                         dateTimeStr += dateStr;
  51.                         dateTimeStr.push_back(' ');
  52.                         dateTimeStr += timeStr;
  53. #endif // 0
  54.                         //std::cout << "TimeStamp" << timeprefix << " " << s << std::endl;
  55.                         std::cout << "TimeStamp   " << timeinfo.tm_year + 1900 << "-" << timeinfo.tm_mon + 1 << "-" << timeinfo.tm_mday << " " << timeinfo.tm_hour << ":" << timeinfo.tm_min << ":" << timeinfo.tm_sec << std::endl;
  56.                 }
  57.                 std::cout << message.topic_name() << " offset" << message.offset() << "  partion " << message.partition() << " message: " << reinterpret_cast<char*>(message.payload()) << std::endl;
  58.                 break;
  59.         case RdKafka::ERR__PARTITION_EOF:
  60.                 /* Last message */
  61.                 std::cout << "EOF reached for" << std::endl;
  62.                 break;
  63.         case RdKafka::ERR__UNKNOWN_TOPIC:
  64.         case RdKafka::ERR__UNKNOWN_PARTITION:
  65.                 std::cout << "Consume failed: " << message.errstr();
  66.                 break;
  67.         default:
  68.                 /* Errors */
  69.                 std::cout << "Consume failed: " << message.errstr();
  70.                 break;
  71.         }
  72. }
  73. int main()
  74. {
  75.         std::string brokers = "172.18.4.96:9092";
  76.         std::string errstr;
  77.         std::vector<std::string> topics{ "koala-stqf-03",
  78.                 "klai-seim-alert-koala-test-03"
  79.         };
  80.         std::string group_id = "whl-consumer-group";
  81.         RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  82.         if (conf->set("group.id", group_id, errstr)) {
  83.                 std::cout << errstr << std::endl;
  84.                 return -1;
  85.         }
  86.         conf->set("bootstrap.servers", brokers, errstr);
  87.         conf->set("max.partition.fetch.bytes", "1024000", errstr);
  88.         //conf->set("enable-auto-commit", "true", errstr);
  89.         RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
  90.         tconf->set("auto.offset.reset", "latest", errstr);
  91.         conf->set("default_topic_conf", tconf, errstr);
  92.         RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(conf, errstr);
  93.         if (!m_consumer) {
  94.                 std::cout << "failed to create consumer " << errstr << std::endl;
  95.                 return -1;
  96.         }
  97. #if 0 //从上一次消费结束的位置开始消费
  98.         RdKafka::ErrorCode err = m_consumer->subscribe(topics);
  99.         if (err != RdKafka::ERR_NO_ERROR) {
  100.                 std::cout << RdKafka::err2str(err) << std::endl;
  101.                 return -1;
  102.         }
  103. #else //指定每个topic的每个分区开始消费的位置
  104.         //基本思路为先获取server端的状态信息,将与订阅相关的topic找出来,根据分区,创建TopicPartion;最后使用assign消费
  105.         RdKafka::Metadata *metadataMap{ nullptr };
  106.         RdKafka::ErrorCode err = m_consumer->metadata(true, nullptr, &metadataMap, 2000);
  107.         if (err != RdKafka::ERR_NO_ERROR) {
  108.                 std::cout << RdKafka::err2str(err) << std::endl;
  109.         }
  110.         const RdKafka::Metadata::TopicMetadataVector *topicList = metadataMap->topics();
  111.         std::cout << "broker topic size: " << topicList->size() << std::endl;
  112.         RdKafka::Metadata::TopicMetadataVector subTopicMetaVec;
  113.         std::copy_if(topicList->begin(), topicList->end(), std::back_inserter(subTopicMetaVec), [&topics](const RdKafka::TopicMetadata* data) {
  114.                 return std::find_if(topics.begin(), topics.end(), [data](const std::string &tname) {return data->topic() == tname; }) != topics.end();
  115.         });
  116.         std::vector<RdKafka::TopicPartition*> topicpartions;
  117.         std::for_each(subTopicMetaVec.begin(), subTopicMetaVec.end(), [&topicpartions](const RdKafka::TopicMetadata* data) {
  118.                 auto parVec = data->partitions();
  119.                 std::for_each(parVec->begin(), parVec->end(), [&](const RdKafka::PartitionMetadata *value) {
  120.                         std::cout << data->topic() << " has partion: " << value->id() << " Leader is : " << value->leader() << std::endl;
  121.                         topicpartions.push_back(RdKafka::TopicPartition::create(data->topic(), value->id(), RdKafka::Topic::OFFSET_END));
  122.                 });
  123.         });
  124.         m_consumer->assign(topicpartions);
  125. #endif // 0
  126.         RdKafka::ErrorCode errccc = m_consumer->subscribe(topics);
  127.         if (errccc != RdKafka::ERR_NO_ERROR) {
  128.                 std::cout << RdKafka::err2str(errccc) << std::endl;
  129.                 return -1;
  130.         }
  131.         while (true)
  132.         {
  133.                 RdKafka::Message *msg = m_consumer->consume(6000);
  134.                 consume_cb(*msg, nullptr); //消息一条消息
  135.                 delete msg;
  136.         }
  137.         return 0;
  138. }
复制代码

6、发布 订阅展示:




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4