Pulsar 入门实战(5)--Java 操作 Pulsar

打印 上一主题 下一主题

主题 831|帖子 831|积分 2493

本文主要介绍使用 Java 来操作 Pulsar,文中所使用到的软件版本:Java 17.0.7(Pulsar 服务使用)、Java 1.8.0_341(客户端使用)、Pulsar 3.3.0、pulsar-client 3.3.0。
1、引入依赖
  1. <dependency>
  2.     <groupId>org.apache.pulsar</groupId>
  3.     <artifactId>pulsar-client</artifactId>
  4.     <version>3.3.0</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.apache.pulsar</groupId>
  8.     <artifactId>pulsar-client-admin</artifactId>
  9.     <version>3.3.0</version>
  10. </dependency>
复制代码
2、初始化 PulsarClient 和 PulsarAdmin

PulsarClient:
  1. @Before
  2. public void before() throws PulsarClientException {
  3.     client = PulsarClient.builder()
  4.             .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
  5.             .build();
  6. }
复制代码
PulsarAdmin:
  1. @Before
  2. public void before() throws PulsarClientException {
  3.     admin = PulsarAdmin.builder()
  4.             .serviceHttpUrl("http://10.49.196.30:8080,10.49.196.31:8080,10.49.196.32:8080")
  5.             .build();
  6. }
复制代码
3、消费者

3.1、同步消费
  1. @Test
  2. public void sync() throws PulsarClientException {
  3.     Consumer<String> consumer = client.newConsumer(Schema.STRING)
  4.             .topic("my-topic") //主题
  5.             .subscriptionName("my-subscription") //订阅名称
  6.             .subscriptionType(SubscriptionType.Shared) //订阅模式
  7.             .subscribe();
  8.     while (true) {
  9.         Message<String> message = consumer.receive();
  10.         try {
  11.             log.info("topicName={},value={}", message.getTopicName(), message.getValue());
  12.             consumer.acknowledge(message);
  13.         } catch (Exception e) {
  14.             consumer.negativeAcknowledge(message);
  15.         }
  16.     }
  17. }
复制代码
3.2、异步消费
  1. public void async() throws InterruptedException {
  2.     client.newConsumer(Schema.STRING)
  3.             .topic("my-topic2")
  4.             .subscriptionName("my-subscription")
  5.             .subscriptionType(SubscriptionType.Shared)
  6.             .subscribeAsync()
  7.             .thenAccept(this::receiveAsync);
  8.     Thread.sleep(1000 * 500);
  9. }
  10. private void receiveAsync(Consumer<String> consumer) {
  11.     consumer.receiveAsync().thenAccept(message -> {
  12.         try {
  13.             log.info("messageId={},value={}", message.getMessageId(), message.getValue());
  14.             consumer.acknowledge(message);
  15.         } catch (Exception e) {
  16.             consumer.negativeAcknowledge(message);
  17.         }
  18.         receiveAsync(consumer);
  19.     });
  20. }
复制代码
3.3、使用 MessageListener
  1. @Test
  2. public void messageListener() throws Exception {
  3.     PulsarClient client = PulsarClient.builder()
  4.             .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
  5.             .listenerThreads(3)
  6.             .build();
  7.     MessageListener<String> messageListener = (consumer, msg) -> {
  8.         try {
  9.             log.info("messageId={},value={}", msg.getMessageId(), msg.getValue());
  10.             consumer.acknowledge(msg);
  11.         } catch (Exception e) {
  12.             consumer.negativeAcknowledge(msg);
  13.         }
  14.     };
  15.     client.newConsumer(Schema.STRING)
  16.             .topic("my-topic")
  17.             .subscriptionName("my-subscription")
  18.             .subscriptionType(SubscriptionType.Shared)
  19.             .messageListener(messageListener)
  20.             .subscribe();
  21.     Thread.sleep(1000 * 500);
  22. }
