搜索
您的当前位置:首页正文

Delta Lake 数据湖原理和实战

来源:尚车旅游网
数据湖这么火,到底是如何实现的,它是大数据架构的银弹吗?围绕这个问题为大家介绍数据湖的核心实现原理和实战。

图1

01我们得先知道数仓是个啥

数据湖是从数据仓库的概念上发展而来的。因此在开启数据湖原理和实战之旅前,我们首先得再往前追溯一些很直接的、有强相关性的内容点,说到这就有必要回顾下数仓的基础知识了。

咱就从数据仓库概念说起吧,它简称为数仓,英文名Data Warehouse,可简写为DW或DWH。数据仓库是为企业决策提供所有类型数据支持的集合。企业建立数仓的主要目的是为分析性报告和决策支持目的而创建。

简单介绍了数据仓库的概念,咱们看下数据有哪些特点。这将帮助我们进一步理解数仓。

1.数据是面向主题的:是在较高层次上对分析对象的数据的一个完整、一致的描述,能完整、统一地刻划各个分析对象所涉及的企业的各项数据,以及数据之间的联系。

2.数据是集成的:数据进入数据仓库之前,统一源数据中所有矛盾之处,如字段的同名异义、异名同义、单位不统一、字长不一致。

3.时间段内不可更新:无更新操作,不用考虑数据整性保护、并发控制;重点关注高性能查询;BI要求高。

图2

4.数据随时间不断变化:数据仓库随时间变化不断增加新的数据内容;数据仓库随时间变化不断删去旧的数据内容;数仓综合数据中很多跟时间有关。

02何为数据湖及其核心要点

首先我们看下数据湖的概念,数据湖指的是可以存储任意格式数据(结构化和非结构化)的存储中心。

为了便于大家更好地理解数据湖,咱们将通过数据湖与数仓的对比分析展开讲。这部分也是理解数据湖的核心,希望大家能够通过深入理解数据湖和数仓的区别,以辩证的思维方法对数据湖有个比较深入的了解,进而帮助大家实践过程中能够有更多看待问题的视角,尤其是方案落地上的思考。

(1)数据层面:数仓的数据大部分来自事务系统、运营数据库和业务线应用程序,有相对明确的数据结构;而数据湖的数据来自IoT设备、网站、移动应用程序、社交媒体和企业应用程序等,既包含结构化数据,也包含非结构化数据。

(2)Schema层面:数仓的Schema在数据入库之前的设计阶段就产生了,而数据湖的Schema是在写入数据时实时分析数据结构而产生的。数据仓库模式是schema on write,数据湖模式是schema on read。

图3

(3)数据质量:因为数仓的数据数据具备完整的数据结构并且支持事务等操作,因此数据质量方面比数仓较好。

(4)应用层面:数仓主要用于批处理报告、BI 和可视化等;数据湖主要用于机器学习、预测分析、数据发现和分析。

03数据湖面临的挑战

前面通过数据湖和数仓的对比分析,大家可能会感觉数据湖真是个“好东西”,让我赶快实战起来吧。莫急,其实数据湖的概念,是随着业务对需求的变化在数仓不能满足要求的情况下提出来的。不过要实现数据湖提出的愿景,其实还是面临很多具体的技术挑战。

接下来咱们介绍下数据湖实现上的技术挑战。只有搞清楚了这些,我们才能对如何构建数据湖有个整体的思路,这些挑战也是数据湖要解决的核心问题。

(1)对数据湖进行的读写操作不可靠。

由于数据湖中数据动辄上TB,因此其读写相对耗时,不可能像关系型数据库那样以锁表加事务的方式保障数据的一致性。这样的话会引起数据写入的过程中有人读取数据,从而看到中间状态数据的情况,类似数据库中的幻读。在实际数据湖的设计中,可以通过添加版本号等方式解决。

(2)数据湖的数据质量较差。

由于数据湖对数据的结构没有要求,因此大量的各种非结构化的数据会存入数据湖,这样会对后期数据湖中的数据治理,带来很大的挑战。

(3)随着数据量的增加,性能变差。

随着对数据湖中数据操作的增加,元数据也会不断增加,并且数据湖架构一般不会删除元数据信息,因此元数据湖不断膨胀,处理数据的作业在元数据的查询上会消耗大量的时间。

图4

(4)更新数据湖中的记录,非常困难。数据湖中的数据更新需要工程师通过复杂逻辑实现,维护困难。

(5)数据如何回滚方面:数据处理过程中错误是不可避免的,因此数据湖要有良好的回滚方案保障数据的完整性。

结合前面的分享,不难发现,要实现数据湖提出的愿景,还是有很多具体的技术难题需要解决。

04Copy-On-Write VS Merge-On-Read模式

