當前無論是傳統(tǒng)企業(yè)還是互聯(lián)網(wǎng)公司對大數(shù)據(jù)實時分析和處理的要求越來越高,數(shù)據(jù)越實時價值越大,面向毫秒~秒級的實時大數(shù)據(jù)計算場景,Spark和Flink各有所長。CarbonData是一種高性能大數(shù)據(jù)存儲方案,已在20+企業(yè)生產(chǎn)環(huán)境上部署應(yīng)用,其中最大的單一集群數(shù)據(jù)規(guī)模達到幾萬億。
為幫助開發(fā)者更深入的了解這三個大數(shù)據(jù)開源技術(shù)及其實際應(yīng)用場景,9月8日,InfoQ聯(lián)合華為云舉辦了一場實時大數(shù)據(jù)Meetup,集結(jié)了來自Databricks、華為及美團點評的大咖級嘉賓前來分享。
本文整理了其中的部分精彩內(nèi)容,同時,作為本次活動的承辦方,InfoQ整理上傳了所有講師的演講PPT,感興趣的同學(xué)可以下載講師PPT獲取完整資料 。
Spark Structured Streaming特性介紹 (講師PPT下載)
作為Spark Structured Streaming最核心的開發(fā)人員、Databricks工程師,Tathagata Das(以下簡稱“TD”)在開場演講中介紹了Structured Streaming的基本概念,及其在存儲、自動流化、容錯、性能等方面的特性,在事件時間的處理機制,最后帶來了一些實際應(yīng)用場景。
首先,TD對流處理所面對的問題和概念做了清晰的講解。TD提到,因為流處理具有如下顯著的復(fù)雜性特征,所以很難建立非常健壯的處理過程:
? 一是數(shù)據(jù)有各種不同格式(Jason、Avro、二進制)、臟數(shù)據(jù)、不及時且無序;
? 二是復(fù)雜的加載過程,基于事件時間的過程需要支持交互查詢,和機器學(xué)習(xí)組合使用;
? 三是不同的存儲系統(tǒng)和格式(SQL、NoSQL、Parquet等),要考慮如何容錯。
因為可以運行在Spark SQL引擎上,Spark Structured Streaming天然擁有較好的性能、良好的擴展性及容錯性等Spark優(yōu)勢。除此之外,它還具備豐富、統(tǒng)一、高層次的API,因此便于處理復(fù)雜的數(shù)據(jù)和工作流。再加上,無論是Spark自身,還是其集成的多個存儲系統(tǒng),都有豐富的生態(tài)圈。這些優(yōu)勢也讓Spark Structured Streaming得到更多的發(fā)展和使用。
流的定義是一種無限表(unbounded table),把數(shù)據(jù)流中的新數(shù)據(jù)追加在這張無限表中,而它的查詢過程可以拆解為幾個步驟,例如可以從Kafka讀取JSON數(shù)據(jù),解析JSON數(shù)據(jù),存入結(jié)構(gòu)化Parquet表中,并確保端到端的容錯機制。其中的特性包括:
? 支持多種消息隊列,比如Files/Kafka/Kinesis等。
? 可以用join(), union()連接多個不同類型的數(shù)據(jù)源。
? 返回一個DataFrame,它具有一個無限表的結(jié)構(gòu)。
? 你可以按需選擇SQL(BI分析)、DataFrame(數(shù)據(jù)科學(xué)家分析)、DataSet(數(shù)據(jù)引擎),它們有幾乎一樣的語義和性能。
? 把Kafka的JSON結(jié)構(gòu)的記錄轉(zhuǎn)換成String,生成嵌套列,利用了很多優(yōu)化過的處理函數(shù)來完成這個動作,例如from_json(),也允許各種自定義函數(shù)協(xié)助處理,例如Lambdas, flatMap。
? 在Sink步驟中可以寫入外部存儲系統(tǒng),例如Parquet。在Kafka sink中,支持foreach來對輸出數(shù)據(jù)做任何處理,支持事務(wù)和exactly-once方式。
? 支持固定時間間隔的微批次處理,具備微批次處理的高性能性,支持低延遲的連續(xù)處理(Spark 2.3),支持檢查點機制(check point)。
? 秒級處理來自Kafka的結(jié)構(gòu)化源數(shù)據(jù),可以充分為查詢做好準備。
Spark SQL把批次查詢轉(zhuǎn)化為一系列增量執(zhí)行計劃,從而可以分批次地操作數(shù)據(jù)。
在容錯機制上,Structured Streaming采取檢查點機制,把進度offset寫入stable的存儲中,用JSON的方式保存支持向下兼容,允許從任何錯誤點(例如自動增加一個過濾來處理中斷的數(shù)據(jù))進行恢復(fù)。這樣確保了端到端數(shù)據(jù)的exactly-once。
在性能上,Structured Streaming重用了Spark SQL優(yōu)化器和Tungsten引擎,而且成本降低了3倍!!更多的信息可以參考作者的blog。
Structured Streaming隔離處理邏輯采用的是可配置化的方式(比如定制JSON的輸入數(shù)據(jù)格式),執(zhí)行方式是批處理還是流查詢很容易識別。同時TD還比較了批處理、微批次-流處理、持續(xù)流處理三種模式的延遲性、吞吐性和資源分配情況。
在時間窗口的支持上,Structured Streaming支持基于事件時間(event-time)的聚合,這樣更容易了解每隔一段時間發(fā)生的事情。同時也支持各種用戶定義聚合函數(shù)(User Defined Aggregate Function,UDAF)。另外,Structured Streaming可通過不同觸發(fā)器間分布式存儲的狀態(tài)來進行聚合,狀態(tài)被存儲在內(nèi)存中,歸檔采用HDFS的Write Ahead Log (WAL)機制。當然,Structured Streaming還可自動處理過時的數(shù)據(jù),更新舊的保存狀態(tài)。因為歷史狀態(tài)記錄可能無限增長,這會帶來一些性能問題,為了限制狀態(tài)記錄的大小,Spark使用水印(watermarking)來刪除不再更新的舊的聚合數(shù)據(jù)。允許支持自定義狀態(tài)函數(shù),比如事件或處理時間的超時,同時支持Scala和Java。
TD在演講中也具體舉例了流處理的應(yīng)用情況。在蘋果的信息安全平臺中,每秒將產(chǎn)生有百萬級事件,Structured Streaming可以用來做缺陷檢測,下圖是該平臺架構(gòu):
在該架構(gòu)中,一是可以把任意原始日志通過ETL加載到結(jié)構(gòu)化日志庫中,通過批次控制可很快進行災(zāi)難恢復(fù);二是可以連接很多其它的數(shù)據(jù)信息(DHCP session,緩慢變化的數(shù)據(jù));三是提供了多種混合工作方式:實時警告、歷史報告、ad-hoc分析、統(tǒng)一的API允許支持各種分析(例如實時報警系統(tǒng))等,支持快速部署。四是達到了百萬事件秒級處理性能。
華為大數(shù)據(jù)架構(gòu)師蔡強在以CarbonData為主題的演講中主要介紹了企業(yè)對數(shù)據(jù)應(yīng)用的挑戰(zhàn)、存儲產(chǎn)品的選型決策,并深入講解了CarbonData的原理及應(yīng)用,以及對未來的規(guī)劃等。
企業(yè)中包含多種數(shù)據(jù)應(yīng)用,從商業(yè)智能、批處理到機器學(xué)習(xí),數(shù)據(jù)增長快速、數(shù)據(jù)結(jié)構(gòu)復(fù)雜的特征越來越明顯。在應(yīng)用集成上,需要也越來越多,包括支持SQL的標準語法、JDBC和ODBC接口、靈活的動態(tài)查詢、OLAP分析等。
針對當前大數(shù)據(jù)領(lǐng)域分析場景需求各異而導(dǎo)致的存儲冗余問題,CarbonData提供了一種新的融合數(shù)據(jù)存儲方案,以一份數(shù)據(jù)同時支持支持快速過濾查找和各種大數(shù)據(jù)離線分析和實時分析,并通過多級索引、字典編碼、預(yù)聚合、動態(tài)Partition、實時數(shù)據(jù)查詢等特性提升了IO掃描和計算性能,實現(xiàn)萬億數(shù)據(jù)分析秒級響應(yīng)。蔡強在演講中對CarbonData的設(shè)計思路做了詳細講解。
? 在數(shù)據(jù)統(tǒng)一存儲上:通過數(shù)據(jù)共享減少孤島和冗余,支持多種業(yè)務(wù)場景以產(chǎn)生更大價值。
? 大集群:區(qū)別于以往的單機系統(tǒng),用戶希望新的大數(shù)據(jù)存儲方案能應(yīng)對日益增多的數(shù)據(jù),隨時可以通過增加資源的方式橫向擴展,無限擴容。
? 易集成:提供標準接口,新的大數(shù)據(jù)方案與企業(yè)已采購的工具和IT系統(tǒng)要能無縫集成,支撐老業(yè)務(wù)快速遷移。另外要與大數(shù)據(jù)生態(tài)中的各種軟件能無縫集成。
? 高性能:計算與存儲分離,支持從GB到PB大規(guī)模數(shù)據(jù),十萬億數(shù)據(jù)秒級響應(yīng)。
? 開放生態(tài):與大數(shù)據(jù)生態(tài)無縫集成,充分利用云存儲和Hadoop集群的優(yōu)勢。
數(shù)據(jù)布局如下圖,CarbonData用一個HDFS文件構(gòu)成一個Block,包含若干Blocklet作為文件內(nèi)的列存數(shù)據(jù)塊,F(xiàn)ile Header/Fille Footer提供元數(shù)據(jù)信息,內(nèi)置Blocklet索引以及Blocklet級和Page級的統(tǒng)計信息,壓縮編碼采用RLE、自適應(yīng)編碼、Snappy/Zstd壓縮,數(shù)據(jù)類型支持所有基礎(chǔ)和復(fù)雜類型:
Carbon表支持索引,支持Segment級(注:一個批次數(shù)據(jù)導(dǎo)入為一個segment)的讀寫和數(shù)據(jù)靈活管理,如按segment進行數(shù)據(jù)老化和查詢等,文件布局如下:
? Spark Driver將集中式的索引存在內(nèi)存中,根據(jù)索引快速過濾數(shù)據(jù),Hive metastore存儲表的元數(shù)據(jù)(表的信息等)。
? 一次Load/Insert對應(yīng)生成一個Segment, 一個Segment包含多個Shard, 一個Shard就是一臺機器上導(dǎo)入的多個數(shù)據(jù)文件和一個索引文件組成。每個Segment 包含數(shù)據(jù)和元數(shù)據(jù)(CarbonData File和Index文件),不同的Segment可以有不同的文件格式,支持更多其他格式(CSV, Parquet),采用增量的數(shù)據(jù)管理方式,處理比分區(qū)管理的速度快很多。
查詢時會將filter和projection下推到DataMap(數(shù)據(jù)地圖)。它的執(zhí)行模型如下:
? 主要包括Index DataMap和MV DataMap兩種不同DataMap,三級Index索引架構(gòu)減少了Spark Task數(shù)和磁盤IO,MV可以進行預(yù)匯聚和join的操作,用數(shù)據(jù)入庫時間換取查詢時間。
? DataMap根據(jù)實際數(shù)據(jù)量大小選擇集中式或者分布式存儲,以避免大內(nèi)存問題。
? DataMap支持內(nèi)存或磁盤的存儲方式。
最后,蔡強也分析了CarbonData的具體使用和未來計劃。
在使用上,CarbonData提供了非常豐富的功能特性,用戶可權(quán)衡入庫時間、索引粒度和查詢性能,增量入庫等方面來靈活設(shè)置。表操作與SparkSQL深度集成,支持高檢測功能的可配置Table Properties。語法和API保持SparkSQL一致,支持并發(fā)導(dǎo)入、更新、合并和查詢。DataMap類似一張視圖表,可用于加速Carbon表查詢,通過datamap_provider支持Bloomfilter、Pre-aggregate、MV三種類型的地圖。流式入庫與Structured Streaming集成,實現(xiàn)準實時分析。支持同時查詢實時數(shù)據(jù)和歷史數(shù)據(jù),支持預(yù)聚合并自動刷新,聚合查詢會先檢查聚合操作,從而取得數(shù)據(jù)返回客戶端。準實時查詢,提供了Stream SQL標準接口,建立臨時的Source表和Sink表。支持類似Structured Streaming(結(jié)構(gòu)化流)的邏輯語句和調(diào)度作業(yè)。
CarbonData從2016年進入孵化器到2017年畢業(yè),一共發(fā)布了10多個穩(wěn)定的版本,今年9月份將會迎來1.5.0版的發(fā)布。1.5.0將支持Spark File Format,增強對S3上數(shù)據(jù)的支持,支持Spark2.3和Hadoop3.1以及復(fù)雜類型的支持。而1.5.1主要會對MV支持增量的加載,增強對DataMap的選擇,以及增強了對Presto的支持。