KSQL:Apache Kafka 的開源 Streaming SQL 引擎

收藏待读

KSQLApache Kafka開源 Streaming SQL 引擎

KSQL:Apache Kafka 的開源 Streaming SQL 引擎

微信公眾號: 深廣大數據Club

關注可了解更多大數據相關資訊。問題或建議,請公眾號留言;

如果你覺得深廣大數據Club你有幫助,歡迎轉發朋友圈分享

現代企業以數據為核心,隨着數量的增加,這些數據正在快速變化。流處理允許企業實時利用這些信息,Netflix,Uber,Airbnb,PayPal和紐約時報等數萬家公司使用Apache Kafka作為重塑其行業的首選流媒體平台。無論您是預訂酒店還是航班,乘坐的士,玩視頻遊戲,閱讀報紙,在線購物或匯款,許多日常活動都是由Kafka在幕後提供支持。

然而,流處理的世界仍然具有很高的進入門檻。當今最流行的流處理技術,包括Apache Kafka的Streams API,仍然要求用戶使用Java或Scala等編程語言編寫代碼。對編碼技能的這一嚴格要求阻礙了許多公司將流處理的優勢發揮到極致。但幸運的是,現在有更好的方法。

什麼是KSQL?

KSQL是Apache Kafka的流式SQL引擎,它大大降低了流處理世界的門檻。

KSQL實現了非常有效的功能:使用數據領域中大多數社區已知的語義SQL實時讀取,編寫和轉換數據!

KSQL:Apache Kafka 的開源 Streaming SQL 引擎

KSQL解決了什麼問題?

如前所述,KSQL解決了在Kafka上提供SQL接口的主要問題,而無需使用Python或Java等外部語言。

然而,人們可能會爭辯說,之前通過在Oracle數據庫或BigQuery等目標數據存儲上進行的ETL操作解決了同樣的問題。那麼在KSQL方法中有什麼不同?有什麼好處?

我認為的主要區別在於連續查詢的概念:隨着新數據到達Kafka主題,KSQL轉換會不斷完成。另一方面,在數據庫(或BigQuery等大數據平台)中完成的轉換是一次性的,如果新數據到達,則必須再次執行相同的轉換。

KSQL:Apache Kafka 的開源 Streaming SQL 引擎

KSQL能做什麼呢?

KSQL是開源的(Apache 2.0許可),並構建在Kafka的Streams API之上。這意味着它支持各種強大的流處理操作,包括過濾,轉換,聚合,連接,窗口和會話。

通過這種方式,您可以實時檢測異常和欺詐活動,監控基礎架構和物聯網設備,執行基於會話的用戶活動分析,執行實時ETL等等。

從通用的角度來看,當數據流中需要動態地進行轉換,集成和分析時,你應該使用KSQL。

實時監控和實時分析

KSQL的一個用途是定義實時計算的自定義業務級度量標準,您可以從中監視和提醒。例如,展示視頻遊戲特許經營權的並發在線玩家數量(「我們的玩家是否參與?最新遊戲擴展是否增加了遊戲時間?」)或報告電子商務網站的廢棄購物車數量(「我們的在線商店的最新更新是否讓客戶更容易結賬?」)

另一個用途是在KSQL中為您的業務應用程序定義正確性概念,然後檢查它們是否在生產中運行時滿足此要求

KSQL可以直接從原始事件流中定義適當的度量標準,無論這些是從數據庫更新,應用程序,移動設備還是任何其他類型生成的:

CREATE TABLE possibly_failing_vehicles AS
   SELECT vehicle, COUNT(*)
   FROM vehicle_monitoring_stream
   WINDOW TUMBLING (SIZE 5 MINUTES)
   WHERE  event_type = 'ERROR'
   GROUP BY vehicle
   HAVING COUNT(*) > 2;

在線數據集成和豐富

公司完成的大多數數據處理屬於數據豐富領域:從幾個數據庫中獲取數據,轉換數據,將其連接在一起,並將其存儲到鍵值存儲,搜索索引,緩存或其他數據服務系統。

KSQL與Kafka連接器一起用於Oracle,MySQL,Elasticsearch,

HDFS或S3等系統時,可以實現從批量數據集成到實時數據集成的轉變。

如下面的KSQL查詢所示,您可以使用流表連接來豐富包含存儲在表中的元數據的數據流,或者在將流加載到另一個系統之前對個人身份信息(PII)進行簡單過濾。

CREATE STREAM vip_users AS
   SELECT user_id, user_country, web_page, action
   FROM website_clickstream c
   LEFT JOIN users u ON u.user_id = c.user_id
   WHERE u.level = 'Platinum';

安全和異常檢測

KSQL查詢可以將事件流轉換為數字時間序列聚合,這些聚合使用Kafka-Elastic連接器注入系統(如Elastic),然後在實時儀錶板

(如Grafana)中可視化。安全用例通常與監視和分析類似。在這裡,您不是要監控應用程序行為或業務行為,而是在尋找欺詐,濫用,垃圾郵件,入侵或其他不良行為的模式。

KSQL提供了一種簡單而複雜的實時方法來定義這些模式並查詢實時流:

CREATE TABLE possible_fraud AS
   SELECT card_number, COUNT(*)
   FROM authorization_attempts
   WINDOW TUMBLING (SIZE 5 SECONDS)
   GROUP BY card_number
   HAVING COUNT(*) > 3;

