Hive 終于等來了 Flink-阿里云開發者社區

開發者社區> 阿里云實時計算> 正文

Hive 終于等來了 Flink

簡介: Flink 社區在集成 Hive 功能方面付出很多,目前進展也比較順利,最近 Flink 1.10.0 RC1 版本已經發布,感興趣的讀者可以進行調研和驗證功能。

作者:Jason

Apache Spark 什么時候開始支持集成 Hive 功能?筆者相信只要使用過 Spark 的讀者,應該都會說這是很久以前的事情了。

那 Apache Flink 什么時候支持與 Hive 的集成呢?讀者可能有些疑惑,還沒有支持吧,沒用過?或者說最近版本才支持,但是功能還比較弱。

其實比較也沒啥意義,不同社區發展的目標總是會有差異,而且 Flink 在真正的實時流計算方面投入的精力很多。不過筆者想表達的是,Apache Hive 已經成為數據倉庫生態系統的焦點,它不僅是一個用于大數據分析和 ETL 的 SQL 引擎,也是一個數據管理平臺,所以無論是 Spark,還是 Flink,或是 Impala、Presto 等,都會積極地支持集成 Hive 的功能。

的確,對真正需要使用 Flink 訪問 Hive 進行數據讀寫的讀者會發現,Apache Flink 1.9.0 版本才開始提供與 Hive 集成的功能。不過,值得欣慰的是,Flink 社區在集成 Hive 功能方面付出很多,目前進展也比較順利,最近 Flink 1.10.0 RC1 版本已經發布,感興趣的讀者可以進行調研和驗證功能。

架構設計

首先,筆者基于社區公開的資料以及博客,概括性地講解 Flink 集成 Hive 的架構設計。

Apache Flink 與 Hive 集成的目的,主要包含了元數據和實際表數據的訪問。

元數據

為了訪問外部系統的元數據,Flink 剛開始提供了 ExternalCatalog 的概念。但是 ExternalCatalog 的定義非常不完整,基本處于不可用的狀態。Flink 1.10 版本正式刪除了 ExternalCatalog API (FLINK-13697),這包括:

  • ExternalCatalog(以及所有依賴的類,比如 ExternalTable)
  • SchematicDescriptor、MetadataDescriptor 和 StatisticsDescriptor

針對 ExternalCatalog 的問題,Flink 社區提出了一套全新的 Catalog 接口(new Catalog API)來取代現有的 ExternalCatalog。新的 Catalog 實現的功能包括:

  • 能夠支持數據庫、表、分區等多種元數據對象
  • 允許在一個用戶 Session 中維護多個 Catalog 實例,從而支持同時訪問多個外部系統
  • Catalog 以可插拔的方式接入 Flink可以领救济金的游戏,允許用戶提供自定義的實現

下圖展示了新的 Catalog API 的總體架構:

640.png

創建 TableEnvironment 的時候會同時創建一個 CatalogManager可以领救济金的游戏,負責管理不同的 Catalog 實例。TableEnvironment 通過 Catalog 來為 Table API 和 SQL Client 用戶提供元數據服務。

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val tableEnv = TableEnvironment.create(settings)
val name            = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir     = "/opt/hive-conf"http:// a local path
val version         = "2.3.4"
val hive = newHiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")

目前 Catalog 有兩個實現,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元數據管理機制,將所有元數據保存在內存中。而 HiveCatalog 會與一個 Hive Metastore 的實例連接,提供元數據持久化的能力。要使用 Flink 與 Hive 進行交互,用戶需要配置一個 HiveCatalog,并通過 HiveCatalog 訪問 Hive 中的元數據。

另一方面,HiveCatalog 也可以用來處理 Flink 自身的元數據,在這種場景下,HiveCatalog 僅將 Hive Metastore 作為持久化存儲使用,寫入 Hive Metastore 中的元數據并不一定是 Hive 所支持的格式。一個 HiveCatalog 實例可以同時支持這兩種模式,用戶無需為管理 Hive 和 Flink 的元數據創建不同的實例。

另外,通過設計 HiveShim 來支持不同版本的 Hive Metastore,具體支持的 Hive 版本列表,請參考官方文檔。

表數據

Flink 提供了 Hive Data Connector 來讀寫 Hive 的表數據。Hive Data Connector 盡可能的復用了 Hive 本身的 Input/Output Format 和 SerDe 等類,這樣做的好處一方面是減少了代碼重復,更重要的是可以最大程度的保持與 Hive 的兼容,即 Flink 寫入的數據 Hive 可以正常讀取,并且反之亦然。

集成 Hive 功能