复制代码
3.4、重试信主题
  1. /**
  2. * 重试信主题
  3. * 处理失败的消息会进入重试信主题,达到最大重试次数后进入死信主题
  4. */
  5. @Test
  6. public void retryTopic() throws Exception {
  7.     Consumer<String> consumer = client.newConsumer(Schema.STRING)
  8.             .topic("my-topic-r")
  9.             .subscriptionName("my-subscription")
  10.             .subscriptionType(SubscriptionType.Shared)
  11.             .enableRetry(true)
  12.             .deadLetterPolicy(DeadLetterPolicy.builder()
  13.                     .maxRedeliverCount(5)
  14.                     .build())
  15.             .subscribe();
  16.     while (true) {
  17.         Message<String> message = consumer.receive();
  18.         try {
  19.             log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
  20.             //TODO:业务处理,可能发生异常
  21.             //throw new RuntimeException();
  22.             consumer.acknowledge(message);
  23.         } catch (Exception e) {
  24.             log.error("", e);
  25.             consumer.reconsumeLater(message, 5L, TimeUnit.SECONDS);
  26.         }
  27.     }
  28. }
复制代码
3.5、死信主题
  1. /**
  2. * 死信主题
  3. * 由确认超时、负面确认或重试信主题 三种情况消息处理失败后,重试最大次数后消息会进入死信主题
  4. */
  5. @Test
  6. public void deadTopic() throws Exception {
  7.     Consumer<String> consumer = client.newConsumer(Schema.STRING)
  8.             .topic("my-topic-d")
  9.             .subscriptionName("my-subscription")
  10.             .subscriptionType(SubscriptionType.Shared)
  11.             .deadLetterPolicy(DeadLetterPolicy.builder()
  12.                     .maxRedeliverCount(5)
  13.                     .initialSubscriptionName("init-sub")
  14.                     .build())
  15.             .subscribe();
  16.     while (true) {
  17.         Message<String> message = consumer.receive();
  18.         try {
  19.             log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
  20.             //TODO:业务处理,可能发生异常
  21.             //throw new RuntimeException();
  22.             consumer.acknowledge(message);
  23.         } catch (Exception e) {
  24.             log.error("", e);
  25.             consumer.negativeAcknowledge(message);
  26.         }
  27.     }
  28. }
  29. /**
  30. * 订阅死信队列,处理其中的消息
  31. */
  32. @Test
  33. public void consumerDeadTopic() throws Exception {
  34.     Consumer<String> consumer = client.newConsumer(Schema.STRING)
  35.             .topic("my-topic-d-my-subscription-DLQ")
  36.             .subscriptionName("init-sub")
  37.             .subscriptionType(SubscriptionType.Shared)
  38.             .subscribe();
  39.     while (true) {
  40.         Message<String> message = consumer.receive();
  41.         try {
  42.             log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
  43.         } catch (Exception e) {
  44.             log.error("", e);
  45.         }
  46.         consumer.acknowledge(message);
  47.     }
  48. }
