【遇见Doris 2019.08.11】Spark Doris Sink的设计和实现

今天是 朱良昌 同学代表 百度智能云流式计算团队 带来Spark Streaming对接Doris 设计与实现的分享。

80e417affefbb4d7508f4f8e2b2a1d1 业务场景d3658adcbbd794449806c21ddcbbaa3

Spark Streaming(主要是Structured Streaming) 在百度内部被广泛应用于实时计算,日志分析,ETL等业务场景。其中有很多业务方希望可以使用structured streaming读取上游数据源(例如:kafka、 hdfs、 database等),然后对数据进行处理后实时导入Doris以供查询分析。

为此流式计算团队专门开发了Doris sink的组件来适配Doris。Doris sink支持exactly-once语义,封装并对用户屏蔽了与Doris的交互细节,用户只需要关注用户逻辑和计算本身,经过简单配置即可非常方便的将流式数据导入到Doris中。

80e417affefbb4d7508f4f8e2b2a1d1 Structured Streaming介绍145

Structured Streaming是Spark 2.3版本之后提出的新的流式计算引擎,具有以下特点:

  1. 具备良好的扩展性和高容错性

  2. 基于Spark-SQL引擎,使用DataFrame API,使用简单

  3. 相比于Spark 2.2版本之前使用DStream API的spark Streaming模型,Structured Streaming支持更加丰富的流式语义。例如:基于eventTime的window、聚合计算,watermark,流和static DataFrame的join,流和流的简单join等

  4. 端到端的exactly-once语义。非常适用于要求数据不重不丢的业务

Spark工作模型

Spark作业整体架构如上图所示:

Driver :作为程序的入口,执行用户代码,生成DAG(有限无环图),对整个app的资源进行协调和管理。

Executor : 执行用户逻辑,dataset计算(transformation和action)。一个executor中可以并行执行多个task,task的并发量由启动executor时指定的core数决定,而每个task负责对一个partition进行计算。

Cluster Manager : 百度内部使用的主要是Yarn。

Structured Streaming编程模型

Structured Streaming与传统意义上的流式计算系统不同,它是一个 微批次(micro-batch) 的流式计算系统。其主要原理是将源源不断的数据,切分成一个一个的小数据块,其中每一小数据块称之为一个 batch 。当每次触发计算时,系统会处理一个batch中的数据,而batch和batch之间则是串行执行的。一个batch的计算,可以看做是一个Spark-SQL job的一次执行,故一个Structured Streaming作业可以看做是无穷多个job组成的一个不会停止作业。

WordCount Demo

上图是一个Structured Streaming使用Complete output mode执行Wordcount的示例。

nc是指linux的netcat指令,input数据通过socket传入。在时间点1,系统接收到4条数据作为一个batch进行计算,产生了结果(cat, 1)(dog, 3)。在时间点2的时候又来了两条数据,这两条数据会成为一个新的batch进行计算,新的计算结果会加到最终的结果里(cat, 2)(dog,3)(owl,1)。以此类推,每来一批新数据,将这批数据作为一个batch进行计算,然后对结果进行更新。Structured Streaming就这样源源不断的将输入数据切分为一个一个小的batch,然后执行计算。

Structured Streaming的exactly-once语义要求数据从 读取->计算->写出 的过程,实现 端到端的不重不丢 。因此对各模块有如下要求:

  1. Source可回溯且可回放。简单来说就是可以重复消费,常用Source主要是kafka,bigpipe(百度内部以c++实现的类似kafka的消息队列)

  2. Execution engine记录checkpoint。引擎会在处理每个batch之前,先写WAL来记录当前batch要读哪些数据。如果发生failover,可以利用checkpoint对batch的数据进行重新计算。故Source + Execution engine做到了at-least once语义,即数据的不丢

  3. 支持幂等写的sink,对重复数据去重,从而保证了任何情况failover的exactly-once语义

Checkpoint & WAL:

上图是一个Batch的计算流程, 以此来讲解Execution engine如果做到 数据不丢

  1. batch刚开始时,调用getOffset, 获取该batch要处理的数据范围,即offsetRange

  2. 将offsetRange [startOffset, endOffset)存储在OffsetLog中

  3. 调用getBatch,利用offsetRange构建Dataset, 提交batch

  4. Executor对batch进行计算,并将结果sink到下游系统

  5. batch运行结束,由driver写commit log,标识该batch运行完成

因此一个batch从执行开始到结束会写两个log,一个offsetlog,一个commitlog。通过这个两个log可以保证任何Failover场景下的 数据不丢

Failover case分析:

Case 1中,因为OffsetLog中记录的最新batchId和CommitLog中记录的最新batchId相等,所以Failover后,引擎发现第75个batch已经成功运行结束,且没有batch需要重放,则从第76个batch开始继续执行。

Case 2中,OffsetLog中最新的batchId是85, 而CommitLog中记录的最新的batchId是84,两者不相等,说明作业failover发生在batch 85执行过程中,此时需要重新执行batch 85。

Sink幂等写入:

Source + Execution-engine保证了数据的不丢,如果在此基础上希望实现端到端的exactly once,就需要Sink支持幂等写入以支持数据的去重。

80e417affefbb4d7508f4f8e2b2a1d1 Doris Sink的设计与实现145

由上文介绍可知,要实现端到端的exacly-once语义,需要下游系统支持对数据的去重,所以在设计Doris Sink时,就要考虑Doris对数据的 去重 功能。Doris有一个很明显的特点:它的写入是唯一的,即对同一Database,对同一个label的导入是唯一的,同一个label只能被导入一次,不可以被多次导入,即一个label对应唯一一批数据。 因此我们可以利用该特性来进行Doris sink的去重设计。

Label的生成逻辑

  1. 每个structured streaming作业启动时都必须指定一个checkpointlocation,且每个作业的checkpoint必须是唯一的,不能混用。

  2. batch是顺序执行的,因此每个batch的id是顺序递增且唯一的。

  3. 每个batch实际上是一个普通的spark job,其中的每个数据分片,可以通过paritionId的来标识。

因此,由3元组( checkpointLocation + batchId + paritionId )组成的label可以唯一的确定一个structured streaming作业中的一段数据。那么只要确保在failover前后同一段数据对应的label相同,即可以此来去重以实现exactly-once语义。

 1    val replace = tmp.replaceAll("[-|/|:|.]", "_") +
 2        s"_${batchId}_${TaskContext.getPartitionId}"
 3
 4    // shrink multiple underscores to one
 5    // e.g: label___test => label_test
 6    val builder = new StringBuilder
 7    for (index <- 0 to replace.length - 1) {
 8      if (index == 0) {
 9        builder += replace(0)
10      } else if (replace(index - 1) != '_' || replace(index) != '_') {
11        builder += replace(index)
12      }
13    }
14
15    val resultStr = builder.toString
16    val length = resultStr.length
17    if (length > 128) {
18      logWarning("palo label size larger than 128!, we truncate it!")
19      resultStr.substring(length - 128, length)
20    } else {
21      resultStr
22    }
23

以上是生成label代码的部分实现,我们会对特殊字符进行一些处理,且如果生成的label超过128个字符会截断,因为Doris 的label最多只支持128个字符。

Spark本身提供了Sink接口,我们通过继承Sink接口来实现Doris sink组件。

这里重点关注DorisWriterTask,该task是executor中实际进行计算的task逻辑,它有3种实现,分布对应了Doris的3种Load模式(Bulk load, Broker load, Streaming load)。下面主要介绍Bulk load和Broker load。Streaming load将在近期实现。

  1. 开始执行后,每个task会首先先将数据写入本地磁盘形成文件,文件则以上文提到的label来命名。

  2. Task发送http请求,以bulk load的方式,向Doris发起load请求

  3. Task轮询Doris,查询步骤2中的load是否结束

  4. Finish load之后,删除步骤1中生成的本地文件

注意点:

  1. 容错处理:每个task执行过程中会对部分异常进行处理并重试,重试4次后(可配置),如果仍旧失败,则整个batch重算

  2. 上述过程中步骤3的轮询的意义在于我们需要确保一个batch的数据成功导入后才能开始执行下一个batch,所以我们一定要通过query load info的方法,确保label对应的数据被成功load。

  3. bulk load的模式,适用于单个partition数据不超过1GB的导入。

  1. 开始执行后,每个task会首先先将数据写入hdfs,文件则以上文提到的label来命名。

  2. Task 想Doris发送broker load请求

  3. Doris broker去hdfs load文件

  4. Task轮询Doris,查询步骤2中的load是否结束。

  5. Finish load之后,删除步骤1中生成的hdfs文件

注意点:

  1. 容错处理:每个task执行过程中会对部分异常进行处理并重试,重试4次后(可配置),如果仍旧失败,则整个batch重算

  2. 适用于单个partition数据超过1GB的导入。

80e417affefbb4d7508f4f8e2b2a1d1 Doris 社区 Pull Request145

https://github.com/apache/incubator-doris/pull/1332

原文链接:https://mp.weixin.qq.com/s/uoPLfFBv9Vt2gg9HEriR0Q