陈肃作者

DataPipeline CTO陈肃:从ETL到ELT,AI时代数据集成的问题与解决方案

大家好!很高兴今天有机会和大家分享一些数据集成方面的看法和应用经验。先自我介绍一下。我叫陈肃,博士毕业于中国科学院大学,数据挖掘研究方向。现在北京数见科技(DataPipeline)任 CTO。之前在中国移动研究院任职算法工程师和用户行为实验室技术经理,之后作为合伙人加入过一家互联网教育公司,从事智能学习方面的研发工作。

在毕业后工作的这多年以来,我大部分时候在做大数据机器学习相关的应用系统研发工作,数据的整合是其中一个非常重要的环节。加入 DataPipeline 后,公司研发的是一款企业级的数据集成产品,旨在帮助企业一站式解决数据集成和元数据管理问题。

ELT 和 ETL 是数据集成的两种基本方式。前者专注于大数据的实时抽取和可靠传输,后者则包含了更丰富的数据转换功能。 由于今天是和 AI 前线的朋友们一起探讨数据集成,我主要结合 AI 应用的场景谈谈:为什么 ELT 是更适合 AI 应用场景的数据集成方案、采用 Kafka 技术栈来构建 ELT 平台所具备的优势和问题以及我们所做的一些优化工作。希望能够对大家的工作和学习有所帮助。

今天我的分享主要内容如上图:

首先,我会介绍一下 AI 应用中数据集成的典型场景,ETL 和 ELT 两种数据集成模式的异同点,以及为什么 AI 应用下更适合采用 ELT 模式。然后,我会花一些篇幅介绍数据集成中需要重点考虑的基本问题,以及我们所采用的底层平台——Kafka Connect 在解决这些问题上的优势和局限。

接下来,我会介绍 DataPipeline 对于 Kafka Connect 一些优化。有的是从底层做的优化,例如线程池的优化。有的则是从产品特性上的优化,例如错误数据队列。

最后,我们谈一谈 Kafka Connect 和 Kafka Stream 的结合,以及我们用 Kafka Stream 做数据质量预警方面的一个应用 Case。

一、AI 应用场景下的数据集成

数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,为企业提供全面的数据共享。AI 是典型的数据驱动应用,数据集成在其中起着关键的基础性作用。

以一个大家所熟悉的在线推荐服务为例,通常需要依赖三类数据:用户的属性 (年龄、性别、地域、注册时间等)、商品的属性(分类、价格、描述等)、用户产生的各类行为(登录、点击、搜索、加购物车、购买、评论、点赞、收藏、加好友、发私信等)事件数据。

随着微服务框架的流行,这三类数据通常会存在于不同的微服务中:“用户管理服务”储存着用户的属性、好友关系、登录等数据;“商品管理服务”存储的商品信息;“订单服务”存储着用户的订单数据;“支付服务”存储用户的支付数据;“评论服务”记录着用户的评论和点赞数据。为了实现一个推荐服务,我们首先需要让服务能够访问到这些数据。这种数据访问应该是非侵入式的,也就是说不能对原有系统的性能、稳定性、安全性造成额外的负担。因此,推荐服务不应当直接访问这些分散的数据源,而是应该通过某种方式将这些数据从各个业务子系统中提取出来,汇集到一个逻辑上集中的数据库 / 仓库,然后才能方便地使用机器学习框架(例如 Spark MLlib)来读取数据、训练和更新模型。

1. ETL 和 ELT 的区别与联系

数据集成包含三个基本的环节:Extract(抽取)、Transform(转换)、Load(加载)。

抽取是将数据从已有的数据源中提取出来,例如通过 JDBC/Binlog 方式获取 MySQL 数据库的增量数据;转换是对原始数据进行处理,例如将用户属性中的手机号替换为匿名的唯一 ID、计算每个用户对商品的平均打分、计算每个商品的购买数量、将 B 表的数据填充到 A 表中形成新的宽表等;加载是将数据写入目的地。

根据转换转换发生的顺序和位置,数据集成可以分为 ETL 和 ELT 两种模式。ETL 在数据源抽取后首先进行转换,然后将转换的结果写入目的地。ELT 则是在抽取后将结果先写入目的地,然后由下游应用利用数据库的聚合分析能力或者外部计算框架,例如 Spark 来完成转换的步骤。

2. 为什么 ELT 更适合 AI 应用场景

为什么说 ELT 更适合 AI 的应用场景呢?