复制代码
3.6、完备代码

  1. package com.abc.demo.pulsar;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.pulsar.client.api.*;
  4. import org.junit.Before;
  5. import org.junit.Test;
  6. import java.util.concurrent.TimeUnit;
  7. @Slf4j
  8. public class ConsumerCase {
  9.     private PulsarClient client;
  10.     @Before
  11.     public void before() throws PulsarClientException {
  12.         client = PulsarClient.builder()
  13.                 .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
  14.                 .build();
  15.     }
  16.     @Test
  17.     public void sync() throws PulsarClientException {
  18.         Consumer<String> consumer = client.newConsumer(Schema.STRING)
  19.                 .topic("my-topic") //主题
  20.                 .subscriptionName("my-subscription") //订阅名称
  21.                 .subscriptionType(SubscriptionType.Shared) //订阅模式
  22.                 .subscribe();
  23.         while (true) {
  24.             Message<String> message = consumer.receive();
  25.             try {
  26.                 log.info("topicName={},value={}", message.getTopicName(), message.getValue());
  27.                 consumer.acknowledge(message);
  28.             } catch (Exception e) {
  29.                 consumer.negativeAcknowledge(message);
  30.             }
  31.         }
  32.     }
  33.     @Test
  34.     public void async() throws InterruptedException {
  35.         client.newConsumer(Schema.STRING)
  36.                 .topic("my-topic2")
  37.                 .subscriptionName("my-subscription")
  38.                 .subscriptionType(SubscriptionType.Shared)
  39.                 .subscribeAsync()
  40.                 .thenAccept(this::receiveAsync);
  41.         Thread.sleep(1000 * 500);
  42.     }
  43.     private void receiveAsync(Consumer<String> consumer) {
  44.         consumer.receiveAsync().thenAccept(message -> {
  45.             try {
  46.                 log.info("messageId={},value={}", message.getMessageId(), message.getValue());
  47.                 consumer.acknowledge(message);
  48.             } catch (Exception e) {
  49.                 consumer.negativeAcknowledge(message);
  50.             }
  51.             receiveAsync(consumer);
  52.         });
  53.     }
  54.     @Test
  55.     public void messageListener() throws Exception {
  56.         PulsarClient client = PulsarClient.builder()
  57.                 .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
  58.                 .listenerThreads(3)
  59.                 .build();
  60.         MessageListener<String> messageListener = (consumer, msg) -> {
  61.             try {
  62.                 log.info("messageId={},value={}", msg.getMessageId(), msg.getValue());
  63.                 consumer.acknowledge(msg);
  64.             } catch (Exception e) {
  65.                 consumer.negativeAcknowledge(msg);
  66.             }
  67.         };
  68.         client.newConsumer(Schema.STRING)
  69.                 .topic("my-topic")
  70.                 .subscriptionName("my-subscription")
  71.                 .subscriptionType(SubscriptionType.Shared)
  72.                 .messageListener(messageListener)
  73.                 .subscribe();
  74.         Thread.sleep(1000 * 500);
  75.     }
  76.     /**
  77.      * 重试信主题
  78.      * 处理失败的消息会进入重试信主题,达到最大重试次数后进入死信主题
  79.      */
  80.     @Test
  81.     public void retryTopic() throws Exception {
  82.         Consumer<String> consumer = client.newConsumer(Schema.STRING)
  83.                 .topic("my-topic-r")
  84.                 .subscriptionName("my-subscription")
  85.                 .subscriptionType(SubscriptionType.Shared)
  86.                 .enableRetry(true)
  87.                 .deadLetterPolicy(DeadLetterPolicy.builder()
  88.                         .maxRedeliverCount(5)
  89.                         .build())
  90.                 .subscribe();
  91.         while (true) {
  92.             Message<String> message = consumer.receive();
  93.             try {
  94.                 log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
  95.                 //TODO:业务处理,可能发生异常
  96.                 //throw new RuntimeException();
  97.                 consumer.acknowledge(message);
  98.             } catch (Exception e) {
  99.                 log.error("", e);
  100.                 consumer.reconsumeLater(message, 5L, TimeUnit.SECONDS);
  101.             }
  102.         }
  103.     }
  104.     /**
  105.      * 死信主题
  106.      * 由确认超时、负面确认或重试信主题 三种情况消息处理失败后,重试最大次数后消息会进入死信主题
  107.      */
  108.     @Test
  109.     public void deadTopic() throws Exception {
  110.         Consumer<String> consumer = client.newConsumer(Schema.STRING)
  111.                 .topic("my-topic-d")
  112.                 .subscriptionName("my-subscription")
  113.                 .subscriptionType(SubscriptionType.Shared)
  114.                 .deadLetterPolicy(DeadLetterPolicy.builder()
  115.                         .maxRedeliverCount(5)
  116.                         .initialSubscriptionName("init-sub")
  117.                         .build())
  118.                 .subscribe();
  119.         while (true) {
  120.             Message<String> message = consumer.receive();
  121.             try {
  122.                 log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
  123.                 //TODO:业务处理,可能发生异常
  124.                 //throw new RuntimeException();
  125.                 consumer.acknowledge(message);
  126.             } catch (Exception e) {
  127.                 log.error("", e);
  128.                 consumer.negativeAcknowledge(message);
  129.             }
  130.         }
  131.     }
  132.     /**
  133.      * 订阅死信队列,处理其中的消息
  134.      */
  135.     @Test
  136.     public void consumerDeadTopic() throws Exception {
  137.         Consumer<String> consumer = client.newConsumer(Schema.STRING)
  138.                 .topic("my-topic-d-my-subscription-DLQ")
  139.                 .subscriptionName("init-sub")
  140.                 .subscriptionType(SubscriptionType.Shared)
  141.                 .subscribe();
  142.         while (true) {
  143.             Message<String> message = consumer.receive();
  144.             try {
  145.                 log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
  146.             } catch (Exception e) {
  147.                 log.error("", e);
  148.             }
  149.             consumer.acknowledge(message);
  150.         }
  151.     }
  152. }
