【遇见Doris 2019.04.13】Apache Doris在美团点评团队的实践

今天是 康凯森 同学代表 美团点评 团队 带来的关于Doris在美团的应用和分享。

这次美团的同学将主要从技术选型、典型应用、平台化建设和功能改造这4个方面给大家带来Doris的使用分享。

1eae0ba7e1b800d8f6e9b2c309adafa

背景

在2017年年底,美团已经维护了Kylin和Druid两个开源OLAP系统, Kylin主要满足离线固化多维分析的需求,Druid主要满足实时多维分析的需求

所谓的固化多维分析,指需要提前预定义维度和指标,然后查询时需要根据定义好的维度和指标进行查询,这样就无法满足即席的灵活多维分析需求,比如任意字段聚合,任意多表Join。

还有一点就是Kylin和Druid都是基于预计算的系统,没有保留明细数据,无法进行明细查询。(注:Kylin和Druid都可以通过一定的间接方式实现明细查询,但限制较多)

用户需求

随着用户对上面3 类的需求越来越多,美团决定在2018年初的时候调研当时开源的ROLAP 系统,并进行落地。

Why Not Other ROLAP

美团当时主要调研了SQL on Hadoop,ClickHouse,SnappyData,TiDB,Doris等系统,这些系统都是优秀的开源系统,并且都有其适用场景。在选型时主要从功能,架构,性能,易用性,运维成本等几个维度去考虑。

下面先介绍下为什么没有选择这些系统。

SQL on Hadoop系统 :无法支持更新,性能也较差。

◦TiDB :TiDB虽然当初号称可以支撑100%的TP和80%的AP,但是架构设计主要是面向TP场景,缺少针对AP场景专门的优化,所以OLAP查询性能较差,TiDB团队目前正在研发专门的OLAP产品:TiFlash,TiFlash 具有以下特点:列存,向量化执行,MPP,而这些特点Doris也都有。

◦SnappyData :SnappyData是基于Spark + GemFire实现的内存数据库,机器成本较高,而机器资源很有限,此外SnappyData的计算是基于JVM的,会有GC问题,影响查询稳定性。

◦ClickHouse :Clickhouse是一款单机性能十分彪悍的OLAP系统,但是当集群加减节点后,系统不能自动感知集群拓扑变化,也不能自动balance数据,导致运维成本很高,此外Clickhouse也不支持标准SQL,用户接入的成本也很高。

Why Doris

对用户来说, Doris的优点是功能强大,易用性好 。功能强大指可以满足用户的需求,易用性好主要指 兼容Mysql协议和语法 ,以及Online Schema Change 。兼容Mysql协议和语法让用户的学习成本和开发成本很低,Online Schema Change也是一个很吸引人的feature,因为在业务快速发展和频繁迭代的情况下,Schema变更会是一个高频的操作。

对于平台侧来说, Doris的优点是易运维,易扩展和高可用

◦易运维指Doris无外部系统依赖,部署和配置都很简单。

◦易扩展指Doris可以一键加减节点,并自动均衡数据。

◦高可用指Doris的FE 和BE 都可以容忍少数节点挂掉。

2f3504ab896bb3276f50ee5e20cc40b

1-变化维表Join

变化维表的Join是外卖业务很典型的一个应用,外卖中的蜂窝,商家类型等维表属性会经常更新,并且需要用最新的维表属性去关联商家事实表的历史数据。还有一点是外卖的商家属性是按照日,周,月,7,15,30 这6 个时间口径统计的,而且这里面的周,月,7,15,30不能按天直接累加,所以之前在MOLAP系统中,用户需要先建6张宽表,再基于6张宽表构建6个Cube。为了满足用最新的维表数据去关联事实表的历史数据的需求,就需要每天重刷几百天的历史数据。其实用户的高频查询都是近1个月的数据,但是为了满足极个别查很久历史数据的需求,就必须得重刷几百天的历史。这样就会浪费大量的计算资源,而且数据冗余比较严重,开发效率低下。

有了Doris之后,就只需要按天同步事实表和维表,然后查询时现场Join就可以。不需要每天重刷历史数据,开发效率也会提升很多。

上图中展示的是Doris和MOLAP系统对同一个商家分析应用在数据导入和数据存储方面的对比,可以看到,在保持查询性能不变的前提下,Doris在导入速度和存储效率上都有了很大的提升。

2-明细加聚合