首先这是由 AI 应用对数据转换的高度灵活性需求决定的。 绝大多数 AI 应用使用的算法模型都包括一个特征提取和变换的过程。根据算法的不同,这个特征提取可能是特征矩阵的简单的归一化或平滑处理,也可以是用 Aggregation 函数或 One-Hot 编码进行维度特征的扩充,甚至特征提取本身也需要用到其它模型的输出结果。这使得 AI 模型很难直接利用 ETL 工具内建的转换功能,来完成特征提取步骤。此外,企业现在很少会从零构建 AI 应用。当应用包括 Spark/Flink MLlib 在内的机器学习框架时,内建的模型库本身往往包含了特征提取和变换的逻辑,这使得在数据提取阶段就做复杂变换的必要性进一步降低。

其次,企业经常会基于同样的数据构建不同应用。 以我之前所在的一家在线教育公司为例,我们构建了两个 AI 的应用:其中一个是针对各类课程的推荐应用,主要用于增加用户的购买转化率。另外一个是自适应学习系统,用于评估用户的知识掌握程度和题目的难度和区分度,从而为用户动态地规划学习路径。两个应用都需要用户属性、做题记录、点击行为以及学习资料文本,但采用的具体模型的特征提取和处理方式完全不同。如果用 ETL 模式,我们需要从源端抽取两遍数据。而采用 ELT 模式,所有数据存储在 HBase 中,不同的应用根据模型需要过滤提取出所需的数据子集,在 Spark 集群完成相应的特征提取和模型计算,降低了对源端的依赖和访问频次。

最后,主流的机器学习框架,例如 Spark MLlib 和 Flink MLlib,对于分布式、并行化和容错都有良好的支持,并且易于进行节点扩容。 采用 ELT 模式,我们可以避免构建一个专有数据转换集群(可能还伴随着昂贵的 ETL 产品 License 费用),而是用一个通用的、易于创建和维护的分布式计算集群来完成所有的工作,有利于降低总体拥有成本,同时提升系统的可维护性和扩展性。

二、从 ETL 和 ELT 面临的主要问题

采用 ELT 模式,意味着可以较少的关注数据集成过程中的复杂转换,而将重点放在让数据尽快地传输上。然而,一些共性的问题依然需要得到解决:

1. 数据源的异构性: 传统 ETL 方案中,企业要通过 ETL 工具或者编写脚本的方式来完成数据源到目的地同步工作。当数据源异构的时候,需要特别考虑 Schema(可以简单理解为数据字段类型)兼容性带来的影响。无论是 ETL 还是 ELT,都需要解决这一问题。

2. 数据源的动态性: 动态性有两方面含义。一是如何获取数据源的增量;二是如何应对数据源端的 Schema 变化,例如增加列和删除列。

3. 任务的可伸缩性: 当面对少量几个数据源,数据增量不过每日几百 MB 的时候,ELT 平台的可伸缩性不是什么大问题。当 ELT 面对的是成百上千个数据源,或者数据源数据增速很快时,ELT 平台的任务水平切分和多任务并行处理就成为一个必备的要求。平台不仅要支持单节点的多任务并行,还需要支持节点的水平扩展。此外,ELT 的上游通常会遇到一些吞吐能力较差的数据源,需要能够对读取进行限速,避免对现有业务产生影响。

4. 任务的容错性:ELT 平台某些节点出现故障的时候,失败的作业必须能够迁移到健康的节点上继续工作。同时,作业的恢复需要实现断点重传,至少不能出现丢失数据,最好能够做到不产生重复的数据。

三、Kafka Connect 的架构
1. Kafka Connect:基于 Kafka 的 ELT 框架

可用于构建 ELT 的开源数据集成平台方案不止一种,较广泛采用的包括 Kafka Connect、DataX 等,也有公司直接采用 Flink 等流式计算框架。DataPipeline 作为一家提供企业数据集成产品的公司,我们在 Kafka Connect 之上踩了许多坑并且也做了许多优化。

四、踩过的坑与优化的点
1. Kafka Connect 应用于ELT的关键问题1

下面我们聊一聊 Kafka Connect 应用过程中的几个关键问题。

首先是 任务的限速和数据缓存问题。从 Kafka Connect 设计之初,就遵从从源端到目的地解耦性。当 Source 的写入速度长时间大于 Sink 端的消费速度时,就会产生 Kafka 队列中消息的堆积。如果 Kafka 的 Topic Retention 参数设置不当,有可能会造成数据在消费前被回收,造成数据丢失。Kafka Connect 框架本身并没有提供 Connector 级别的限速措施,需要进行二次开发。

2. Kafka Connect 应用于ELT的关键问题2