复制代码
ConsumerCase.java4、Reader
  1. @Test
  2. public void reader() throws Exception {
  3.     //MessageId messageId = MessageId.fromByteArray(Base64.getDecoder().decode("CBcQBTAA"));
  4.     MessageId messageId = MessageId.earliest;
  5.     Reader reader = client.newReader(Schema.STRING)
  6.             .topic("my-topic")
  7.             .startMessageId(messageId)
  8.             .create();
  9.     while (true) {
  10.         Message msg = reader.readNext();
  11.         log.info("messageId={},messageIdBase64={},value={}", msg.getMessageId(), Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray()), msg.getValue());
  12.     }
  13. }
复制代码
完备代码:
  1. package com.abc.demo.pulsar;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.pulsar.client.api.*;
  4. import org.junit.Before;
  5. import org.junit.Test;
  6. import java.util.Base64;
  7. @Slf4j
  8. public class ReaderCase {
  9.     private PulsarClient client;
  10.     @Before
  11.     public void before() throws PulsarClientException {
  12.         client = PulsarClient.builder()
  13.                 .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
  14.                 .build();
  15.     }
  16.     @Test
  17.     public void reader() throws Exception {
  18.         //MessageId messageId = MessageId.fromByteArray(Base64.getDecoder().decode("CBcQBTAA"));
  19.         MessageId messageId = MessageId.earliest;
  20.         Reader reader = client.newReader(Schema.STRING)
  21.                 .topic("my-topic")
  22.                 .startMessageId(messageId)
  23.                 .create();
  24.         while (true) {
  25.             Message msg = reader.readNext();
  26.             log.info("messageId={},messageIdBase64={},value={}", msg.getMessageId(), Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray()), msg.getValue());
  27.         }
  28.     }
  29. }
复制代码
ReaderCase.java5、生产者

5.1、同步发送
  1. @Test
  2. public void sync() throws PulsarClientException {
  3.     Producer<String> producer = client.newProducer(Schema.STRING)
  4.             .topic("my-topic-d")
  5.             .create();
  6.     for (int i = 0; i < 3; i++) {
  7.         MessageId messageId = producer.send(("hello" + i));
  8.         log.info("messageId={}", messageId);
  9.     }
  10.     producer.close();
  11. }
复制代码
5.2、异步发送
  1. @Test
  2. public void async() throws InterruptedException {
  3.     client.newProducer(Schema.STRING)
  4.             .topic("my-topic2")
  5.             .createAsync()
  6.             .thenAccept(producer -> {
  7.                 for (int i = 0; i < 10; i++) {
  8.                     producer.sendAsync("hello" + i).thenAccept(messageId -> {
  9.                                 log.info("messageId={}", messageId);
  10.                     });
  11.                 }
  12.             });
  13.     Thread.sleep(1000 * 5);
  14. }
复制代码
5.3、详细设置消息
  1. @Test
  2. public void configMessage() throws PulsarClientException {
  3.     Producer<byte[]> producer = client.newProducer()
  4.             .topic("my-topic")
  5.             .create();
  6.     MessageId messageId = producer.newMessage(Schema.STRING)
  7.             .key("my-key") //设置消息key
  8.             .eventTime(System.currentTimeMillis()) //设置事件事件
  9.             .sequenceId(123) //设置 sequenceId
  10.             .deliverAfter(1, TimeUnit.MINUTES) //延迟投递消息
  11.             .property("my-key", "my-value") //自定义属性
  12.             .value("content")
  13.             .send();
  14.     log.info("messageId={}", messageId);
  15.     producer.close();
  16. }
