1.快速入门

快速使用

本地安装Pulsar

  • 单机模式pulsar需要pulsar broker、必要的zookeeper和BookKeeper组件

下载二进制包

wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz
tar xvfz apache-pulsar-2.8.0-bin.tar.gz
cd apache-pulsar-2.8.0

安装分层存储携带程序

wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-offloaders-2.8.0-bin.tar.gz
# 解压目录拷贝至pulsar/offloaders
mv apache-pulsar-offloaders-2.8.0/offloaders offloaders

启动单机模式Pulsar

bin/pulsar standalone

使用单机模式Pulsar

Consume消息

bin/pulsar-client consume my-topic -s "first-subscription"

Produce消息

bin/pulsar-client produce my-topic --messages "hello-pulsar"

终止单机模式Pulsar

  • Ctrl+C终止单机模式Pulsar的运行

Docker里配置单机Pulsar

docker中启动Pulsar

docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.8.0 bin/pulsar standalone

在Docker中运行Pulsar

  • pulsar://localhost:6650

  • http://localhost:8080

获取topic数据

curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool

概述

概述

Pulsar特性

  • 单实例原声支持多个集群,可跨机房在集群间无缝地完成消息复制。

  • 极低的发布延迟和端到端延迟。

  • 可无缝扩展到超过1百万个topic。

  • 支持多种订阅模式(独占订阅、共享订阅、故障转移订阅)

  • 通过Apache BookKeeper提供的持久化消息存储机制保证消息传递

    • 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。

    • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。

    • 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。

消息

  • Pulsar采用发布-订阅的设计模式(pub-sub),producers发布消息到topic,consumers订阅这些topic处理消费消息并且当处理完毕时发送ack到broker。

消息格式

组件说明

Value / data payload

消息所承载的数据。 尽管消息数据也可以符合数据 schemas,但所有 Pulsar 消息都包含原始字节。

Key

消息可以选择用键进行标记,这在 topic 压缩 等操作很有用。

Properties

用户自定义属性的键值对(可选)。

Producer 名称

生成消息的 producer 的名称。 如果不指定,则使用默认名称

Sequence ID

每个 Pulsar 消息都存储在其主题上的有序序列中。消息的序列 ID 是其在该序列中的顺序。

Publish time

消息发布的时间戳,由 producer 自动添加。

Event time

应用程序可以附加到消息的时间戳(可选), 例如处理消息的时间。 如果没有明确设置,则消息的事件时间为 0

TypedMessageBuilder

用于构造消息。 您可以使用 TypedMessageBuilder 设置消息的键值对属性。 在设置 TypedMessageBuilder 时,最佳的选择是将 key 设置为字符串。 如果将 key 设置为其他类型(例如,AVRO 对象),则 key 会以字节形式发送,这时 consumer 就很难使用了。

消息配置

  • Message 默认最大可携带 5 MB 数据。

# broker.conf 消息的最大大小(字节数)。
maxMessageSize=5242880
# bookkeeper.conf
nettyMaxFrameSizeBytes=5253120

Producer

  • producer是连接topic的程序将消息发布到一个Pulsar broker上。

发送模式

发送模式说明

Sync send

Producer 将在发送每条消息后等待 broker 的确认。 如果未收到确认,则 producer 将认为发送失败。

异步发送

Producer 将把消息放于阻塞队列中,并立即返回 然后,客户端将在后台将消息发送给 broker。 如果队列已满(最大大小可配置),则调用 API 时,producer 可能会立即被阻止或失败,具体取决于传递给 producer 的参数。

访问模式

  • 对于producer来说topic上可以有不同的访问模式

    • Exclusive:默认的配置是仅有一个producer可以在topic上发布消息

    • WaitForExclusive:如果已经有一个生产者连接了主题,生产者创建过程被挂起(而不是超时) 直到这个生产者获得了 Exclusive 访问权限。

压缩策略

  • LZ4

  • ZLIB

  • ZSTD

  • SNAPPY

批量处理

  • 当批量处理启用时,producer 会在单个请求中积累并发送一批消息。 批量处理的量大小由最大消息数最大发布延迟定义。 因此,积压数量是分批处理的总数,而不是信息总数。

  • Pulsar将批次被跟踪并存储为单个单元,而不是单个消息。 Consumer 将批量处理的消息拆分成单个消息。 但即使启用了批量处理,也始终将计划中的消息(通过 deliverAt 或者 deliverAfter 进行配置) 作为单个消息发送。当consumer确认了一个批的所有消息才算确认。Broker 维护批量索引的确认状态并跟踪每批索引的确认状态,以避免向 consumer 发送已确认的消息(端到端一致性)。默认情况下,禁用批处理索引确认(acknowledgmentAtBatchIndexLevelEnabled=false)。 您可以通过在代理端将knowledgeAtBatchIndexLevelEnabled 参数设置为true 来启用批量索引确认。启用批量索引确认将会导致更多内存开销。

分块

  • batch消息和分块不能同时开启,分块仅支持持久化的topic,分块仅支持独占和故障转移订阅模式。

  • 当启用分块(chunking) 时(chunkingEnabled=true) ,如果消息大小大于允许的最大发布有效载荷大小,则 producer 将原始消息分割成分块的消息,并将它们与块状的元数据一起单独和按顺序发布到 broker。 在 broker 中,分块的消息将和普通的消息以相同的方式存储在 Managed Ledger 上。 唯一的区别是,consumer 需要缓冲分块消息,并在收集完所有分块消息后将其合并成真正的消息。 Managed Ledger 上的分块消息可以和普通消息交织在一起。 如果 producer 未能发布消息的所有分块,则当 consumer 未能在过期时间(expire time) 内接收所有分块时,consumer 可以过期未完成的分块。 默认情况下,过期时间设置为1小时。

  • Consumer 会缓存收到的块状消息,直到收到消息的所有分块为止。 然后 consumer 将分块的消息拼接在一起,并将它们放入接收器队列中。 客户端从接收器队列中消费消息。 一旦 consumer 使用整个大消息并确认,consumer 就会在内部发送与该大消息关联的所有分块消息的确认。设置maxPendingChunkedMessage参数当达到阈值时,consumer通过静默确认未分块的消息或通过将其标记为未确认,要求broker稍后重新发送这些消息。

处理一个producer和一个订阅consumer 的分块消息

多个producer到一个consumer的分块消息

  • 当多个生产者发布块消息到单个主题,这个 Broker 在同一个 Ledger 里面保存来自不同生产者的所有块消息。 如下所示,生产者1发布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三个块组成。 生产者2发布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三个块组成。 这些特定消息的所有分块是顺序排列的,但是其在 ledger 里面可能不是连续的。 这种方式会给消费者带来一定的内存负担。因为消费者会为每个大消息在内存开辟一块缓冲区,以便将所有的块消息合并为原始的大消息。

Consumer

  • Consumer 向 broker 发送消息流获取申请(flow permit request)以获取消息。 在 Consumer 端有一个队列,用于接收从 broker 推送来的消息。 你能够通过receiverQueueSize参数配置队列的长度 (队列的默认长度是1000) 每当 consumer.receive() 被调用一次,就从缓冲区(buffer)获取一条消息。

接收模式

  • consumer支持sync和async的方式从broker中接收数据

发送模式说明

同步接收

同步模式,在收到消息之前都是被阻塞的。

异步接收

异步接收模式会立即返回一个 future 值(如 Java 中的 CompletableFuture),一旦收到新的消息就立刻完成。

监听

  • pulsar java client提供一个MessageListener接口,可以通过实现接口一旦接收到消息,received方法将被调用,这里类似于RocketMQ

确认

  • consumer发送一个ack请求到broker后这条消息被消费成功,被消费的消息将会被永久的存储,只有在所有订阅用户确认后才能删除。

  • 可以通过以下两种方式之一确认消息:

    • 单独的ack。通过单独的确认,消费者确认每条消息并向broker发送确认请求。共享订阅模式,消息都是单条确认模式。

    consumer.acknowledge(msg);
    • 累计的ack。使用累积ack,使用者只确认它接收到的最后一条消息。流中直到(包括)所提供消息的所有消息都不会被重新传递给该消费者。

    consumer.acknowledgeCumulative(msg);

取消确认

  • 在独占消费模式和灾备订阅模式中,消费者仅仅只能对收到的最后一条消息进行取消确认。

consumer.negativeAcknowledge(msg);