同时查询明细和聚合是用户比较常见的需求,但是由于Kylin和Druid不能支持明细查询,所以用户就需要用Mysql或者ES来满足明细需求,再用Kylin和Druid来满足聚合需求,这样就有两条开发链路,数据也有冗余,并且可能还会有数据一致性的问题。

有了Doris之后,只需要Doris一个引擎就可以同时Cover明细+ 聚合的需求,用户的开发流程就会简化许多。

3-外卖准实时数仓

上图中是美团外卖准实时数仓的一个示意图,数据会从Kafka和Hive进入Doris中,然后每15 分钟会通过Doris to Doris ETL计算实时和离线的同环比(外卖的特殊业务需求)。

这个应用中主要依赖了Doris以下特性:

◦同时支持实时和离线数据导入。

◦Doris To Doris ETL,这个指的是Doris insert into select的功能。

◦还有一个是主键去重,建模时用的是Doris的UNIQUE KEY模型,Doris的主键去重和主键更新也是用户广泛使用的功能。

上图中展示的是美团外卖基于Doris构建准实时数仓,和基于Storm构建的实时应用的开发效率对比,用Storm开发需要20人日,用Doris开发需要10人日,这个效率的差别应该主要来自写SQL和写代码的效率差别。

Doris服务现状

上图中展示的是Doris服务的现状,规模不大,目前还在快速增长中。

其实脱离业务场景谈这些数字的意义都不是很大,目前的规模虽然不大,但是很多业务场景还是蛮有挑战的。

8b84ca233c0490272a8cbb45ef77204

接下来是美团同学在数据导入方面的平台化建设,包括离线的Hive to Doris和实时的Kafka to Doris。

Hive To Doris

Hive To Doris是基于Doris 的Http mini load实现的,上图是整个Hive To Doris的流程示意,首先会用Hive客户端将Hive表数据经过过滤,Null值处理,格式转换,Split后存储到HDFS上,然后多线程从HDFS将数据拉取到本地,紧接着将数据通过Http方式导入到Doris中。

我们知道,Doris HTTP mini Load对单次导入文件的大小是有限制的。所以这里讲Hive to Doris,主要是想分享这个Hive小文件合并过程,是用Hive小文件合并解决了大文件Split的问题,可以利用MR来分布式Split,让Split过程十分高效。

这里需要注意的一点是,在极端情况下,有些ORC格式存储的Hive表压缩比很高,导致第一步MR的单个Mapper输出文件大小达到好几G,进而无法触发Hive小文件合并过程。解决方法可以调大Doris BE的mini_load_max_mb参数,或者让用户修改Hive表存储格式。

Kafka To Doris

图中是Doris的Stream Load示意图,FE负责事务管理和导入的Plan生成,执行Plan的BE会将实时数据传输到每行数据对应的Table所在的BE上,数据首先会以Skiplist的数据结构保存在内存中,等超过一定大小后,会flush成列存。

这里需要注意两点:

◦Doris的Stream Load是基于HTTP 的。

◦Doris的Stream Load有Label 机制,也就是一次导入可以指定一个Label,Doris内部一个Label会对应一次事务,所以可以保证同一个Label的导入只会成功提交一次。

由于Doris的Stream Load当时不支持从Kafka直接消费数据,所以就在Doris的外围实现了Kafka to Doris,和Druid的tranquility比较类似。但和Druid的tranquility不同的是,这里实现的Kafka to Doris可以保证Exactly Once,Exactly Once即数据不丢也不重:

◦不丢是通过Mysql记录Kafka Offsets来保证的,只有确认Kafka的一批数据已经被Doris成功消费后,才会更新Mysql中Kafka 的offsets。

◦不重是通过Doris的Label机制保证的,前面提到Doris内部的事务机制可以保证同一个Label的导入只会成功提交一次。

关于Kafka To Doris的更多原理可以参考:

https://blog.bcmeng.com/post/kafka-to-doris.html

619c7d95ea7d95a47c7776d7429e80a

最后介绍所做的一部分功能改造:包括谓词下推的传递性优化,查询执行多实例并发和Colocate Join。

1-谓词下推的传递性优化

对于下面的SQL

Doris默认会对t2表进行全表Scan,这样当t2表数据量很大时,就会导致上面的查询超时,进而导致外卖业务在Doris上的第一批应用无法上线。其实我们知道,t2表是没有必要访问所有分区数据的。