复制代码
5.4、完备代码

  1. package com.abc.demo.pulsar;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.pulsar.client.api.*;
  4. import org.junit.After;
  5. import org.junit.Before;
  6. import org.junit.Test;
  7. import java.util.concurrent.CompletableFuture;
  8. import java.util.concurrent.TimeUnit;
  9. @Slf4j
  10. public class ProducerCase {
  11.     private PulsarClient client;
  12.     @Before
  13.     public void before() throws PulsarClientException {
  14.         client = PulsarClient.builder()
  15.                 //.serviceUrl("pulsar://10.49.196.33:6650")
  16.                 .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
  17.                 .build();
  18.     }
  19.     @After
  20.     public void after() throws PulsarClientException {
  21.         client.close();
  22.     }
  23.     @Test
  24.     public void sync() throws PulsarClientException {
  25.         Producer<String> producer = client.newProducer(Schema.STRING)
  26.                 .topic("my-topic-d")
  27.                 .create();
  28.         for (int i = 0; i < 3; i++) {
  29.             MessageId messageId = producer.send(("hello" + i));
  30.             log.info("messageId={}", messageId);
  31.         }
  32.         producer.close();
  33.     }
  34.     @Test
  35.     public void async() throws InterruptedException {
  36.         client.newProducer(Schema.STRING)
  37.                 .topic("my-topic2")
  38.                 .createAsync()
  39.                 .thenAccept(producer -> {
  40.                     for (int i = 0; i < 10; i++) {
  41.                         producer.sendAsync("hello" + i).thenAccept(messageId -> {
  42.                                     log.info("messageId={}", messageId);
  43.                         });
  44.                     }
  45.                 });
  46.         Thread.sleep(1000 * 5);
  47.     }
  48.     @Test
  49.     public void configMessage() throws PulsarClientException {
  50.         Producer<byte[]> producer = client.newProducer()
  51.                 .topic("my-topic")
  52.                 .create();
  53.         MessageId messageId = producer.newMessage(Schema.STRING)
  54.                 .key("my-key") //设置消息key
  55.                 .eventTime(System.currentTimeMillis()) //设置事件事件
  56.                 .sequenceId(123) //设置 sequenceId
  57.                 .deliverAfter(1, TimeUnit.MINUTES) //延迟投递消息
  58.                 .property("my-key", "my-value") //自定义属性
  59.                 .value("content")
  60.                 .send();
  61.         log.info("messageId={}", messageId);
  62.         producer.close();
  63.     }
  64. }
复制代码
ProducerCase.java6、Admin

6.1、Brokers

6.1.1、列出活动 broker
  1. admin.brokers().getActiveBrokers(clusterName)
复制代码
6.1.2、列出 broker 的定名空间
  1. admin.brokers().getOwnedNamespaces(cluster,brokerUrl);
复制代码
6.1.3、获取动态配置名称
  1. admin.brokers().getDynamicConfigurationNames();
复制代码
6.1.4、更新动态配置
  1. admin.brokers().updateDynamicConfiguration(configName, configValue);
复制代码
6.1.5、获取已经更新过的动态配置
  1. admin.brokers().getAllDynamicConfigurations();
复制代码
6.1.6、获取 leader broker
  1. admin.brokers().getLeaderBroker()
复制代码
6.2、Clusters

6.2.1、获取集群配置信息
  1. admin.clusters().getCluster(clusterName);
复制代码
6.2.2、获取集群列表
  1. admin.clusters().getClusters();
复制代码
6.3、Tenant

6.3.1、列出租户
  1. admin.tenants().getTenants();
复制代码
6.3.2、创建租户
  1. admin.tenants().createTenant(tenantName, tenantInfo);
复制代码
6.3.3、获取租户配置信息
  1. admin.tenants().getTenantInfo(tenantName);
复制代码
6.3.4、删除租户
  1. admin.Tenants().deleteTenant(tenantName);
复制代码
6.3.5、更新租户
  1. admin.tenants().updateTenant(tenantName, tenantInfo);
复制代码
6.4、Namespaces

6.4.1、创建定名空间
  1. admin.namespaces().createNamespace(namespace);
复制代码
6.4.2、获取定名空间策略
  1. admin.namespaces().getPolicies(namespace);
复制代码
6.4.3、列出定名空间
  1. admin.namespaces().getNamespaces(tenant);
复制代码
6.4.4、删除定名空间
  1. admin.namespaces().deleteNamespace(namespace);
复制代码
6.5、Topics

6.5.1、查察订阅的待消费消息(不会消费消息)
  1. admin.topics().peekMessages(topic, subName, numMessages);
复制代码
6.5.2、通过消息 ID 获取消息
  1. admin.topics().getMessageById(topic, ledgerId, entryId);
复制代码
6.5.3、通过相对于最早或最新消息的位置来查找消息
  1. admin.topics().examineMessage(topic, "latest", 1);
复制代码
6.5.4、通过期间获取消息的 ID
  1. admin.topics().getMessageIdByTimestamp(topic, timestamp);
复制代码
6.5.5、跳过特定主题的某个订阅中的多少条未消费消息
  1. admin.topics().skipMessages(topic, subName, numMessages);
复制代码
6.5.6、跳过特定主题的某个订阅中的所有未消费消息
  1. admin.topics().skipAllMessages(topic, subName);
