7.构建数据管道

  • 使用Kafka构建数据管道通常分为两种场景

    • 把Kafka作为数据管道的两个端点之一

    • 把Kafka作为数据管道两个端点之间的媒介。

  • 在0.9版本Kafka提供Kafka Connect来支持以上需求。

构建数据管道需要考虑的问题

及时性

  • 一个数据管道需要考虑数据及时性问题,Kafka作为数据管道相当于一个数据缓冲区的决策,降低生产者与消费者之间的时间敏感度 。实时的生产者和基于批处理的消费者可以同时存在,也可以任意组合。实现回压策略也因此变得更加容易,Kafka本身就是用反压策略,消费速率完全取决消费者。

可靠性

  • 避免单点故障,并能够自动从各种故障快速恢复。

高吞吐量和动态吞吐量

  • Kafka作为生产者和消费者之间的缓冲区,消费者的吞吐和生产者的吞吐不是耦合的,也无需在实现复杂的反压机制,如果生产者吞吐量超过消费者的吞吐量,可以把数据积压在Kafka里,等到消费者追赶上来。通过增加额外的消费者或生产者可以实现Kafka的伸缩,因此可以在数据管道的任一边进行动态伸缩。

数据格式

  • 数据管道需要协调各种数据格式和数据类型,这是数据管道的一个非常重要的因素 。数据类型取决于不同的数据库和数据存储系统.

转换

  • 数据管道可以分为ETL和ELT。

    • ETL:提取-转换-加载(Extractt-Transform-Load), 当数据流经数据管道时,数据管道会负责处理它们 。

    • ELT(提取-加载-转换): 数据管道只做少量的转换(主要是类型转换),确保到达数据池的数据尽可能地与数据源保持一致。 这种情况称为高保真(high fidelity)数据关到或数据湖(data lake)架构 。目标系统收集"原始数据",并负责处理它们。这种方式为目标系统的用户提供了最大的灵活性,因为它们可以访问到完整的数据。在这些系统里诊断问题也变得更加容易,因为数据被集中在同一个系统里进行处理,而不是分散在数据关到和其他应用里。

安全性

  • Kafka支持加密传输数据,从数据源到Kafka再从Kafka到数据池。支持认证和授权。

  • 支持审计日志跟踪访问记录,通过编写代码可以定位到每个事件的来源和修改者。

故障处理能力

  • Kafka会长时间保留数据,所以我们可以在适当的适合回过头来重新处理出错的数据。

耦合性和灵活性

  • 数据管道作用之一就是解耦数据源和数据池

如何在Connect API和客户端API选择

  • 在向Kafka写入数据或从Kafka读取数据时,要么使用传统的生产者和消费者客户端,要么使用Connect API和连接器。

  • 客户端要被内嵌到应用程序里,应用程序向Kafka写入数据或从Kafka读取数据 。

  • Kafka Connect 直接连接到数据存储系统,用于从外部数据存储系统读取数据,或者将数据推送到外部系统。 如果数据存储系统提供了相应的connect,就可以通过配置来完成,类似与Flume这种。

Kafka Connect

  • 它为在 Kafka和外部数据存储系统之间移动数据提供一种可靠且可伸缩的方式 。Connect 以worker进程集群的方式运行,基于worker进程安装连接器插件,然后使用REST API来管理和配置connector,这些worker进程都是长时间持续运行的作业 。连接器 启动额外的task,有效地利用工作节点的资源,以并行的方式移动大量的数据。 数据源的连接器负责从源系统读取数据, 并把数据对象提供给worker进程 。数据池的连接器负责 从worker进程获取数据,并把它们写入目标系统 。

  • 参考Debezium和Kafka connect的整合使用

运行Connect

  • 修改配置connect-distruibuted.properties

    • bootstrap.servers:列出将要与Connect协同工作的broker服务器

    • group.id:具有相同group.id的worker属于同一个Connect集群。集群的连接器和它们的 任务可以运行在任意一个wokrer上。

    • key.converter和value.converter:Connect可以处理存储Kafka里的不同格式的数据。指定消息键和值的转换器。默认为JSONConver。

      • key.converter.schemas.enable:指定JSON消息是否可以包含schema

      • value.converter.schemas.enable:指定JSON消息是否可以包含schema

    • rest.host.name=localhost rest的host

    • rest.port=8888 rest端口

查看支持的插件

http://localhost:8888/connector-plugins 

单机模式

  • Connect支持单机模式,单机模式与分布式模式,在启动时使用connect-standalone.sh代替connect-distrbuted.sh,也可以通过命令后传入连接器的配置文件。

配置Connectors

Connector 配置是简单的key-value映射。对于单机模式这是定义在一个属性文件和 传递到命令行上的连接进程。在分布式模式下,它们将包含在创建(或修改)连接器的请求的JSON有效负载中 。

  • name:connetctor的唯一名称。尝试注册相同的名称将会失败

  • connector.class:connector的java类, 支持类的全路径名,也支持别名。

  • task.max: tasks应该被创建在这个connector的最大数量。 这个connector可能创建最少的tasks如果它不能实现这种级别的并行度

  • key.converter:可选的,覆盖默认的key转换器在这个worker中

  • value.converter:可选的, 覆盖worker设置的默认值转换器。

sink connector有一些额外的可选项来控制它们的输入

  • topics- 使用逗号分隔的主题列表作为此连接器的输入

  • topics.regex-主题的Java正则表达式,用作此连接器的输入

Transformations转换

可以 使用转换配置连接器,以进行轻量级的每次消息修改。它们可以方便地进行数据处理和事件路由 。

  • transformas:transformas 的别名列表,指定应用 transformas 的顺序。

  • transformas.$alias.type:transformas 的完全合格的类名称。

  • transformas.$alias.$transformationSpecificConfig:transformas的配置属性

最后更新于