实际上基于谓词t1.id = t2.id和t1.id = 1, 我们可以推断出新的谓词t2.id = 1,并将谓词t2.id = 1下推到t2 的Scan节点。这样假如t2表有数百个分区的话,查询性能就会有数十倍甚至上百倍的提升,因为t2表参与Scan和Join的数据量会显著减少。

当然,不是所有的谓词都可以下推的,我们需要区分Where中的过滤条件和On中的连接条件,比如在Left Join中,如果连接条件引用了外表,则谓词不能下推。

当时除了这个优化,谓词下推相关的还做了Having中非聚合函数的谓词下推,窗口函数中分区Key的谓词下推。

2-查询执行多实例并发优化

第二个是近期刚做的查询执行多实例并发优化,问题的背景是在做查询优化时发现,Doris默认在每个BE节点上为每个算子只会生成1个执行实例,以简单的count *查询为例,每个BE 节点只会有1个scan算子和1个聚合算子,这样每个BE节点的Scan算子和聚合算子都需要处理大量数据,导致查询性能比较差,而且发现当时这个集群的资源是比较空闲,并没有充分利用。

如上图所示,不难想到的一个优化手段是,我们可以在每个BE节点上为每个算子生成多个执行实例,这样每个算子只需要处理少量数据,而且多个执行实例可以并行执行,充分利用集群资源,提高单个复杂查询的性能。

上图中是并发度设置为5的优化效果,可以看到对于多种类型的查询,会有3到5倍的查询性能提升。

该优化比较适合集群资源充足,但是单机资源没有被充分利用的场景,该优化可以通过充分利用集群资源来提高单次复杂查询的性能。这个优化已经贡献到社区,大家可以试用下,美团目前在生产环境配置的并发度是3。

3-Colocate Join

分布式Join 的执行有4 种方式:

第一种是Replicated join,或叫Local Join。即将小表的数据提前复制到集群所有节点,查询时进行本地join, 没有网络传输的开销。Replicated join 比较适合星型模型的Join,可以提前将所有小维表复制到集群所有节点。

第二种就是今天要分享的Colocate Join,即将两表或多表的数据提前按照要高频查询的Join Key Shard,查询时进行本地的join,也没有网络传输的开销。Colocate Join比较适合Join Key较固定的应用场景,比如我们外卖中的商家分析,都是按照商家ID join。

第三种和第四种就是大家都熟知的Broadcast Join和Shuffle Join,它们的实现原理和适用场景我就不赘述了。其中shuffle join是最通用的实现方式,Colocate join的优势主要是没有网络传输的优化,但是不通用。

整个Colocate Join在Doris中实现的关键点如下:

◦数据导入时保证数据本地性

◦查询调度时保证数据本地性

◦数据Balance后保证数据本地性

◦查询Plan的修改Colocate Table元数据的持久化和一致性

◦Hash Join的粒度从Server粒度变为Bucket粒度

◦Colocate Join的条件判定

关于Colocate Join的更多细节可以参考:

https://blog.bcmeng.com/post/doris-colocate-join.html

对于下面的SQL, Doris Colocate Join和Shuffle Join在不同数据量下的性能对比如下:

Colocate Join功能已经贡献到Doris社区,大家在最新版本中就可以使用。假如你需要将t1表和t2表设置为Colocate table,只需要在建表时指定一个colocate_with属性就可以,使用起来十分简单。

75efb8030308340a3949bf118ac038a

最后简单介绍下美团的未来规划:

◦第一个是 Duplicate Key模型支持聚合类型的RollUp 。目前Doris明细表的RollUp 表也必须是明细表,不能进行聚合,这样对上百亿数据现场进行聚合的成本还是蛮高的,所以计划让Doris的明细表可以支持聚合模型的RollUp,提前预计算,减少现场计算时的成本。

◦第二个是 支持倒排索引 。美团业务多维过滤的需求很多,而仅靠目前的前缀索引很难满足这类需求,所以需要倒排索引来提升多维过滤查询的性能。

◦第三个是 面向内存的存储和查询引擎 。这个主要有4点原因:

  1. 目前较多业务线的数据并没有很大,内存的容量完全可以hold住。

  2. 内存容量越来越大,价格越来越低。

  3. 之前做查询执行多实例并发优化的时候发现,当并发度设置的更大时,Doris的查询瓶颈首先出现在IO,而不是CPU。

  4. 业内已经有了成功的案例,比如Snappydata, Memsql等。

原文链接:https://mp.weixin.qq.com/s/z6AFnPwBDjrQDhCp7iiAvg