死信topic

  • 当一些消息不能成功消费,在此机制中,无法使用的消息存储在单独的主题中,称为死信主题。您可以决定如何处理死信主题中的消息。默认死信topic名称,--DLQ

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
              .topic(topic)
              .subscriptionName("my-subscription")
              .subscriptionType(SubscriptionType.Shared)
              .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .deadLetterTopic("your-topic-name")
                    .build())
              .subscribe();
  • 死信消息依赖于消息的重放,消息重放是由于ack超时和取消ack导致。

延时重试topic

  • 很多在线的业务系统,由于业务逻辑处理出现异常,消息一般需要被重新消费。 若需要允许延时重新消费失败的消息,你可以配置生产者同时发送消息到业务主题和重试主题,并允许消费者自动重试消费。 配置了允许消费者自动重试。如果消息没有被消费成功,它将被保存到重试主题当中。并在指定延时时间后,自动重新消费重试主题里面的消费失败消息。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                .topic(topic)
                .subscriptionName("my-subscription")
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .receiverQueueSize(100)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(maxRedeliveryCount)
                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
                        .build())
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();
# 延时重试
consumer.reconsumeLater(msg,3,TimeUnit.SECONDS);

Topic

  • 一个topic由一下组成

{persistent|non-persistent}://tenant/namespace/topic
Topic名称组成说明

持久化 / 非持久化

用来标识 topic 的类型。 Pulsar 支持两种主题类型:持久化非持久化。 主题默认是持久化类型,如果不特殊指定主题类型,那主题就是持久化的。 对于持久化的主题,所有的消息都会被持久化的保存到磁盘当中(如果 broker 不是单机模式,消息会被持久化到多块磁盘),而非持久化的主题的数据不会被保存到磁盘里面。

租户

实例中的主题租户。租户是 Pulsar 多租户的基本要素,租户是可以跨越多个集群的。

命名空间

将相关联的 topic 作为一个组来管理,是管理 Topic 的基本单元。 大多数对 topic 的管理都是对命名空间的一项配置。 每个租户里面可以有一个或者多个命名空间。

topic

主题名称是主题命名结构的最后一个部分,主题名字在 Pulsar 实例当中没有特殊意义。

  • 如果客户端指定了不存在的topic消费消息,pulsar会自动使用该主题命在该命名空间下创建个同名主题,例如:my-topic的名称为persistent://my-tenant/my-namespace/my-topic

命名空间

  • 命名空间是租户内部逻辑上的命名术语。 可以通过admin API在租户下创建多个命名空间。 例如,包含多个应用程序的租户可以为每个应用程序创建单独的命名空间。 Namespace使得程序可以以层级的方式创建和管理topic Topicmy-tenant/app1 ,它的namespace是app1这个应用,对应的租户是 my-tenant。 你可以在namespace下创建任意数量的topic

订阅

  • 订阅是命名好的配置规则,指导消息如何投递给消费者。 Pulsar 中有四种订阅模式: 独占共享灾备key共享 下图展示了这三种模式:

独享模式

  • 独享模式,只有一个消费者被运行链接subscription,如果多个消费者订阅一个topic使用相同的subscription就会报错。

Failover模式

  • Failover模式中,多个consumer可以绑定到同一个subscription。 主消费者会消费非分区主题或者分区主题中的每个分区的消息。当主消费者丢失链接,所有(未确认的和后续的)消息都将传递给下一个消费者。

  • 对于分区topic来说,broker将按照消费者的优先级和消费者名称的词汇表顺序对消费者进行排序,然后试图将topic均匀的分配给优先级最高的消费者。

Shard(共享)

  • shared或者round robin模式中,多个消费者可以绑定到同一个订阅上。 消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

Key_Shard

  • 消息以分布的形式跨消费者交付,具有相同键或相同顺序键的消息只交付给一个消费者。无论消息被重新传递多少次,它都被传递给相同的消费者。当使用者连接或断开连接时,将导致所服务的使用者更改消息的某些键。

Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .batcherBuilder(BatcherBuilder.KEY_BASED)
        .create();

多主题订阅

  • 通过正则方式订阅,例如persistent://public/default/finance-.*

  • 明确指定的topic列表

import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient pulsarClient = // Instantiate Pulsar client object

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsInNamespace)
                .subscriptionName("subscription-1")
                .subscribe();

