本文主要介绍使用 Java 来操作 Pulsar,文中所使用到的软件版本:Java 17.0.7(Pulsar 服务使用)、Java 1.8.0_341(客户端使用)、Pulsar 3.3.0、pulsar-client 3.3.0。
1、引入依赖
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client</artifactId>
- <version>3.3.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-admin</artifactId>
- <version>3.3.0</version>
- </dependency>
复制代码 2、初始化 PulsarClient 和 PulsarAdmin
PulsarClient:- @Before
- public void before() throws PulsarClientException {
- client = PulsarClient.builder()
- .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
- .build();
- }
复制代码 PulsarAdmin:- @Before
- public void before() throws PulsarClientException {
- admin = PulsarAdmin.builder()
- .serviceHttpUrl("http://10.49.196.30:8080,10.49.196.31:8080,10.49.196.32:8080")
- .build();
- }
复制代码 3、消费者
3.1、同步消费
- @Test
- public void sync() throws PulsarClientException {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic("my-topic") //主题
- .subscriptionName("my-subscription") //订阅名称
- .subscriptionType(SubscriptionType.Shared) //订阅模式
- .subscribe();
- while (true) {
- Message<String> message = consumer.receive();
- try {
- log.info("topicName={},value={}", message.getTopicName(), message.getValue());
- consumer.acknowledge(message);
- } catch (Exception e) {
- consumer.negativeAcknowledge(message);
- }
- }
- }
复制代码 3.2、异步消费
- public void async() throws InterruptedException {
- client.newConsumer(Schema.STRING)
- .topic("my-topic2")
- .subscriptionName("my-subscription")
- .subscriptionType(SubscriptionType.Shared)
- .subscribeAsync()
- .thenAccept(this::receiveAsync);
- Thread.sleep(1000 * 500);
- }
- private void receiveAsync(Consumer<String> consumer) {
- consumer.receiveAsync().thenAccept(message -> {
- try {
- log.info("messageId={},value={}", message.getMessageId(), message.getValue());
- consumer.acknowledge(message);
- } catch (Exception e) {
- consumer.negativeAcknowledge(message);
- }
- receiveAsync(consumer);
- });
- }
复制代码 3.3、使用 MessageListener
- @Test
- public void messageListener() throws Exception {
- PulsarClient client = PulsarClient.builder()
- .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
- .listenerThreads(3)
- .build();
- MessageListener<String> messageListener = (consumer, msg) -> {
- try {
- log.info("messageId={},value={}", msg.getMessageId(), msg.getValue());
- consumer.acknowledge(msg);
- } catch (Exception e) {
- consumer.negativeAcknowledge(msg);
- }
- };
- client.newConsumer(Schema.STRING)
- .topic("my-topic")
- .subscriptionName("my-subscription")
- .subscriptionType(SubscriptionType.Shared)
- .messageListener(messageListener)
- .subscribe();
- Thread.sleep(1000 * 500);
- }
复制代码 3.4、重试信主题
- /**
- * 重试信主题
- * 处理失败的消息会进入重试信主题,达到最大重试次数后进入死信主题
- */
- @Test
- public void retryTopic() throws Exception {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic("my-topic-r")
- .subscriptionName("my-subscription")
- .subscriptionType(SubscriptionType.Shared)
- .enableRetry(true)
- .deadLetterPolicy(DeadLetterPolicy.builder()
- .maxRedeliverCount(5)
- .build())
- .subscribe();
- while (true) {
- Message<String> message = consumer.receive();
- try {
- log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
- //TODO:业务处理,可能发生异常
- //throw new RuntimeException();
- consumer.acknowledge(message);
- } catch (Exception e) {
- log.error("", e);
- consumer.reconsumeLater(message, 5L, TimeUnit.SECONDS);
- }
- }
- }
复制代码 3.5、死信主题
- /**
- * 死信主题
- * 由确认超时、负面确认或重试信主题 三种情况消息处理失败后,重试最大次数后消息会进入死信主题
- */
- @Test
- public void deadTopic() throws Exception {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic("my-topic-d")
- .subscriptionName("my-subscription")
- .subscriptionType(SubscriptionType.Shared)
- .deadLetterPolicy(DeadLetterPolicy.builder()
- .maxRedeliverCount(5)
- .initialSubscriptionName("init-sub")
- .build())
- .subscribe();
- while (true) {
- Message<String> message = consumer.receive();
- try {
- log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
- //TODO:业务处理,可能发生异常
- //throw new RuntimeException();
- consumer.acknowledge(message);
- } catch (Exception e) {
- log.error("", e);
- consumer.negativeAcknowledge(message);
- }
- }
- }
- /**
- * 订阅死信队列,处理其中的消息
- */
- @Test
- public void consumerDeadTopic() throws Exception {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic("my-topic-d-my-subscription-DLQ")
- .subscriptionName("init-sub")
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- while (true) {
- Message<String> message = consumer.receive();
- try {
- log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
- } catch (Exception e) {
- log.error("", e);
- }
- consumer.acknowledge(message);
- }
- }
复制代码 3.6、完备代码
data:image/s3,"s3://crabby-images/e0f05/e0f05a005ca74af689d31917b7f728b20fba3814" alt="" data:image/s3,"s3://crabby-images/8bdce/8bdce277043852b5cc34353ff5b6afee93e19db3" alt="" - package com.abc.demo.pulsar;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.pulsar.client.api.*;
- import org.junit.Before;
- import org.junit.Test;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- public class ConsumerCase {
- private PulsarClient client;
- @Before
- public void before() throws PulsarClientException {
- client = PulsarClient.builder()
- .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
- .build();
- }
- @Test
- public void sync() throws PulsarClientException {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic("my-topic") //主题
- .subscriptionName("my-subscription") //订阅名称
- .subscriptionType(SubscriptionType.Shared) //订阅模式
- .subscribe();
- while (true) {
- Message<String> message = consumer.receive();
- try {
- log.info("topicName={},value={}", message.getTopicName(), message.getValue());
- consumer.acknowledge(message);
- } catch (Exception e) {
- consumer.negativeAcknowledge(message);
- }
- }
- }
- @Test
- public void async() throws InterruptedException {
- client.newConsumer(Schema.STRING)
- .topic("my-topic2")
- .subscriptionName("my-subscription")
- .subscriptionType(SubscriptionType.Shared)
- .subscribeAsync()
- .thenAccept(this::receiveAsync);
- Thread.sleep(1000 * 500);
- }
- private void receiveAsync(Consumer<String> consumer) {
- consumer.receiveAsync().thenAccept(message -> {
- try {
- log.info("messageId={},value={}", message.getMessageId(), message.getValue());
- consumer.acknowledge(message);
- } catch (Exception e) {
- consumer.negativeAcknowledge(message);
- }
- receiveAsync(consumer);
- });
- }
- @Test
- public void messageListener() throws Exception {
- PulsarClient client = PulsarClient.builder()
- .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
- .listenerThreads(3)
- .build();
- MessageListener<String> messageListener = (consumer, msg) -> {
- try {
- log.info("messageId={},value={}", msg.getMessageId(), msg.getValue());
- consumer.acknowledge(msg);
- } catch (Exception e) {
- consumer.negativeAcknowledge(msg);
- }
- };
- client.newConsumer(Schema.STRING)
- .topic("my-topic")
- .subscriptionName("my-subscription")
- .subscriptionType(SubscriptionType.Shared)
- .messageListener(messageListener)
- .subscribe();
- Thread.sleep(1000 * 500);
- }
- /**
- * 重试信主题
- * 处理失败的消息会进入重试信主题,达到最大重试次数后进入死信主题
- */
- @Test
- public void retryTopic() throws Exception {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic("my-topic-r")
- .subscriptionName("my-subscription")
- .subscriptionType(SubscriptionType.Shared)
- .enableRetry(true)
- .deadLetterPolicy(DeadLetterPolicy.builder()
- .maxRedeliverCount(5)
- .build())
- .subscribe();
- while (true) {
- Message<String> message = consumer.receive();
- try {
- log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
- //TODO:业务处理,可能发生异常
- //throw new RuntimeException();
- consumer.acknowledge(message);
- } catch (Exception e) {
- log.error("", e);
- consumer.reconsumeLater(message, 5L, TimeUnit.SECONDS);
- }
- }
- }
- /**
- * 死信主题
- * 由确认超时、负面确认或重试信主题 三种情况消息处理失败后,重试最大次数后消息会进入死信主题
- */
- @Test
- public void deadTopic() throws Exception {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic("my-topic-d")
- .subscriptionName("my-subscription")
- .subscriptionType(SubscriptionType.Shared)
- .deadLetterPolicy(DeadLetterPolicy.builder()
- .maxRedeliverCount(5)
- .initialSubscriptionName("init-sub")
- .build())
- .subscribe();
- while (true) {
- Message<String> message = consumer.receive();
- try {
- log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
- //TODO:业务处理,可能发生异常
- //throw new RuntimeException();
- consumer.acknowledge(message);
- } catch (Exception e) {
- log.error("", e);
- consumer.negativeAcknowledge(message);
- }
- }
- }
- /**
- * 订阅死信队列,处理其中的消息
- */
- @Test
- public void consumerDeadTopic() throws Exception {
- Consumer<String> consumer = client.newConsumer(Schema.STRING)
- .topic("my-topic-d-my-subscription-DLQ")
- .subscriptionName("init-sub")
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
- while (true) {
- Message<String> message = consumer.receive();
- try {
- log.info("messageId={},topicName={},value={}", message.getMessageId(), message.getTopicName(), message.getValue());
- } catch (Exception e) {
- log.error("", e);
- }
- consumer.acknowledge(message);
- }
- }
- }
复制代码 ConsumerCase.java4、Reader
- @Test
- public void reader() throws Exception {
- //MessageId messageId = MessageId.fromByteArray(Base64.getDecoder().decode("CBcQBTAA"));
- MessageId messageId = MessageId.earliest;
- Reader reader = client.newReader(Schema.STRING)
- .topic("my-topic")
- .startMessageId(messageId)
- .create();
- while (true) {
- Message msg = reader.readNext();
- log.info("messageId={},messageIdBase64={},value={}", msg.getMessageId(), Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray()), msg.getValue());
- }
- }
复制代码 完备代码:
data:image/s3,"s3://crabby-images/e0f05/e0f05a005ca74af689d31917b7f728b20fba3814" alt="" data:image/s3,"s3://crabby-images/8bdce/8bdce277043852b5cc34353ff5b6afee93e19db3" alt="" - package com.abc.demo.pulsar;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.pulsar.client.api.*;
- import org.junit.Before;
- import org.junit.Test;
- import java.util.Base64;
- @Slf4j
- public class ReaderCase {
- private PulsarClient client;
- @Before
- public void before() throws PulsarClientException {
- client = PulsarClient.builder()
- .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
- .build();
- }
- @Test
- public void reader() throws Exception {
- //MessageId messageId = MessageId.fromByteArray(Base64.getDecoder().decode("CBcQBTAA"));
- MessageId messageId = MessageId.earliest;
- Reader reader = client.newReader(Schema.STRING)
- .topic("my-topic")
- .startMessageId(messageId)
- .create();
- while (true) {
- Message msg = reader.readNext();
- log.info("messageId={},messageIdBase64={},value={}", msg.getMessageId(), Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray()), msg.getValue());
- }
- }
- }
复制代码 ReaderCase.java5、生产者
5.1、同步发送
- @Test
- public void sync() throws PulsarClientException {
- Producer<String> producer = client.newProducer(Schema.STRING)
- .topic("my-topic-d")
- .create();
- for (int i = 0; i < 3; i++) {
- MessageId messageId = producer.send(("hello" + i));
- log.info("messageId={}", messageId);
- }
- producer.close();
- }
复制代码 5.2、异步发送
- @Test
- public void async() throws InterruptedException {
- client.newProducer(Schema.STRING)
- .topic("my-topic2")
- .createAsync()
- .thenAccept(producer -> {
- for (int i = 0; i < 10; i++) {
- producer.sendAsync("hello" + i).thenAccept(messageId -> {
- log.info("messageId={}", messageId);
- });
- }
- });
- Thread.sleep(1000 * 5);
- }
复制代码 5.3、详细设置消息
- @Test
- public void configMessage() throws PulsarClientException {
- Producer<byte[]> producer = client.newProducer()
- .topic("my-topic")
- .create();
- MessageId messageId = producer.newMessage(Schema.STRING)
- .key("my-key") //设置消息key
- .eventTime(System.currentTimeMillis()) //设置事件事件
- .sequenceId(123) //设置 sequenceId
- .deliverAfter(1, TimeUnit.MINUTES) //延迟投递消息
- .property("my-key", "my-value") //自定义属性
- .value("content")
- .send();
- log.info("messageId={}", messageId);
- producer.close();
- }
复制代码 5.4、完备代码
data:image/s3,"s3://crabby-images/e0f05/e0f05a005ca74af689d31917b7f728b20fba3814" alt="" data:image/s3,"s3://crabby-images/8bdce/8bdce277043852b5cc34353ff5b6afee93e19db3" alt="" - package com.abc.demo.pulsar;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.pulsar.client.api.*;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- public class ProducerCase {
- private PulsarClient client;
- @Before
- public void before() throws PulsarClientException {
- client = PulsarClient.builder()
- //.serviceUrl("pulsar://10.49.196.33:6650")
- .serviceUrl("pulsar://10.49.196.30:6650,10.49.196.31:6650,10.49.196.32:6650")
- .build();
- }
- @After
- public void after() throws PulsarClientException {
- client.close();
- }
- @Test
- public void sync() throws PulsarClientException {
- Producer<String> producer = client.newProducer(Schema.STRING)
- .topic("my-topic-d")
- .create();
- for (int i = 0; i < 3; i++) {
- MessageId messageId = producer.send(("hello" + i));
- log.info("messageId={}", messageId);
- }
- producer.close();
- }
- @Test
- public void async() throws InterruptedException {
- client.newProducer(Schema.STRING)
- .topic("my-topic2")
- .createAsync()
- .thenAccept(producer -> {
- for (int i = 0; i < 10; i++) {
- producer.sendAsync("hello" + i).thenAccept(messageId -> {
- log.info("messageId={}", messageId);
- });
- }
- });
- Thread.sleep(1000 * 5);
- }
- @Test
- public void configMessage() throws PulsarClientException {
- Producer<byte[]> producer = client.newProducer()
- .topic("my-topic")
- .create();
- MessageId messageId = producer.newMessage(Schema.STRING)
- .key("my-key") //设置消息key
- .eventTime(System.currentTimeMillis()) //设置事件事件
- .sequenceId(123) //设置 sequenceId
- .deliverAfter(1, TimeUnit.MINUTES) //延迟投递消息
- .property("my-key", "my-value") //自定义属性
- .value("content")
- .send();
- log.info("messageId={}", messageId);
- producer.close();
- }
- }
复制代码 ProducerCase.java6、Admin
6.1、Brokers
6.1.1、列出活动 broker
- admin.brokers().getActiveBrokers(clusterName)
复制代码 6.1.2、列出 broker 的定名空间
- admin.brokers().getOwnedNamespaces(cluster,brokerUrl);
复制代码 6.1.3、获取动态配置名称
- admin.brokers().getDynamicConfigurationNames();
复制代码 6.1.4、更新动态配置
- admin.brokers().updateDynamicConfiguration(configName, configValue);
复制代码 6.1.5、获取已经更新过的动态配置
- admin.brokers().getAllDynamicConfigurations();
复制代码 6.1.6、获取 leader broker
- admin.brokers().getLeaderBroker()
复制代码 6.2、Clusters
6.2.1、获取集群配置信息
- admin.clusters().getCluster(clusterName);
复制代码 6.2.2、获取集群列表
- admin.clusters().getClusters();
复制代码 6.3、Tenant
6.3.1、列出租户
- admin.tenants().getTenants();
复制代码 6.3.2、创建租户
- admin.tenants().createTenant(tenantName, tenantInfo);
复制代码 6.3.3、获取租户配置信息
- admin.tenants().getTenantInfo(tenantName);
复制代码 6.3.4、删除租户
- admin.Tenants().deleteTenant(tenantName);
复制代码 6.3.5、更新租户
- admin.tenants().updateTenant(tenantName, tenantInfo);
复制代码 6.4、Namespaces
6.4.1、创建定名空间
- admin.namespaces().createNamespace(namespace);
复制代码 6.4.2、获取定名空间策略
- admin.namespaces().getPolicies(namespace);
复制代码 6.4.3、列出定名空间
- admin.namespaces().getNamespaces(tenant);
复制代码 6.4.4、删除定名空间
- admin.namespaces().deleteNamespace(namespace);
复制代码 6.5、Topics
6.5.1、查察订阅的待消费消息(不会消费消息)
- admin.topics().peekMessages(topic, subName, numMessages);
复制代码 6.5.2、通过消息 ID 获取消息
- admin.topics().getMessageById(topic, ledgerId, entryId);
复制代码 6.5.3、通过相对于最早或最新消息的位置来查找消息
- admin.topics().examineMessage(topic, "latest", 1);
复制代码 6.5.4、通过期间获取消息的 ID
- admin.topics().getMessageIdByTimestamp(topic, timestamp);
复制代码 6.5.5、跳过特定主题的某个订阅中的多少条未消费消息
- admin.topics().skipMessages(topic, subName, numMessages);
复制代码 6.5.6、跳过特定主题的某个订阅中的所有未消费消息
- admin.topics().skipAllMessages(topic, subName);
复制代码 6.5.7、根据时间重置游标
- admin.topics().resetCursor(topic, subName, timestamp);
复制代码 6.5.8、获取主题所属的 broker
- admin.lookups().lookupTopic(topic);
复制代码 6.5.9、获取分区主题所属的 broker
- admin.lookups().lookupPartitionedTopic(topic);
复制代码 6.5.10、获取主题的订阅信息
- admin.topics().getSubscriptions(topic);
复制代码 6.5.11、获取最新消息的 ID
- admin.topics().getLastMessage(topic);
复制代码 6.5.12、创建非分区主题
- admin.topics().createNonPartitionedTopic(topicName);
复制代码 6.5.13、删除非分区主题
- admin.topics().delete(topic);
复制代码 6.5.14、列出非分区主题
- admin.topics().getList(namespace);
复制代码 6.5.15、获取非分区主题状态
- admin.topics().getStats(topic, false /* is precise backlog */);
复制代码 6.5.16、获取非分区主题内部状态
- admin.topics().getInternalStats(topic);
复制代码 6.5.17、创建分区主题
- admin.topics().createPartitionedTopic(topicName, numPartitions);
复制代码 6.5.18、获取分区主题元数据信息
- admin.topics().getPartitionedTopicMetadata(topicName);
复制代码 6.5.19、更新分区主题的分区数(只能比原来大)
- admin.topics().updatePartitionedTopic(topic, numPartitions);
复制代码 6.5.20、删除分区主题
- admin.topics().deletePartitionedTopic(topic);
复制代码 6.5.21、列出分区主题
- admin.topics().getPartitionedTopicList(namespace);
复制代码 6.5.22、获取分区主题状态
- admin.topics().getPartitionedStats(topic, true /* per partition */, false /* is precise backlog */);
复制代码 6.5.23、获取分区主题内部状态
- admin.topics().getPartitionedInternalStats(topic);
复制代码 6.5.24、创建订阅
- admin.topics().createSubscription(topic, subscriptionName, MessageId.latest);
复制代码 6.5.25、获取订阅
- admin.topics().getSubscriptions(topic);
复制代码 6.5.26、删除订阅
- admin.topics().deleteSubscription(topic, subscriptionName);
复制代码 6.6、完备代码
data:image/s3,"s3://crabby-images/e0f05/e0f05a005ca74af689d31917b7f728b20fba3814" alt="" data:image/s3,"s3://crabby-images/8bdce/8bdce277043852b5cc34353ff5b6afee93e19db3" alt="" - package com.abc.demo.pulsar;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.pulsar.client.admin.PulsarAdmin;
- import org.apache.pulsar.client.admin.PulsarAdminException;
- import org.apache.pulsar.client.api.Message;
- import org.apache.pulsar.client.api.MessageId;
- import org.apache.pulsar.client.api.PulsarClientException;
- import org.apache.pulsar.common.policies.data.TenantInfoImpl;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- import java.util.Collections;
- import java.util.HashSet;
- @Slf4j
- public class AdminCase {
- private PulsarAdmin admin;
- @Before
- public void before() throws PulsarClientException {
- admin = PulsarAdmin.builder()
- .serviceHttpUrl("http://10.49.196.30:8080,10.49.196.31:8080,10.49.196.32:8080")
- .build();
- }
- @After
- public void after() {
- admin.close();
- }
- @Test
- public void broker() throws PulsarAdminException {
- log.info("getActiveBrokers={}", admin.brokers().getActiveBrokers());
- log.info("getOwnedNamespaces={}", admin.brokers().getOwnedNamespaces("pulsar-cluster-1", "app1:8080"));
- log.info("getDynamicConfigurationNames={}", admin.brokers().getDynamicConfigurationNames());
- admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "true");
- log.info("getAllDynamicConfigurations={}", admin.brokers().getAllDynamicConfigurations());
- log.info("getLeaderBroker={}", admin.brokers().getLeaderBroker());
- }
- @Test
- public void clusters() throws PulsarAdminException {
- log.info("getCluster={}", admin.clusters().getCluster("pulsar-cluster-1"));
- log.info("getClusters={}", admin.clusters().getClusters());
- }
-
- @Test
- public void tenants() throws PulsarAdminException {
- log.info("getTenants={}", admin.tenants().getTenants());
- TenantInfoImpl tenantInfo = new TenantInfoImpl();
- tenantInfo.setAdminRoles(new HashSet<>());
- tenantInfo.setAllowedClusters(Collections.singleton("pulsar-cluster-1"));
- admin.tenants().createTenant("test-tenant", tenantInfo);
- log.info("getTenantInfo={}", admin.tenants().getTenantInfo("test-tenant"));
- admin.tenants().updateTenant("test-tenant", tenantInfo);
- admin.tenants().deleteTenant("test-tenant");
- }
- @Test
- public void namespaces() throws PulsarAdminException {
- admin.namespaces().createNamespace("public/test-ns");
- log.info("getPolicies={}", admin.namespaces().getPolicies("public/default"));
- log.info("getNamespaces={}", admin.namespaces().getNamespaces("public"));
- admin.namespaces().deleteNamespace("public/test-ns");
- }
-
- @Test
- public void topics() throws PulsarAdminException {
- log.info("peekMessages={}", admin.topics().peekMessages("persistent://public/default/my-topic", "my-subscription", 3));
- Message<byte[]> message = admin.topics().getMessageById("persistent://public/default/my-topic", 171, 16);
- log.info("getMessageById={}", new String(message.getData()));
- message = admin.topics().examineMessage("persistent://public/default/my-topic", "latest", 1);
- log.info("examineMessage={}", new String(message.getData()));
- log.info("getMessageIdByTimestamp={}", admin.topics().getMessageIdByTimestamp("persistent://public/default/my-topic", System.currentTimeMillis()));
- admin.topics().skipMessages("persistent://public/default/my-topic", "my-subscription", 1);
- admin.topics().skipAllMessages("persistent://public/default/my-topic", "my-subscription");
- admin.topics().resetCursor("persistent://public/default/my-topic", "my-subscription", System.currentTimeMillis() - 1000 * 60 * 15);
- log.info("lookupTopic={}", admin.lookups().lookupTopic("persistent://public/default/my-topic"));
- log.info("lookupPartitionedTopic={}", admin.lookups().lookupPartitionedTopic("persistent://public/default/my-topic2"));
- log.info("getSubscriptions={}", admin.topics().getSubscriptions("persistent://public/default/my-topic"));
- log.info("getLastMessageId={}", admin.topics().getLastMessageId("persistent://public/default/my-topic"));
- admin.topics().createNonPartitionedTopic("persistent://public/default/test-topic");
- admin.topics().delete("persistent://public/default/test-topic");
- log.info("getList={}", admin.topics().getList("public/default"));
- log.info("getStats={}", admin.topics().getStats("persistent://public/default/my-topic", false));
- log.info("getInternalStats={}", admin.topics().getInternalStats("persistent://public/default/my-topic"));
- admin.topics().createPartitionedTopic("persistent://public/default/test-topic-p", 2);
- log.info("getPartitionedTopicMetadata={}", admin.topics().getPartitionedTopicMetadata("persistent://public/default/test-topic-p"));
- admin.topics().updatePartitionedTopic("persistent://public/default/test-topic-p", 3);
- admin.topics().deletePartitionedTopic("persistent://public/default/test-topic-p");
- log.info("getStats={}", admin.topics().getPartitionedStats("persistent://public/default/my-topic2", false));
- log.info("getInternalStats={}", admin.topics().getPartitionedInternalStats("persistent://public/default/my-topic2"));
- admin.topics().createSubscription("persistent://public/default/my-topic", "test-subscription", MessageId.latest);
- log.info("getStats={}", admin.topics().getSubscriptions("persistent://public/default/my-topic"));
- admin.topics().deleteSubscription("persistent://public/default/my-topic", "test-subscription");
- }
- }
复制代码 AdminCase.java
参考:
https://pulsar.apache.org/docs/3.3.x/client-libraries-java/
https://pulsar.apache.org/docs/3.3.x/admin-api-overview/
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |