【遇见Doris 2019.12.22】Spark Doris Connector的最佳实践

来自百度大数据部的张文歆为大家带来了 通过Spark(百度数据工厂Pingo)查询Doris的最佳实践

文歆,2015年进入百度,现任百度大数据部资深研发工程师。百度数据计算引擎QE和百度数据工厂Pingo的主力研发人员。百度数据工厂结构化元数据服务负责人。

Spark Doris Connector( Doris-Spark

Spark Doris Connector 是Doris在0.12版本中推出的新功能。用户可以使用该功能,直接通过Spark对Doris中存储的数据进行查询。

从Doris角度看,将其数据引入Spark,可以使用Spark一系列丰富的生态产品,拓宽了产品的想象力,也使得Doris和其他数据源的联合查询成为可能。

技术选型

在早期的方案中,我们直接将Doris的JDBC接口提供给Spark。对于JDBC这个Datasource,Spark侧的工作原理为,Spark的Driver通过JDBC协议,访问Doris的FE,以获取对应Doris表的Schema。然后,按照某一字段,将查询分位多个Partition子查询任务,下发给多个Spark的Executors。Executors将所负责的Partition转换成对应的JDBC查询,直接访问Doris的FE接口,获取对应数据。这种方案几乎无需改动代码,但是因为Spark无法感知Doris的数据分布,会导致打到Doris的查询压力非常大。

于是我们开发了针对Doris的新的Datasource,Spark-Doris-Connector。这种方案下,Doris可以暴露Doris数据分布给Spark。Spark的Driver访问Doris的FE获取Doris表的Schema和底层数据分布。之后,依据此数据分布,合理分配数据查询任务给Executors。最后,Spark的Executors分别访问不同的BE进行查询。大大提升了查询的效率。

使用方法

在Doris的代码库的 extension/spark-doris-connector/ 目录下编译生成doris-spark-1.0.0-SNAPSHOT.jar,将这个jar包加入Spark的ClassPath中,即可使用Spark-on-Doris功能了

SQL方式:

CREATE TEMPORARY VIEW spark_doris

USING doris

OPTIONS(

  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",

  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",

  "user"="$YOUR_DORIS_USERNAME",

  "password"="$YOUR_DORIS_PASSWORD"

);

SELECT * FROM spark_doris

RDD方式:

import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(

  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),

  cfg = Some(Map(

"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",

    "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",

    "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"

  ))

)

dorisSparkRDD.collect()

DataFrame方式:

val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()

dorisSparkDF.show(5)

更多的配置参数及使用方式,可以参阅使用文档:

适用场景

处理历史数据变更

在没有Spark Doris Connector前,Doris修改数据的成本很高,但数据的修改和删除需求在真实业务中时常出现。

Spark Doris Connector之前:

方案一:之前导入的错误数据不要删除,采用replace的方式,将错误的数据全部倒入一份负值的,从而将value刷成0,再将正确的数据导入进去。

方案二:把错误数据删除,然后再将正确数据insert进来。

上述方案都存在一个问题,即总有一段时间窗口内数据value为0。这对于外部系统来说是不能容忍的。例如广告主需要查看自己的账户信息,如果因数据变更问题而导致账户显示为0,将是难以接受的,很不友好。

Spark Doris Connector方案:

有了Spark Doris Connector,处理历史数据变更将会更加便捷。

如上图所示,第一行是错误数据,第二行是正确数据。Spark可以链接两条流,一条流使用Spark Doris Connector连接Doris,一条流连接外部的正确数据(例如业务部门生成的Parquet文件)。在Spark中做diff操作,将所有value算出diff值,即图中最后一行的结果。将其导入进Doris即可。这样的好处是可以消除中间的时间窗口,同时也便于平时经常使用Spark的业务方来进行操作,非常友好。

使用Spark对Doris中的数据和其他数据源进行联合分析

很多业务部门会将自己的数据放在不同的存储系统上,比如一些在线分析、报表的数据放在Doris中,一些结构化检索数据放在Elasticsearch中、一些需要事物的数据放在MySQL中,等等。业务往往需要跨多个存储源进行分析,通过Spark Doris Connector打通Spark和Doris后,业务可以直接使用Spark,将Doris中的数据与多个外部数据源做联合查询计算。

技术实现

架构一览

Doris对外提供更多能力

Doris FE

对外开放了获取内部表的元数据信息、单表查询规划和部分统计信息的接口。

所有的Rest API接口都需要进行HttpBasic认证,用户名和密码是登录数据库的用户名和密码,需要注意权限的正确分配。

// 获取表schema元信息
GET api/{database}/{table}/_schema

// 获取对单表的查询规划模版

POST api/{database}/{table}/_query_plan

{

"sql": "select k1, k2 from {database}.{table}"

}

// 获取表大小

GET api/{database}/{table}/_count

Doris BE

通过Thrift协议,直接对外提供数据的过滤、扫描和裁剪能力。

service TDorisExternalService {

    // 初始化查询执行器

    TScanOpenResult open_scanner(1: TScanOpenParams params);

    // 流式batch获取数据,Apache Arrow数据格式

    TScanBatchResult get_next(1: TScanNextBatchParams params);

    // 结束扫描

    TScanCloseResult close_scanner(1: TScanCloseParams params);

}

Thrift相关结构体定义可参考:

Doris这些对外提供的接口,同样可以用于Flink On Doris等。

实现Spark Doris Datasource

Datasource V1 API

V1API 由一系列的抽象类和接口组成,它们位于 spark/sql/sources/interfaces.scala 文件中。主要的内容有:

trait RelationProvider {

    def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {

    def sqlContext: SQLContext
    def schema: StructType

}

trait TableScan {
  def buildScan(): RDD[Row]
}

通过实现 RelationProvider 接口,表明该类定义了一种新的数据源,可以供 Spark 读取数据所用。createRelation 方法的参数可以用来做初始化,如DorisFE的地址,用户等信息。BaseRelation 抽象类则用来定义数据源的表结构,在Doris Datasource中,表结构来源于Doris FE接口。该类还必须实现某个 Scan 接口,Spark 会调用 buildScan 方法来获取数据源的 RDD。

实现RDD

DorisDatasource定义好后,我们需要实现DorisRDD用于真正的读取Doris的数据到Spark中。

在Spark RDD中,最主要的接口为getPartitons()和compute(),getPartitions负责将数据切分成多个子集,用于并行计算。而compute则返回一个迭代器,迭代器会依次获取在子集上的结果序列。

override def getPartitions: Array[Partition] = {

// 1. 调用FE 获取对单表的查询规划模版接口
// 2. 为每个Tablet选择BE,并形成Seq<Pair<BE, List<Tablet>>>。每一组Pair作为一个Partition

}

override def compute(split: Partition, context: TaskContext):

ScalaDorisRDDIterator[T] = {

// 返回一个获取结果数据的迭代器

    new ScalaDorisRDDIterator(context, split.asInstanceOf[DorisPartition].dorisPartition)

}

// 迭代器内部实现

class ScalaDorisRDDIterator[T] {

  def init: Unit = {

    // 调用BE 初始化查询执行器,获取Context id;

  }

  override def hasNext: Boolean = {

        if (没有缓存的数据) {

       // 调用BE 流式batch获取数据API,将数据放入缓存;

       // 返回API内的eos;

       }

      else {

          // 返回true;

      }

  }

  override def next(): T = {

    // 读取缓存中的一行数据;

  }

}

Doris Spark后续规划

  1. 扩展SparkSessionExtensions, 支持聚合如sum、count等聚合算子下推,充分利用Doris的物化视图模型。

  2. 持续优化数据传输协议,提高扫描速度

百度数据工厂Pingo介绍

百度数据工厂Pingo是基于Spark的数据工厂产品,是增强版的数据湖。Pingo拥有以下主要功能:

  1. 能够对异地、异构的非结构化数据、结构化数据、计算资源进行统一接入、访问和权限管理
  2. 由于核心引擎是Spark,因此查询语言支持SparkSQL,Dataframe。
  3. 提供WebUI、客户端、Restful接口三种交互方式

Pingo在百度内部提供离线计算服务。通过百度公有云、私有云的方式对外提供商业版本,并已经积累了很多行业案例。
架构一览

下面是Pingo的架构图,灰色部分是Pingo提供的功能组件,绿色部分为Pingo可挂载的外部组件。

主要特点

Pingo提供了比原生Spark更优异的性能和更丰富的数据处理能力,主要特点如下:

  1. 批流一体的计算引擎:Pingo提供Spark Dataframe,Spark Structed Streaming等业界流行的离线、流式计算框架,支持标准SQL访问。并自研了Spark Streaming SQL,统一批、流查询语言。
  2. 支持多种形式数据采集:Pingo提供数据传输服务,可以完成结构化数据库、非结构化数据的批量与增量采集。
  3. 多存储后端数据联合查询:Pingo不仅自身能够提供文件的存储能力,而且能够无缝的接入多种外部的存储系统。无需停机维护和数据迁移,用户可以直接将存储在BOS,S3,私有HDFS等存储资源上的数据接入到Pingo中。Pingo可实现多存储混合查询,与此同时,Pingo还能够缓存常用的数据,加速计算过程。
  4. 多结构化元信息联合查询:Pingo的元数据存储及管理模块可无缝对接用户已有的元数据存储系统(包括但不限于Hive Metastore, MySQL, Doris等)。可直接通过Pingo执行联合查询等离线计算任务。
  5. 多计算集群同时调度:Pingo自带资源调度模块。在提供默认计算资源的同时,用户也可通过简单操作挂载已有计算资源,从而无需在不同的计算集群中重新分配计算资源。

欢迎试用

目前Pingo已在百度云上线,欢迎使用:

https://cloud.baidu.com/product/pingo.html

原文链接:https://mp.weixin.qq.com/s/c8zE7ymv6jC1WTlV44dldQ