分区topic

  • 普调topic限制在一个broker上,限制了topic的最大吞吐量,分区topic将数据按照分区存储在多个broker,吞吐量也会提升。分区topic实际是通过在底层拥有N个内部topic来实现的,这个N的数量就是等于分区的梳理。当向分区的topic发送消息,没条消息都被路由到其中一个broker。Pulsar自动处理跨broker的分区分布。

  • topic1有5个分区,因为分区多于broker数量,其中有两个broker要处理两个分区。第三个broker则只处理一个。(再次强调,分区的分布是Pulsar自动处理的)。

路由模式

  • producer发送消息到分区topic需要指定路由模式,决定每条消息被发布到的分区

发送模式说明

RoundRobinPartition

如果消息没有指定 key,为了达到最大吞吐量,生产者会以 round-robin 方式将消息发布到所有分区。 请注意round-robin并不是作用于每条单独的消息,而是作用于延迟处理的批次边界,以确保批处理有效。 如果消息指定了key,分区生产者会根据key的hash值将该消息分配到对应的分区。 这是默认的模式。

SinglePartition

如果消息没有指定 key,生产者将会随机选择一个分区,并发布所有消息到这个分区。 如果消息指定了key,分区生产者会根据key的hash值将该消息分配到对应的分区。

CustomPartition

使用自定义消息路由器实现来决定特定消息的分区。 用户可以创建自定义路由模式:使用 Java client 并实现MessageRouter 接口。

顺序保证

  • 消息的排序与MessageRoutingMode和Message Key有关。通常,用户希望按键分区的顺序进行保证。当使用SinglePartitionRoundRobinPartition模式时,如果消息有key,消息将被路由到匹配的分区。

顺序保证说明路由策略与消息Key

按键分区

所有具有相同 key 的消息将按顺序排列并放置在相同的分区(Partition)中。

使用 SinglePartitionRoundRobinPartition 模式,每条消息都需要有key。

生产者排序

来自同一生产者的所有消息都是有序的

路由策略为SinglePartition, 且每条消息都没有key。

HashingScheme

  • HashingScheme 是代表一组标准散列函数的枚举。为一个指定消息选择分区时使用。有两种可用的散列函数: JavaStringHashMurmur3_32Hash. The default hashing function for producer is JavaStringHash. 请注意,当producer可能来自于不同语言客户端时,JavaStringHash是不起作用的。建议使用Murmur3_32Hash。有两种可用的散列函数: JavaStringHashMurmur3_32Hash. The default hashing function for producer is JavaStringHash. 请注意,当producer可能来自于不同语言客户端时,JavaStringHash是不起作用的。建议使用Murmur3_32Hash

非持久topic

  • 默认的,pulsar保存所有没有去人的消息到多个Bookeeper的bookies中(存储节点)。持久化topic的消息数据可以在broker重启或订阅者出问题的情况下存活下来。隐藏,持久化topic上的消息数据可以在broker重启和consumer故障转移的时候继续存在。

  • 非持久topic不持久存储到磁盘,只存在内存中。pulsar也提供了非持久topic。非持久topic的消息不会被保存在硬盘上,只存活于内存中。当使用非持久化topic分发时,杀pulsar的broker或关闭consumer,此topic上所有的瞬时消息都会丢失。

non-persistent://tenant/namespace/topic
  • 非持久topic中,broker会立即发布消息给所有连接的订阅者,而不会在BookKeeper存储。 如果有一个订阅者断开连接,broker将无法重发这些瞬时消息,订阅者将永远也不能收到这些消息了。 去掉持久化存储的步骤,在某些情况下,使得非持久topic的消息比持久topic稍微变快。

  • 非持久topic的ack会在消息发送后立刻返回给producer

客户端API

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();
String npTopic = "non-persistent://public/default/my-topic";
String subscriptionName = "my-subscription-name";

Consumer<byte[]> consumer = client.newConsumer()
        .topic(npTopic)
        .subscriptionName(subscriptionName)
        .subscribe();
        
Producer<byte[]> producer = client.newProducer()
                .topic(npTopic)
                .create();        

消息保留和过期

  • broker默认配置

    • 立即删除所有已经被consumer确认过的消息

    • 以消息backlog的形式,持久保存所有的未被确认消息

  • pulsar的俩个特性可以覆盖默认行为

    • 消息存留让你可以保存consumer确认过的消息

    • 消息过期让你可以给未被确认的消息设置存活时长(TTL)

  • 没有被留存规则覆盖的消息将会被删除,没有留存规则的话所有被确认的消息都会被删除。

  • 有些消息即使还没有被确认,也被删除掉了。因为根据设置在namespace上的TTL,他们已经过期了。(例如,TTL为5分钟,过了十分钟消息还没被确认)