用户有多个数据源,或者单一数据源中有大量的表需要进行并行同步时,任务的并行化问题 就产生了。Kafka Connect 的 rebalance 是牵一发动全身,一个新任务的开始和停止都会导致所有任务的 reload。当任务数很多的时候,整个 Kafka Connect 集群可能陷入长达数分钟的 rebalance 过程。

解决的方法,一是用 CDC(Change Data Capture)来捕获全局的数据增量;二是 在任务内部引入多线程轮询机制,减少任务数量并提高资源利用率。

3. Kafka Connect 应用于ELT的关键问题3

异构数据源同步会遇到 Schema 不匹配 的问题。在需要精确同步的场景下(例如金融机构的异构数据库同步),通常需要 Case by Case 的去定义映射规则。而在 AI 应用场景下,这个问题并不是很突出,模型训练对于损失一点精度通常是可容忍的,一些数据库独有的类型也不常用。

4. Kafka Connect 应用于ELT的关键问题4

Source 端需要能够检测到 Schema 的变化,从而生成具有正确 Schema 格式的 Source Record。CDC 模式下,通过解析 DDL 语句可以获取到。非 CDC 模式下,需要保存一个快照才能够获取到这种变化。

下面我用一些时间对 DataPipeline 所做的优化和产品特性方面的工作。

DataPipeline 是一个底层使用 Kafka Connect 框架的 ELT 产品。首先,我们在底层上引入了 Manager 来进行全局化的任务管理。Manager 负责管理 Source Connector 和 Sink Connector 的生命周期,与 Kafka Connect 的管理 API 通过 REST 进行交互。

系统的任何运行异常,都会进行统一的处理,并由通知中心发送给任务的负责人和运维工程师。我们还提供了一个 Dashboard,用于图形化方式对任务进行生命周期管理、检索和状态监控。用户可以告别 Kafka Connect 的命令行。

5. DataPipeline的任务并行模型

DataPipeline 在任务并行方面做了一些加强。在 DataPipeline Connector 中,我们在每个 Task 内部定义和维护一个线程池,从而能够用较少的 Task 数量达到比较高的并行度,降低了 rebalance 的开销。 而对于 JDBC 类型的 Connector,我们额外允许配置连接池的大小,减少上游和下游资源的开销。此外,每个 Connector 还可以定义自己限速策略,以适应不同的应用环境需求。

6. DataPipeline 的错误队列机制

通过产品中错误队列预警功能,用户可以指定面对错误数据暂存和处理逻辑,比如错误队列达到某个百分比的时候任务会暂停,这样的设置可以保证任务不会因少量异常数据而中断,被完整记录下来的异常数据可以被管理员非常方便地进行追踪、排查和处理。

相比以前通过日志来筛查异常数据,这种错误队列可视化功能能够大大提升管理员的工作效率。

7. DataPipeline 的数据转换

DataPipeline 实现了自己的 动态加载机制。提供了两种 可视化的转换器:基本转换器和高级转换器。前者提供包括字段过滤、字段替换和字段忽略等功能;后者基于 Java,可以更加灵活地对数据处理,并且校验处理结果的 Schema 一致性。DataPipeline 还提供了数据采样和动态调试能力,方便用户进行表级别的转换规则开发。

值得注意的是,Kafka 不仅仅是一个消息队列系统,本身也提供了持久化能力。一个很自然的问题就是:能否不额外引入 Sink 端的外部存储,直接从 Kafka 中获取训练数据?

如果模型本身要用到某个 Topic 的全量数据或者最近一段时间的数据,那么通过设置合适的 retention 参数,可以直接将 Kafka 作为训练数据的来源。Kafka 的顺序读模式可以提供非常高的读取速度;如果模型要根据消息的内容做数据筛选,那么由于 Kafka 本身并不提供检索能力,需要遍历所有消息,这样就显得比较低效了。

当模型用于线上时,可能还需要引入流式计算来完成实时特征的提取工作。Kafka 本身就提供了这种流式计算能力。

8. 流式计算在 ELT 中的作用 - 数据质量预警

DataPipeline 也将流式计算引入到平台的质量预警功能中。在我们的未来版本中,用户可以定义 Topic 级别的质量预警规则模型,例如“在 5 分钟时间内,数据记录的字段 1 均值超过历史均值记录的比率超过 70%”为异常,采取策略为“告警并暂停同步”。通过这种方式,可以在 ELT 的过程中,尽早发现数据中的异常现象,避免大量异常数据进入数据目的地。

五、总结与展望