基于数据湖的理念,有多种数据湖的实现方案。例如KUDU、Hudi和Delta Lake。这些数据湖方案中很重要一个技术选型,就是更新数据是采用Copy-On-Write 模式,还是采用 Merge-On-Read模式。

要想理解各个数据湖实现的特点,还是得清楚两者的区别到底在哪儿,如此一来,才能够帮助我们更好地做分析判断。

那么这两种模式有何区别呢? 01

Copy-On-Write 模式,指的是当执行数据更新时,从原来的数据复制出来一个副本,在副本上进行数据更新,当副本更新完成后把原始数据替换为当前副本。它属于写放大的操作,适合于数据读取请求多,数据更新请求少的应用场景。 02

Merge-On-Read 模式,指的是用户在写入和更新数据的时候不处理数据的准确性、Schema的兼容性和一直性等问题,当用户查询该部分数据时再对数据进行合并,分析。处理数据的准确性、Schema的兼容性和一直性。然后将处理后的结果返回给用户。也就是说Merge-On-Read 是在读数据的时候对数据进行修复的。

做个小结:

图5

通过将数据湖和数仓进行对比分析,我们介绍完了数据湖的核心概念,下面让我们进入这次分享的重头戏,Delta Lake数据湖原理和实战。

05Delta Lake数据湖原理

在介绍Delta Lake数据湖原理前,让我们先快速看下Delta Lake的概念:Delta Lake是Databricks公司开发的数据湖解决方案,它提供了基于大数据的ACID、版本控制、数据回滚等功能,使得用户基于Delta Lake和云存储能快速构建起数据湖应用。它采用Copy-On-Write模式。

1、Delta Lake 的主要特点

Delta Lake 解决了数据湖面对的问题,简化了数据湖构建。下面咱们看下 Delta Lake 的主要特点,具体包括ACID 事务、模式(Schema)管理、数据版本控制和时间旅行、统一的批处理和流接收、记录更新和删除。其中ACID 事务、模式(Schema)管理和统一的批处理和流接收是Delta Lake 设计的核心要点,需要同学们重点关注。

(1)ACID 事务:

Delta Lake支持事务操作,在Delta Lake中每个写操作都是一个事务,事务的状态通过事务日志来记录,事务的日志会跟踪每个文件的写操作状态。Delta Lake的并发写操作使用乐观锁的方式控制,当有多个操作同时对一份数据进行写操作时只会有一个写操作成功,其他写操作会抛出异常,以便客户端根据异常情况选择重试还是放弃修改操作。

(2)模式(Schema)管理:

数据在写入过程中,Delta Lake会检查DataFrame中数据的Schema信息,如果发现新的列数据在表中存在但是在DataFrame中不存在,就会将该列数据存储为null;如果发现DataFrame新的列在表中不存在,则会抛出异常。同时Delta Lake还可以显式地添加新列的DDL和自动更新Schema。

图6

(3)数据版本控制和时间旅行:

Delta Lake将用户的写操作以版本的形成存储,也就是说每次对数据执行一次更新操作Delta Lake都会生成一个新的版本,用户在读取数据时,可以在API中传入版本号读取任何历史版本的数据。

同时,还可以将表中数据的状态还原到历史的某个版本。

(4)统一的批处理和流接收(Streaming Sink):

Delta Lake可以从Spark的结构化流获取数据并结合自身的ACID、事务、可伸缩的元数据处理的能力,来实现多种近实时的数据分析。

(5)记录更新和删除:

在未来的版本中,Delta Lake计划支持DML的合并、更新和删除功能。

2、Delta Lake数据存储

数据的存储是数据湖要实现的核心技术点,你需要重点关注。接下来咱们介绍下Delta Lake的数据存储原理。

Delta Lake 的数据存储原理其实很简单。它通过 Partition Directories 存储数据,数据格式推荐为 Parquet,然后通过 Transaction Log (事务日志)记录的表版本(Table Version) 和变更历史,以维护历史版本数据。

Delta Lake中的表其实是一些列操作的结果,例如:更新元数据、更新表名、变更 Schema、增加或删除Partition、添加或者移除文件。

图7

Delta Lake会以日志的形式将所有的操作存储在表中。也就是说当前表中的数据是一些列历史操作的结果,为Delta Lake表的结构信息。其中包含了表名称、事务日志(每个事务日志文件代表了一个数据版本)、分区目录和数据文件。

3、Delta Lake原子性保障

如何保障在多用户并发读写下的情况下数据的原子性,是一个数据湖设计必须要考虑的问题,在该问题的处理上,Delta Lake解决问题的方式很巧妙,它只要保障 Commit File 的顺序和原子性就可以了。

这里举个例子,我们先在表中添加一个文件001.snappy.parquet,形成一个版本,以00.json的日志文件形式存储。接着删除刚才新增的01.snappy.parquet文件,重新加入一个新的02.snappy.parquet文件形成另外一个版本01.json。

图8

这样在数据的写入过程中,如果有人读取数据,则每次只能读取到已经 Commit 的结果数据。

4、Delta Lake并发写

Delta Lake使用Copy-On-Write实现写操作。由于Spark应用属于高并发读,低并发写的应用,因此它比较适合使用乐观锁来控制并发写。接下来分别从数据读取和写入两个方面,看下Delta Lake的数据一致性问题。

数据读取的是表的某个版本的快照(Snapshot),因此即使在读取过程中数据有更新,读操作看到的也是之前版本的数据, 也就是说在数据更新过程中读操作看到的数据不会有变动。

在写数据的情况下,Delta 使用乐观锁来保证事务,写操作分为 3 个阶段。

(1)读取:读取最新版本的数据,作为一个数据集的一个Snapshot, 然后定位需要改变的文件,后续写操作在Snapshot的版本上写入新数据。

(2)写入:执行写操作,并将数据版本加1,准备提交写操作结果。

图9

(3)验证和提交:在提交写操作结果之前,检测是否有其他已经提交的操作更新了文件,并检查和本事务需要更新的文件是否有冲突。

没冲突,就提交本次写操作结果,产生一个新的版本数据。如果有冲突,就会抛出一个并发修改异常,放弃修改。这样保障了如果有多个并发写,即使来自不同的集群,也会保证一致性。

5.Delta Lake大规模元数据处理

当不断地对 Delta Lake 表进行操作时,会不断地产生提交日志文件(Commit Log File)、小文件,并且随着时间的推移日志文件会不断的增加,最终会形式很多的小文件。如果将元数据像Hive那样存储在Hive Metastore 上,则每次读取数据都需要一行行读取Partition信息,并找出Partition下所有的问题信息,效率低下。

而Delta Lake将元数据存储在事务日志中,基于Spark对文件快速分析的能力,使得Delta Lake 能够在固定的时间内列出大型目录中的文件,从而提高了数据读取的效率。

06Delta Lake数据湖实战

前面介绍完了Delta Lake数据湖的概念和原理,接下来让我们从实战的角度看下Delta Lake数据湖的使用。

(1)开启Delta支持:首先需要定义一个SparkSession实例对象Spark,并在SparkSession定义时添加Delta的支持。

val spark = SparkSession.builder().master(\"local\").appName(\"DeltaDemo\") .config(\"spark.sql.extensions\.config(\"spark.sql.catalog.spark_catalog\

\"org.apache.spark.sql.delta.catalog.DeltaCatalog\")

.config(\"spark.databricks.deltaschema.autoMerge.enabled\.getOrCreate() code1

(2)数据插入:当定义好spark后,便可以将spark中的数据以delta的格式写入到spark中。下面代码中的format(\"delta\")表示写入的数据格式为delta,mode(\"overwrite\")表示写入模式为覆写。

val data_update = spark.range(5, 10)

data_update.write.format(\"delta\").mode(\"overwrite\").save(basePath+\"delta/delta-table\") data_update.show() code2

(3)数据更新:当需要对数据进行更新操作时,可以调用update方法对符合条件的数据进行更新,比如要将id为偶数的数据id值加100,具体实现为:

deltaTable.update(condition = expr(\"id % 2 == 0\"), set = Map(\"id\" -> expr(\"id + 100\"))) deltaTable.toDF.show() code3

(4)数据删除:当一些数据不再需要时,可以调用delete方法对数据进行删除。例如删除id为偶数的数据代码是这样的:

deltaTable.delete(condition = expr(\"id % 2 == 0\")) deltaTable.toDF.show() code4

(5)merge操作:delta支持数据的merge操作,具体做法是将原始数据表命名为oldData,并和newData进行merge,来看下具体实现。

deltaTable.as(\"oldData\")

.merge(newData.as(\"newData\"), \"oldData.id = newData.id\") //当数据存在是更新

.whenMatched.update(Map(\"id\" -> col(\"newData.id\"))) //当数据不存在是插入

.whenNotMatched.insert(Map(\"id\" -> col(\"newData.id\"))).execute() deltaTable.toDF.show() code5

(6)在Delta中,可以定义在Schema发生变化时,是采用overwrite(覆写)模式还是mergeSchema(合并Schema)模式更新Schema。

val df = spark.read.format(\"delta\").option(\"versionAsOfable\")

//overwriteSchema模式

df.write.format(\"delta\").option(\"overwriteSchema\.mode(\"overwrite\").save(basePath+\"delta/delta-table\") //mergeSchema模式

df.write.format(\"delta\").option(\"mergeSchema\.mode(\"overwrite\").save(basePath+\"delta/delta-table\") code6

因篇幅问题不能全部显示,请点此查看更多更全内容

Top