消息去重

生产者幂等

  • 消息去重的另外一种方法是确保每条消息仅生产一次。 这种方法通常被叫做生产者幂等。 这种方式的缺点是,把消息去重的工作推给了应用去做。 在 Pulsar 中,消息去重是在 broker上处理的,用户不需要去修改客户端的代码。 相反,你只需要通过修改配置就可以实现。

去重和exctly-onece

  • 消息去重,使 Pulsar 成为了流处理引擎(SPE)或者其他寻求 "仅仅一次" 语义的连接系统所需的理想消息系统。 如果消息系统没有提供自动去重能力,那么 SPE (流处理引擎) 或者其他连接系统就必须自己实现去重语义,这意味着需要应用去承担这部分的去重工作。 使用Pulsar,严格的顺序保证不会带来任何应用层面的代价。

消息延迟传递

  • 延时消息功能允许你能够过一段时间才能消费到这条消息,而不是消息发布后,就马上可以消费到。

  • Broker 保存消息是不经过任何检查的。 当消费者消费一条消息时,如果这条消息是延时消息,那么这条消息会被加入到DelayedDeliveryTracker当中。 订阅检查机制会从DelayedDeliveryTracker获取到超时的消息,并交付给消费者。

broker

  • 延迟的默认时间和是否开启可以通过修改broker配置

# Whether to enable the delayed delivery for messages.
# If disabled, messages are immediately delivered and there is no tracking overhead.
delayedDeliveryEnabled=true

# Control the ticking time for the retry of delayed message delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

producer

// message to be delivered at the configured delay interval
producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();

架构

架构概述

  • 单个pulsar实例由一个或多个pulsra集群组成,实例中的集群之前可以互相复制数据。

  • 单个pulsar集群由以下三个部分组成:

    • 一个或者多个 broker 负责处理和负载均衡 producer 发出的消息,并将这些消息分派给 consumer;Broker 与 Pulsar 配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies);Broker 依赖 ZooKeeper 集群处理特定的任务,等等。

    • 包含一个或多个 bookie 的 BookKeeper 集群负责消息的持久化存储

    • 一个Zookeeper集群,用来处理多个Pulsar集群之间的协调任务。

Brokers

  • pulsar的broker是一个无状态组件,主要负责运行另外俩个组件

    • 一个HTTP服务器,它为生产者和消费者提供管理任务和主题查找的REST API。生产者连接到代理以发布消息,而消费者连接到代理以消费消息。

    • 一个调度分发器,它是异步的TCP服务器,通过自定义的二进制协议应用所有相关的数据传输。

  • 出于性能的考虑, 通常从 managed ledger (ledger是Pulsar底层存储BookKeeper中的概念,相当于一种记录的集合) 缓存中调度消息, 除非 积压的消息超过这个缓存的大小。 如果积压的消息对于缓存来说太大了, 则Broker将开始从BookKeeper那里读取Entries(Entry同样是BookKeeper中的概念,相当于一条记录)。

集群

  • 一个pulsar实例包含一个或多个pulsar集群。集群包括

    • 一个或者多个Pulsar brokers

    • 一个ZooKeeper协调器,用于集群级别的配置和协调

    • 一组BookKeeper的Bookies用于消息的 持久化存储

元数据存储

  • Pulsar 元数据存储维护一个 Pulsar 集群的所有元数据,例如主题元数据、模式、代理负载数据等。 Pulsar 使用 Apache ZooKeeper 进行元数据存储、集群配置和协调。 Pulsar 元数据存储可以部署在单独的 ZooKeeper 集群上,也可以部署在现有的 ZooKeeper 集群上。 您可以将一个 ZooKeeper 集群同时用于 Pulsar 元数据存储和 BookKeeper 元数据存储。 如果要部署连接到现有 BookKeeper 集群的 Pulsar broker,则需要分别为 Pulsar 元数据存储和 BookKeeper 元数据存储部署单独的 ZooKeeper 集群。

  • 配置与仲裁存储: 存储租户,命名域和其他需要全局一致的配置项

  • 每个集群有自己独立的ZooKeeper保存集群内部配置和协调信息,例如归属信息,broker负载报告,BookKeeper ledger信息(这个是BookKeeper本身所依赖的)等等。

配置存储

  • 配置存储维护一个 Pulsar 实例的所有配置,例如集群、租户、命名空间、分区主题相关配置等。 一个 Pulsar 实例可以有一个本地集群、多个本地集群或多个跨区域集群。 因此,配置存储可以在 Pulsar 实例下的多个集群之间共享配置。 配置存储可以部署在单独的 ZooKeeper 集群上,也可以部署在现有的 ZooKeeper 集群上。

持久化存储

  • 未确认送大的消息需要持久化存储直到它们被确认送达,这种消息推送方式叫做持久化消息推送。在pulsar内部,所有消息都被保存并同步N份,例如2个服务器保存4份,每个服务器上都有镜像的RAID存储。

Apache bookeeper

  • pulsar用bookeeper作为持久化存储,bookeeper是一个分布式的wal系统,有以下特性适合pulsar

    • 能让Pulsar创建多个独立的日志,这种独立的日志就是ledgers. 随着时间的推移,Pulsar会为Topic创建多个ledgers

    • 为按条目复制的顺序数据提供了非常高效的存储。

    • 保证了多系统挂掉时ledgers的读取一致性。

    • 提供不同的Bookies之间均匀的IO分布的特性。

    • 容量和吞吐量都能水平扩展。并且容量可以通过在集群内添加更多的Bookies立刻提升。

    • Bookies被设计成可以承载数千的并发读写的ledgers。 使用多个磁盘设备,一个用于日志,另一个用于一般存储,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开。

  • 除了消息数据,cursors也会被持久化入BookKeeper。 Cursors是消费端订阅消费的位置。 BookKeeper让Pulsar可以用一种可扩展的方式存储消费位置。

# 消息存储
persistent://tenant/namespace/topic

Ledgers

  • Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个Bookeeper存储节点(Bookies)的写入。Ledger的条目会被复制到多个bookies。

    • Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger。

    • 当一个ledger被关闭后,除非明确的要写数据或者是因为写入器挂掉导致ledger关闭,这个ledger只会以只读模式打开。

    • 最后,当ledger中的条目不再有用的时候,整个legder可以被删除(ledger分布是跨Bookies的)

Ledgers的读一致性

  • BookKeeper的主要优势在于他能在有系统故障时保证读的一致性。 由于Ledger只能被一个进程写入(之前提的写入器进程),这样这个进程在写入时不会有冲突,从而写入会非常高效。 在一次故障之后,ledger会启动一个恢复进程来确定ledger的最终状态并确认最后提交到日志的是哪一个条目。 在这之后,能保证所有的ledger读进程读取到相同的内容。

Managed ledgers

  • 由于BookKeeper Ledgers提供了单一的日志抽象,在ledger的基础上我们开发了一个叫managed ledger的库,用以表示单个topic的存储层。 managed ledger即消息流的抽象,有一个写入器进程不断在流结尾添加消息,并且有多个cursors 消费这个流,每个cursor有自己的消费位置。

    一个managed ledger在内部用多个BookKeeper ledgers保存数据,这么做有两个原因:

    1. 在故障之后,原有的某个ledger不能再写了,需要创建一个新的。

    2. ledger在所有cursors消费完它所保存的消息之后就可以被删除,这样可以实现ledgers的定期翻滚从头写。

日志存储

  • BookKeeper的日志文件包含事务日志。 在更新到 ledger之前,bookie需要确保描述这个更新的事务被写到持久(非易失)存储上面。 在bookie启动和旧的日志文件大小达到上限(由 journalMaxSizeMB 参数配置)的时候,新的日志文件会被创建。

Pulsar proxy

  • 架构上来看,Pulsar Proxy从ZooKeeper上面读取他所需要的所有信息。 当启动代理时,你只需要提供用于集群独有和实例范围的配置存储的ZooKeeper连接串。

bin/pulsar proxy \
  --zookeeper-servers zk-0,zk-1,zk-2 \
  --configuration-store-servers zk-0,zk-1,zk-2

服务发现

  • 客户端需要能够使用单个URL与整个Pulsar实例进行同学。Pulsar内部提供了服务发现机制,可以通过配置Pulsar实例指南设置。

最后更新于