复制代码
6.5.7、根据时间重置游标
  1. admin.topics().resetCursor(topic, subName, timestamp);
复制代码
6.5.8、获取主题所属的 broker
  1. admin.lookups().lookupTopic(topic);
复制代码
6.5.9、获取分区主题所属的 broker
  1. admin.lookups().lookupPartitionedTopic(topic);
复制代码
6.5.10、获取主题的订阅信息
  1. admin.topics().getSubscriptions(topic);
复制代码
6.5.11、获取最新消息的 ID
  1. admin.topics().getLastMessage(topic);
复制代码
6.5.12、创建非分区主题
  1. admin.topics().createNonPartitionedTopic(topicName);
复制代码
6.5.13、删除非分区主题
  1. admin.topics().delete(topic);
复制代码
6.5.14、列出非分区主题
  1. admin.topics().getList(namespace);
复制代码
6.5.15、获取非分区主题状态
  1. admin.topics().getStats(topic, false /* is precise backlog */);
复制代码
6.5.16、获取非分区主题内部状态
  1. admin.topics().getInternalStats(topic);
复制代码
6.5.17、创建分区主题
  1. admin.topics().createPartitionedTopic(topicName, numPartitions);
复制代码
6.5.18、获取分区主题元数据信息
  1. admin.topics().getPartitionedTopicMetadata(topicName);
复制代码
6.5.19、更新分区主题的分区数(只能比原来大)
  1. admin.topics().updatePartitionedTopic(topic, numPartitions);
复制代码
6.5.20、删除分区主题
  1. admin.topics().deletePartitionedTopic(topic);
复制代码
6.5.21、列出分区主题
  1. admin.topics().getPartitionedTopicList(namespace);
复制代码
6.5.22、获取分区主题状态
  1. admin.topics().getPartitionedStats(topic, true /* per partition */, false /* is precise backlog */);
复制代码
6.5.23、获取分区主题内部状态
  1. admin.topics().getPartitionedInternalStats(topic);
复制代码
6.5.24、创建订阅
  1. admin.topics().createSubscription(topic, subscriptionName, MessageId.latest);
复制代码
6.5.25、获取订阅
  1. admin.topics().getSubscriptions(topic);
复制代码
6.5.26、删除订阅
  1. admin.topics().deleteSubscription(topic, subscriptionName);