最后总结一下。数据集成并不是什么新的概念,在过去二十多年间已经广泛应用于各个行业的信息系统。ELT 和 ETL 相比,最大的区别是“重抽取和加载,轻转换”,从而可以用更简单的技术栈、更轻量的方案搭建起一个满足现代企业应用的数据集成平台。AI 应用内在的特点也使得 ELT 特别适合这个场景。

Kafka Connect 本身是一个业界被广泛采用的 ELT 框架,针对容错、分布式、Schema 一致性等方面都提供了良好的支持,同时有大量的社区和商业资源可供参考和选择。DataPipeline 基于 Kafka Connect 做了大量数据集成场景下的优化,与 Kafka Stream 相结合,能够为包括 AI 在内的各种应用场景构建起一个完整的数据层支撑方案。

有其它关于数据集成的技术问题,也欢迎一起探讨、共同提高

参考资料

· How to Build and Deploy Scalable Machine Learning in Production with Apache Kafka

https://www.confluent.io/blog/  

· Kafka Connect 官方文档

https://docs.confluent.io/current/connect/index.html

· Machine Learning + Kafka Streams Examples

https://github.com/kaiwaehner  

· PredictionIO- 基于 Spark 的机器学习框架

http://predictionio.apache.org

DataPipeline
DataPipeline

国内领先的“iPaaS+AI”一站式大数据融合服务提供商。为企业提供:数据融合、数据任务管理、数据质量管控、可视化运维管理、错误队列管理、用户管理、元数据管理等服务。

https://www.datapipeline.com/
理论Kafka ConnectETLELTDataPipeline机器学习数据挖掘
3
相关数据
机器学习技术

机器学习是人工智能的一个分支,是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、计算复杂性理论等多门学科。机器学习理论主要是设计和分析一些让计算机可以自动“学习”的算法。因为学习算法中涉及了大量的统计学理论,机器学习与推断统计学联系尤为密切,也被称为统计学习理论。算法设计方面,机器学习理论关注可以实现的,行之有效的学习算法。

参数技术

在数学和统计学裡,参数(英语:parameter)是使用通用变量来建立函数和变量之间关系(当这种关系很难用方程来阐述时)的一个数量。

规划技术

人工智能领域的「规划」通常是指智能体执行的任务/动作的自动规划和调度,其目的是进行资源的优化。常见的规划方法包括经典规划(Classical Planning)、分层任务网络(HTN)和 logistics 规划。

自适应学习技术

自适应学习也称为适应性教学(Adaptive Learning),是一种以计算机作为交互式教学手段的教学方法,根据每个学习者的特别需求,以协调人力资源和调解资源的分配。计算机根据学生的学习需求(如根据学生对问题、任务和经验的反馈)调整教育材料的表达方式。自适应学习技术已经涵盖了来自各个研究领域,包括计算机科学,教育,心理学和脑科学等等。

数据挖掘技术

数据挖掘(英语:data mining)是一个跨学科的计算机科学分支 它是用人工智能、机器学习、统计学和数据库的交叉方法在相對較大型的数据集中发现模式的计算过程。 数据挖掘过程的总体目标是从一个数据集中提取信息,并将其转换成可理解的结构,以进一步使用。

映射技术

映射指的是具有某种特殊结构的函数,或泛指类函数思想的范畴论中的态射。 逻辑和图论中也有一些不太常规的用法。其数学定义为:两个非空集合A与B间存在着对应关系f,而且对于A中的每一个元素x,B中总有有唯一的一个元素y与它对应,就这种对应为从A到B的映射,记作f:A→B。其中,y称为元素x在映射f下的象,记作:y=f(x)。x称为y关于映射f的原象*。*集合A中所有元素的象的集合称为映射f的值域,记作f(A)。同样的,在机器学习中,映射就是输入与输出之间的对应关系。

逻辑技术

人工智能领域用逻辑来理解智能推理问题;它可以提供用于分析编程语言的技术,也可用作分析、表征知识或编程的工具。目前人们常用的逻辑分支有命题逻辑(Propositional Logic )以及一阶逻辑(FOL)等谓词逻辑。

大数据技术技术

大数据,又称为巨量资料,指的是传统数据处理应用软件不足以处理它们的大或复杂的数据集的术语。

分布式计算技术技术

在计算机科学中,分布式计算,又译为分散式運算。这个研究领域,主要研究分布式系统如何进行计算。分布式系统是一组电脑,通过网络相互链接传递消息与通信后并协调它们的行为而形成的系统。组件之间彼此进行交互以实现一个共同的目标。

暂无评论
暂无评论~