流和數據庫

當然,KSQL的使用案例比我在這篇短篇文章中所展示的更多,例如監控車隊(「未來幾天卡車是否需要預測性維護?」)或分佈式物聯網設備和家庭自動化傳感器(「為什麼二樓的溫度會上升?」),或者實時分析Oracle中的數據庫更新。一些有創意的用戶甚至使用KSQL 實時分析賽車遙測數據。

但是,讓我們先從這些具體的例子後退一步。在我看來,更令人興奮的是,通過將數據庫從內向外轉換,KSQL將流(kafka)和數據庫(Oracle、MySQL和Friends)的世界結合在一起。在KSQL中,類似於Kafka的Streams API,有兩個核心數據抽象:流和表。它們允許您以流或表格式處理數據。這一點很重要,因為在實踐中,幾乎每個想要實現的實時用例都需要流和表。

KSQL:Apache Kafka 的開源 Streaming SQL 引擎

以下是一個稍微簡單的例子:作為零售商,您可以使用KSQL將Kafka中的實時客戶活動事件流(購買,地理位置更新等)聚合到不斷更新的客戶360度配置文件表中,加入了有關這些客戶的其他內部和外部信息。然後,此整合的客戶資料表可以為應用程序提供支持,例如通過KSQL或Kafka的Streams API檢測金融交易流中的欺詐性付款,或者可以通過Kafka的Connect框架和即用型連接器實時流式傳輸其數據傳統的RDBMS,如Oracle,PostgreSQL或MySQL,它們是您現有基礎架構的一部分。由於Apache Kafka(分佈式流媒體平台)的強大技術基礎,所有這些都是實時,容錯和大規模完成的。

KSQL如何工作?

那麼KSQL如何在幕後工作呢?要記住兩個概念:流和表。

流是結構化數據的序列,一旦事件被引入到流是不可變的,這意味着它不能被更新或刪除。想像一下從存儲器中推出或拉出的物品數量:「例如,今天庫存了200件ProductA,而100件ProductB被取出」。

另一方面,表表示基於來自流的事件的當前情況。例如:ProductA的庫存總量是多少?表格中的事實是可變的,如果ProductA不再有庫存,可以更新或刪除ProductA的數量。

KSQL:Apache Kafka 的開源 Streaming SQL 引擎

KSQL通過簡單的SQL方言實現流和表的定義。來自不同來源的各種流和表可以直接在KSQL中連接,從而實現數據組合和轉換。

在KSQL中創建的每個流或表將存儲在單獨的Topic中,允許使用常用的連接器或腳本從中提取信息。

KSQL內部結構

KSQL:Apache Kafka 的開源 Streaming SQL 引擎

有一個KSQL服務器進程執行查詢。一組KSQL進程作為一個集群來運行。可以通過啟動KSQL服務器的更多實例來動態添加更多的處理能力。這些實例具有容錯性:如果一個實例失效,另外幾個會接過它處理的工作。使用交互式KSQL命令行客戶軟件來啟動查詢,客戶軟件通過REST API向集群發送命令。命令行讓你可以檢查可用的數據流和表,執行新的查詢,檢查運行中查詢的狀態,並終止運行中查詢。在內部,KSQL是使用Kafka的Streams API構建的;它繼承了Kafka的彈性可擴展性、先進的狀態管理及容錯功能,還支持Kafka最近推出的只處理一次(exactly-once proecessing)語義。KSQL服務器嵌入這個機制,另外添加了分佈式SQL引擎(包括一些新穎的功能,比如提升查詢性能的位元組碼自動生成)以及用於查詢和控制的REST API。

啟動KSQL

KSQL可以在standalone模式和client-server模式下工作,第一個用於開發和測試場景,第二個用於支持生產環境。

使用standalone模式,KSQL客戶端和服務器託管在同一台機器上,位於同一個JVM中。另一方面,在Client-Server模式下,KSQL服務器池運行在遠程計算機上,客戶端通過HTTP進行連接。

這裡使用獨立模式,該過程在confluent文檔中得到了很好的解釋,包括三個步驟:

  • 克隆KSQL存儲庫

  • 編譯代碼

  • 使用local參數啟動KSQL

./bin/ksql-cli local

總結

KSQL提供了一種將Kafka保存為數據庫的唯一方法:無需在Kafka中取出數據,轉換和重新插入。每次轉換都可以使用Kafka SQL完成。

如前所述,KSQL現在可用於開發人員預覽Kafka數據,與更成熟的SQL產品相比,功能/功能列表在某種程度上受到限制。但是,在需要進行非常複雜的轉換的情況下,一旦數據落在目標數據存儲區中,仍然可以通過另一種語言(如Java)或專用ETL(或視圖)來解決這些轉換。

參考鏈接

https://www.oreilly.com/ideas/big-fast-easy-data-with-ksql

https://www.rittmanmead.com/blog/2017/10/ksql-streaming-sql-for-apache-kafka/

關注公眾號

KSQL:Apache Kafka 的開源 Streaming SQL 引擎

KSQL:Apache Kafka 的開源 Streaming SQL 引擎

原文 :

相關閱讀

免责声明:本文内容来源于mp.weixin.qq.com,已注明原文出处和链接,文章观点不代表立场,如若侵犯到您的权益,或涉不实谣言,敬请向我们提出检举。