复制代码
6.6、完备代码

  1. package com.abc.demo.pulsar;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.pulsar.client.admin.PulsarAdmin;
  4. import org.apache.pulsar.client.admin.PulsarAdminException;
  5. import org.apache.pulsar.client.api.Message;
  6. import org.apache.pulsar.client.api.MessageId;
  7. import org.apache.pulsar.client.api.PulsarClientException;
  8. import org.apache.pulsar.common.policies.data.TenantInfoImpl;
  9. import org.junit.After;
  10. import org.junit.Before;
  11. import org.junit.Test;
  12. import java.util.Collections;
  13. import java.util.HashSet;
  14. @Slf4j
  15. public class AdminCase {
  16.     private PulsarAdmin admin;
  17.     @Before
  18.     public void before() throws PulsarClientException {
  19.         admin = PulsarAdmin.builder()
  20.                 .serviceHttpUrl("http://10.49.196.30:8080,10.49.196.31:8080,10.49.196.32:8080")
  21.                 .build();
  22.     }
  23.     @After
  24.     public void after() {
  25.         admin.close();
  26.     }
  27.     @Test
  28.     public void broker() throws PulsarAdminException {
  29.         log.info("getActiveBrokers={}", admin.brokers().getActiveBrokers());
  30.         log.info("getOwnedNamespaces={}", admin.brokers().getOwnedNamespaces("pulsar-cluster-1", "app1:8080"));
  31.         log.info("getDynamicConfigurationNames={}", admin.brokers().getDynamicConfigurationNames());
  32.         admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "true");
  33.         log.info("getAllDynamicConfigurations={}", admin.brokers().getAllDynamicConfigurations());
  34.         log.info("getLeaderBroker={}", admin.brokers().getLeaderBroker());
  35.     }
  36.     @Test
  37.     public void clusters() throws PulsarAdminException {
  38.         log.info("getCluster={}", admin.clusters().getCluster("pulsar-cluster-1"));
  39.         log.info("getClusters={}", admin.clusters().getClusters());
  40.     }
  41.    
  42.     @Test
  43.     public void tenants() throws PulsarAdminException {
  44.         log.info("getTenants={}", admin.tenants().getTenants());
  45.         TenantInfoImpl tenantInfo = new TenantInfoImpl();
  46.         tenantInfo.setAdminRoles(new HashSet<>());
  47.         tenantInfo.setAllowedClusters(Collections.singleton("pulsar-cluster-1"));
  48.         admin.tenants().createTenant("test-tenant", tenantInfo);
  49.         log.info("getTenantInfo={}", admin.tenants().getTenantInfo("test-tenant"));
  50.         admin.tenants().updateTenant("test-tenant", tenantInfo);
  51.         admin.tenants().deleteTenant("test-tenant");
  52.     }
  53.     @Test
  54.     public void namespaces() throws PulsarAdminException {
  55.         admin.namespaces().createNamespace("public/test-ns");
  56.         log.info("getPolicies={}", admin.namespaces().getPolicies("public/default"));
  57.         log.info("getNamespaces={}", admin.namespaces().getNamespaces("public"));
  58.         admin.namespaces().deleteNamespace("public/test-ns");
  59.     }
  60.    
  61.     @Test
  62.     public void topics() throws PulsarAdminException {
  63.         log.info("peekMessages={}", admin.topics().peekMessages("persistent://public/default/my-topic", "my-subscription", 3));
  64.         Message<byte[]> message = admin.topics().getMessageById("persistent://public/default/my-topic", 171, 16);
  65.         log.info("getMessageById={}", new String(message.getData()));
  66.         message = admin.topics().examineMessage("persistent://public/default/my-topic", "latest", 1);
  67.         log.info("examineMessage={}", new String(message.getData()));
  68.         log.info("getMessageIdByTimestamp={}", admin.topics().getMessageIdByTimestamp("persistent://public/default/my-topic", System.currentTimeMillis()));
  69.         admin.topics().skipMessages("persistent://public/default/my-topic", "my-subscription", 1);
  70.         admin.topics().skipAllMessages("persistent://public/default/my-topic", "my-subscription");
  71.         admin.topics().resetCursor("persistent://public/default/my-topic", "my-subscription", System.currentTimeMillis() - 1000 * 60 * 15);
  72.         log.info("lookupTopic={}", admin.lookups().lookupTopic("persistent://public/default/my-topic"));
  73.         log.info("lookupPartitionedTopic={}", admin.lookups().lookupPartitionedTopic("persistent://public/default/my-topic2"));
  74.         log.info("getSubscriptions={}", admin.topics().getSubscriptions("persistent://public/default/my-topic"));
  75.         log.info("getLastMessageId={}", admin.topics().getLastMessageId("persistent://public/default/my-topic"));
  76.         admin.topics().createNonPartitionedTopic("persistent://public/default/test-topic");
  77.         admin.topics().delete("persistent://public/default/test-topic");
  78.         log.info("getList={}", admin.topics().getList("public/default"));
  79.         log.info("getStats={}", admin.topics().getStats("persistent://public/default/my-topic", false));
  80.         log.info("getInternalStats={}", admin.topics().getInternalStats("persistent://public/default/my-topic"));
  81.         admin.topics().createPartitionedTopic("persistent://public/default/test-topic-p", 2);
  82.         log.info("getPartitionedTopicMetadata={}", admin.topics().getPartitionedTopicMetadata("persistent://public/default/test-topic-p"));
  83.         admin.topics().updatePartitionedTopic("persistent://public/default/test-topic-p", 3);
  84.         admin.topics().deletePartitionedTopic("persistent://public/default/test-topic-p");
  85.         log.info("getStats={}", admin.topics().getPartitionedStats("persistent://public/default/my-topic2", false));
  86.         log.info("getInternalStats={}", admin.topics().getPartitionedInternalStats("persistent://public/default/my-topic2"));
  87.         admin.topics().createSubscription("persistent://public/default/my-topic", "test-subscription", MessageId.latest);
  88.         log.info("getStats={}", admin.topics().getSubscriptions("persistent://public/default/my-topic"));
  89.         admin.topics().deleteSubscription("persistent://public/default/my-topic", "test-subscription");
  90.     }
  91. }
复制代码
AdminCase.java 
 
参考:
https://pulsar.apache.org/docs/3.3.x/client-libraries-java/
https://pulsar.apache.org/docs/3.3.x/admin-api-overview/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

徐锦洪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表