Flink 與 Hive 集成的功能在 1.9.0 版本中作為試用功能發布,存在不少使用的局限性,但是不久將發布的 Flink 1.10 穩定版本會更加完善集成 Hive 的功能并應用到企業場景中。

為了讓讀者提前體驗 Flink 1.10 集成 Hive 的功能,筆者會基于 Cloudera CDH 編譯 Flink 1.10.0 RC1 版本并進行較為完整的測試。

環境信息

CDH 版本:cdh5.16.2
Flink 版本:release-1.10.0-rc1

Flink 使用了 RC 版本,僅供測試,不建議用于生產環境。
目前 Cloudera Data Platform 正式集成了 Flink 作為其流計算產品,非常方便用戶使用。

CDH 環境開啟了 Sentry 和 Kerberos。

下載并編譯 Flink

$ wget https://github.com/apache/flink/archive/release-1.10.0-rc1.tar.gz
$ tar zxvf release-1.10.0-rc1.tar.gz
$ cd flink-release-1.10.0-rc1/
$ mvn clean install -DskipTests-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

不出意外的話,編譯到 flink-hadoop-fs 模塊時,會報如下錯誤:

[ERROR] Failed to execute goal on project flink-hadoop-fs: Could not resolve dependencies for project org.apache.flink:flink-hadoop-fs:jar:1.10.0: Failed to collect dependencies at org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Failed to read artifact descriptor for org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Could not transfer artifact org.apache.flink:flink-shaded-hadoop-2:pom:2.6.0-cdh5.16.2-9.0 from/to HDPReleases ( Remote host closed connection during handshake: SSL peer shut down incorrectly

編譯中遇到 flink-shaded-hadoop-2 找不到的問題,其實查看 Maven 倉庫會發現,根本原因是 CDH 的 flink-shaded-hadoop-2 的 jar 包在 Maven 中央倉庫是沒有對應的編譯版本,所以需要先對 Flink 依賴的 flink-shaded-hadoop-2 進行打包,再進行編譯。

■ 解決 flink-shaded-hadoop-2 問題

  • 獲取 flink-shaded 源碼
git clone https://github.com/apache/flink-shaded.git
  • 切換依賴的版本分支

根據上面報錯時提示缺少的版本切換對應的代碼分支,即缺少的是 9.0 版本的 flink-shaded-hadoop-2:

git checkout release-9.0
  • 配置 CDH Repo 倉庫

修改 flink-shaded 項目中的 pom.xml,添加 CDH maven 倉庫,否則編譯時找不到 CDH 相關的包。

在 ... 中添加如下內容:

<profile>
<id>vendor-repos</id>
<activation>
<property>
<name>vendor-repos</name>
</property>
</activation>
<!-- Add vendor maven repositories -->
<repositories>
<!-- Cloudera -->
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</profile>
  • 編譯 flink-shaded

開始執行編譯:

mvn clean install -DskipTests-Drat.skip=true-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

建議通過科學上網方式編譯,如果讀者遇到一些網絡連接的問題,可以試著重試或者更換依賴組件的倉庫地址。

編譯成功后,就會把 flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar 安裝在本地 maven 倉庫,如下為編譯的最后日志:

Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/dependency-reduced-pom.xml to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.pom

重新編譯 Flink

mvn clean install -DskipTests-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2

漫長的等待過程,讀者可以并行做其他事情。

編譯過程中,如果不出意外的話,會看到類似下面的錯誤信息:

[INFO] Running 'npm ci --cache-max=0 --no-save' in /Users/xxx/Downloads/Flink/flink-release-1.10.0-rc1/flink-release-1.10.0-rc1/flink-runtime-web/web-dashboard [WARNING] npm WARN prepare removing existing node_modules/ before installation [ERROR] WARN registry Unexpected warning for Miscellaneous Warning ECONNRESET: request to failed, reason: read ECONNRESET [ERROR] WARN registry Using stale package data from due to a request error during revalidation. [ERROR] WARN registry Unexpected warning for Miscellaneous Warning ECONNRESET: request to failed, reason: read ECONNRESET

可以看到, flink-runtime-web 模塊引入了對 frontend-maven-plugin 的依賴,需要安裝 node、npm 和依賴組件。

如果沒有通過科學上網,可以修改 flink-runtime-web/pom.xml 文件,添加 nodeDownloadRoot 和 npmDownloadRoot 的信息:

<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>install node and npm</id>
<goals>
<goal>install-node-and-npm</goal>
</goals>
<configuration>
<nodeDownloadRoot>https://registry.npm.taobao.org/dist/</nodeDownloadRoot>
<npmDownloadRoot>https://registry.npmjs.org/npm/-/</npmDownloadRoot>
<nodeVersion>v10.9.0</nodeVersion>
</configuration>
</execution>
<execution>
<id>npm install</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>ci --cache-max=0 --no-save</arguments>
<environmentVariables>
<HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL>
</environmentVariables>
</configuration>
</execution>
<execution>
<id>npm run build</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>run build</arguments>
</configuration>
</execution>
</executions>
<configuration>
<workingDirectory>web-dashboard</workingDirectory>
</configuration>
</plugin>

編譯成功后,Flink 安裝文件位于 flink-release-1.10.0-rc1/flink-dist/target/flink-1.10.0-bin 目錄下,打包并上傳到部署到節點:

$ cd flink-dist/target/flink-1.10.0-bin
$ tar zcvf flink-1.10.0.tar.gz flink-1.10.0

部署和配置

Flink 部署比較簡單,解壓縮包即可。另外可以設置軟鏈接、環境變量等,筆者不再介紹。

Flink 的核心配置文件是 flink-conf.yaml,一個典型的配置如下:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 2048m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir:hdfs:///user/flink110/recovery
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
state.backend: filesystem
state.checkpoints.dir: hdfs:///user/flink110/checkpoints
state.savepoints.dir:hdfs:///user/flink110/savepoints
jobmanager.execution.failover-strategy: region
rest.port: 8081
taskmanager.memory.preallocate: false
classloader.resolve-order: parent-first
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab:/home/flink_user/flink_user.keytab
security.kerberos.login.principal: flink_user
jobmanager.archive.fs.dir:hdfs:///user/flink110/completed-jobs
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir:hdfs:///user/flink110/completed-jobs
historyserver.archive.fs.refresh-interval: 10000

筆者只羅列了一些常見的配置參數,讀者根據實際情況修改。配置參數其實還是比較容易理解的,以后結合實戰的文章再進行詳細講解。

■ 集成 Hive 配置的依賴

如果要使用 Flink 與 Hive 集成的功能,除了上面的配置外,用戶還需要添加相應的依賴:

  • 如果需要使用 SQL Client,則需要將依賴的 jar 拷貝到 Flink 的 lib 目錄中
  • 如果需要使用 Table API,則需要將相應的依賴添加到項目中(如 pom.xml)
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>1.11-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.11-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>

筆者主要介紹使用 SQL Client 的方式,由于使用的 CDH 版本為 5.16.2,其中 Hadoop 版本為 2.6.0,Hive 版本為 1.1.0,所以需要將如下 jar 包拷貝到 flink 部署家目錄中的 lib 目錄下:

  • Flink 的 Hive connector

flink-connector-hive2.11-1.10.0.jar
flink-hadoop-compatibility2.11-1.10.0.jar
flink-orc_2.11-1.10.0.jar

flink-release-1.10.0-rc1/flink-connectors/flink-hadoop-compatibility/target/flink-hadoop-compatibility_2.11-1.10.0.jar
flink-release-1.10.0-rc1/flink-connectors/flink-connector-hive/target/flink-connector-hive_2.11-1.10.0.jar
flink-release-1.10.0-rc1/flink-formats/flink-orc/target/flink-orc_2.11-1.10.0.jar
  • Hadoop 依賴

flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar

flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
  • Hive 依賴

hive-exec-1.1.0-cdh5.16.2.jar
hive-metastore-1.1.0-cdh5.16.2.jar
libfb303-0.9.3.jar

/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec-1.1.0-cdh5.16.2.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore-1.1.0-cdh5.16.2.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-
0.9.3.jar

其中 flink-shaded-hadoop-2-uber 包含了 Hive 對于 Hadoop 的依賴。如果不用 Flink 提供的包,用戶也可以將集群中使用的 Hadoop 包添加進來,不過需要保證添加的 Hadoop 版本與 Hive 所依賴的版本是兼容的。

依賴的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用戶集群中 Hive 所提供的 jar 包,詳情請見支持不同的 Hive 版本。

Flink 部署的節點要添加 Hadoop、Yarn 以及 Hive 的客戶端。

■ 配置 HiveCatalog

多年來,Hive Metastore 在 Hadoop 生態系統中已發展成為事實上的元數據中心。許多公司在其生產中有一個單獨的 Hive Metastore 服務實例,以管理其所有元數據(Hive 元數據或非 Hive 元數據)。

如果同時部署了 Hive 和 Flink,那么通過 HiveCatalog 能夠使用 Hive Metastore 來管理 Flink 的元數據。

如果僅部署 Flink,HiveCatalog 就是 Flink 開箱即用提供的唯一持久化的 Catalog。如果沒有持久化的 Catalog,那么使用 Flink SQL CREATE DDL 時必須在每個會話中重復創建像 Kafka 表這樣的元對象,這會浪費大量時間。HiveCatalog 通過授權用戶只需要創建一次表和其他元對象,并在以后的跨會話中非常方便地進行引用和管理。

如果要使用 SQL Client 時,用戶需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的 catalogs 列表中可以指定一個或多個 Catalog 實例。

以下的示例展示了如何指定一個 HiveCatalog:

execution:
    planner: blink
    type: streaming
    ...
    current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
    current-database: mydatabase
catalogs:  
  - name: myhive
     type: hive
     hive-conf-dir: /opt/hive-conf  # contains hive-site.xml
     hive-version:2.3.4

其中:

  • name 是用戶給每個 Catalog 實例指定的名字,Catalog 名字和 DB 名字構成了 FlinkSQL 中元數據的命名空間,因此需要保證每個 Catalog 的名字是唯一的。
  • type 表示 Catalog 的類型,對于 HiveCatalog 而言,type 應該指定為 hive。
  • hive-conf-dir 用于讀取 Hive 的配置文件,用戶可以將其設定為集群中 Hive 的配置文件目錄。
  • hive-version 用于指定所使用的 Hive 版本。

指定了 HiveCatalog 以后,用戶就可以啟動 sql-client,并通過以下命令驗證 HiveCatalog 已經正確加載。

Flink SQL> show catalogs;
default_catalog
myhive
Flink SQL> use catalog myhive;

其中 show catalogs 會列出加載的所有 Catalog 實例。需要注意的是,除了用戶在 sql-client-defaults.yaml 文件中配置的 Catalog 以外,FlinkSQL 還會自動加載一個 GenericInMemoryCatalog 實例作為內置的 Catalog,該內置 Catalog 默認名字為 default_catalog。

讀寫 Hive 表

設置好 HiveCatalog 以后就可以通過 SQL Client 或者 Table API 來讀寫 Hive 中的表了。

假設 Hive 中已經有一張名為 mytable 的表,我們可以用以下的 SQL 語句來讀寫這張表。

■ 讀數據

Flink SQL> show catalogs;
myhive
default_catalog
Flink SQL> use catalog myhive;
Flink SQL> show databases;
default
Flink SQL> show tables;
mytable
Flink SQL> describe mytable;
root
|-- name: name 
|-- type: STRING 
|-- name: value 
|-- type: DOUBLE
Flink SQL> SELECT * FROM mytable;
   name      value
__________ __________
   Tom        4.72
   John       8.0    
   Tom        24.2
   Bob.       3.14    
   Bob        4.72    
   Tom        34.9    
   Mary       4.79    
   Tiff          2.72    
   Bill          4.33    
   Mary       77.7

■ 寫數據

Flink SQL> INSERT INTO mytable SELECT 'Tom',
 25;
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
# 靜態分區
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
# 動態分區
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
# 靜態分區和動態分區
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';

總結

在本文中,筆者首先介紹了 Flink 與 Hive 集成功能的架構設計,然后從源碼開始編譯,解決遇到的一些問題可以领救济金的游戏,接著部署和配置 Flink 環境以及集成 Hive 的具體操作過程,最后參考官方的案例,對 Hive 表進行讀寫操作。

后續,筆者會結合生產環境的實際使用情況,講解通過 Flink SQL 來操作 Hive。

參考:

版權聲明:本文中所有內容均屬于阿里云開發者社區所有,任何媒體、網站或個人未經阿里云開發者社區協議授權不得轉載、鏈接、轉貼或以其他方式復制發布/發表。申請授權請郵件developerteam@list.alibaba-inc.com,已獲得阿里云開發者社區協議授權的媒體、網站,在轉載使用時必須注明"稿件來源:阿里云開發者社區,原文作者姓名",違者本社區將依法追究責任。 如果您發現本社區中有涉嫌抄襲的內容,歡迎發送郵件至:developer2020@service.aliyun.com 進行舉報,并提供相關證據,一經查實,本社區將立刻刪除涉嫌侵權內容。

分享:
阿里云實時計算
使用釘釘掃一掃加入圈子
+ 訂閱

一套基于Apache Flink構建的一站式、高性能實時大數據處理平臺,廣泛適用于流式數據處理、離線數據處理、DataLake計算等場景。

官方博客
鏈接
  • 阿里云實時計算官網:
  • 實時計算應用案例與解決方案:
  • 實時計算應用場景:
  • 控制臺: