Spark Streaming 集成 Kafka 總結

點擊hadoop123關注我喲

最知名的hadoop/spark大數據技術分享基地,分享hadoop/spark技術內幕hadoop/spark最新技術進展hadoop/spark行業技術應用發布hadoop/spark相關職位和求職信息hadoop/spark技術交流聚會講座以及會議等。


最近在做利用Spark streaming和Kafka進行數據分析的研究, 整理一些相應的開發文檔, 做了一些代碼實踐。 本文特意將這些資料記錄下來。

本文最後列出了一些參考的文檔,實際調研中參考了很多的資料,並沒有完全將它們記錄下來, 只列出了主要的一些參考資料。
當前的版本:

  • Spark: 1.2.0

  • Kafka: 0.8.1.1

Spark Streaming屬於Spark的核心api,它支持高吞吐量、支持容錯的實時流數據處理。 有以下特點:

  • 易於使用
    提供了和批處理一致的高級操作API,可以進行map, reduce, join, window。

  • 容錯
    Spark Streaming可以恢復你計算的狀態, 包括lost work和operator state (比如 sliding windows)。 支持worker節點和driver 節點恢復。

  • Spark集成
    可以結合批處理流和交互式查詢。 可以重用批處理的代碼。還可以直接使用內置的機器學習算法、圖算法包來處理數據。
    它可以接受來自文件系統, Akka actors, rsKafka, Flume, Twitter, ZeroMQ和TCP Socket的數據源或者你自己定義的輸入源。

它的工作流程像下面的圖所示一樣,接受到實時數據後,給數據分批次,然後傳給Spark Engine處理最後生成該批次的結果流。

Spark Streaming提供了一個高級的抽象模型,叫做discretized stream或者叫做DStream,它代表了一個持續的數據流。DStream既可以從Kafka, Flume, 和 Kinesis中產生, 或者在其它DStream上應用高級操作得到。 內部做到上一個DStream代表一個RDD序列。

一個簡單例子

在我們開始進入編寫我們自己的Spark Streaming程序細節之前, 讓我們先快速的看一個簡單的Sparking Streaming程序是什麼樣子的。 這個程序接收網路發過來的文本數據,讓我們統計一下文本中單詞的數量。 全部代碼如下:

首先, 我們導入Spark Streaming類名以及StreamingContext的一些隱式轉換到我們的環境中, 這樣可以為我們需要的類(比如DStream)增加一些有用的方法。. StreamingContext是所有功能的主入口。 我們創建了一個本地StreamingContext, 它使用兩個線程, 批處理間隔為1秒.

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))


使用這個context, 我們可以創建一個DStream, 代表來自TCP源的流數據。需要指定主機名和端口(如 localhost 和 9999).

val lines = ssc.socketTextStream("localhost", 9999)


這一行代表從數據服務器接受到的數據流. DStream中每條記錄是一行文本. 接下來, 我們想使用空格分隔每一行,這樣就可以得到文本中的單詞。

// Split each line into words
val words = lines.flatMap(_.split(" "))


flatMap 是一個一對多的DStream操作, 它從源DStream中的每一個Record產生多個Record, 這些新產生的Record組成了一個新的DStream。 在我們的例子中, 每一行文本被分成了多個單詞, 結果得到單詞流DStream. 下一步, 我們想統計以下單詞的數量.

import org.apache.spark.streaming.StreamingContext._

// Count each word in each batch
val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print()


單詞DStream 被mapped (one-to-one transformation) 成*(word, 1)對*的DStream ,然後reduced 得到每一批單詞的頻度. 最後, wordCounts.print()會列印出每一秒產生的一些單詞的統計值。 注意當這些行執行時,Spark Streaming僅僅設置這些計算, 它並沒有馬上被執行。 當所有的計算設置完後,我們可以調用下面的代碼啟動處理

ssc.start()

ssc.awaitTermination()


完整的代碼可以在例子 [NetworkWordCount](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala) 中找到. 如果你已經下載並編譯了Spark, 你可以按照下面的命令運行例子. 你要先運行Netcat工具作為數據服務器

$ nc -lk 9999


然後, 在另一個終端中, 你可以啟動例子

$ ./bin/run-example streaming.NetworkWordCount localhost 9999


然後, 在netcat服務器中輸入的每一行都會被統計,然後統計結果被輸出到螢幕上。 類似下面的輸出

# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

...

-------------------------------------------

Time: 1357008430000 ms

-------------------------------------------

(hello,1)

(world,1)

...


核心概念

本節介紹一些Spark和Kafka的概念
Spark cluster:
一個Spark集群至少包含一個worker節點。

worker node:
一個工作節點可以執行一個或者多個executor.

executor:
executor就是一個進程, 負責啟在一個worker節點上啟動應用,運行task執行計算,存儲數據到記憶體或者磁盤上。 每個Spark應用都有自己的executor。一個executor擁有一定數量的cores, 也被叫做「slots」, 可以執行指派給它的task。

job:
一個並行的計算單元,包含多個task。 在執行Spark action (比如 save, collect)產生; 在log中可以看到這個詞。

task:
一個task就是一個工作單元, 可以發送給一個executor執行。 它執行你的應用的實際計算的部分工作。 每個task占用父executor的一個slot (core)。

stage:
每個job都被分隔成多個彼此依賴稱之為stage的task(類似MapReduce中的map 和 reduce stage);

共享變量: 普通可序列化的變量復制到遠程各個節點。在遠程節點上的更新並不會返回到原始節點。因為我們需要共享變量。 Spark提供了兩種類型的共享變量。

Broadcast 變量。 SparkContext.broadcast(v)通過創建, **只讀**。

Accumulator: 累加器,通過SparkContext.accumulator(v)創建,在任務中只能調用add或者+操作,不能讀取值。只有驅動程序才可以讀取值。

receiver:
receiver長時間(可能7*24小時)運行在executor。 每個receiver負責一個 input DStream (例如 一個 讀取Kafka消息的input stream)。 每個receiver, 加上input DStream會占用一個core/slot.

input DStream:
一個input DStream是一個特殊的DStream, 將Spark Streaming連接到一個外部數據源來讀取數據。

kafka topic:
topic是發布消息發布的category 或者 feed名. 對於每個topic, Kafka管理一個分區的log,如下圖所示:


分區內的消息都是有序不可變的。

kafka partition:
partitions的設計目的有多個.最根本原因是kafka基於文件存儲.通過分區,可以將日志內容分散到多個server上,來避免文件尺寸達到單機磁盤的上限,每個partiton都會被當前server(kafka實例)保存;可以將一個topic切分多任意多個partitions(備註:基於sharding),來消息保存/消費的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升並發消費的能力.

kafka consumer group:
在kafka中,每個消費者要標記自己在那個組中。
如果所有的消費者都在同一個組中,則類似傳統的queue消息模式,消息只發給一個消費者。
如果消費者都在不同的組中, 則類似發布-訂閱消息模式。 每個消費者都會得到所有的消息。
最通用的模式是混用這兩種模式,如下圖:

關於kafka和消費者線程, 遵循下面的約束:
如果你的消費者讀取包含10個分區的 test的topic,

  • 如果你配置你的消費者只使用1個線程, 則它負責讀取十個分區

  • 如果你配置你的消費者只使用5個線程, 則每個線程負責讀取2個分區

  • 如果你配置你的消費者只使用10個線程, 則每個線程負責讀取1個分區

  • 如果你配置你的消費者只使用14個線程, 則10個線程各負責讀取1個分區,4個空閒

  • 如果你配置你的消費者只使用8個線程, 則6個線程個負責一個分區,2個線程各負責2個分區

從Kafka並行讀取

有幾種方法可以並行的讀取Kafka的消息。

Spark的KafkaInputDStream (也叫做Kafka 「connector」)使用 Kafka high-level consumer API讀取數據,所以有兩種方式可以並行的讀取數據。

  • 多個input DStream: Spark為每個input dstream運行一個receiver. 這意味著多個input dstream可以運行在多個core上並行讀取。 如果它們使用相同的topic,則相當於一個load balancer, 一個時間點上只有一個receiver讀取。 如果不同的topic,可以同時讀取。

val ssc: StreamingContext = ??? // ignore for now
val kafkaParams: Map[String, String] = Map("group.id" -> "test", /* ignore rest */)

val numInputDStreams = 5
val kafkaDStreams = (1 to numInputDStreams).map
{ _ => KafkaUtils.createStream(...) }


  • 每個input dstream的消費者線程數。 同一個receiver可以運行多個線程。 可以配置和分區相同的線程。

val ssc: StreamingContext = ??? // ignore for now
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)

val consumerThreadsPerInputDstream = 3
val topics = Map("test" -> consumerThreadsPerInputDstream)

val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)


或者你還可以混合這兩種情況:

val ssc: StreamingContext = ???

val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)

val numDStreams = 5
val topics = Map("zerg.hydra" -> 1)

val kafkaDStreams = (1 to numDStreams).map { _ =>

KafkaUtils.createStream(ssc, kafkaParams, topics, ...)

}


Spark並行處理

上面介紹了Kafka的並行化讀取的控制,在Spark中我們可以進行並行化處理。類似Kafka,Spark將parallelism設置的與(RDD)分區數量有關, 通過在每個RDD分區上運行task進行。在有些文檔中,分區仍然被稱為「slices」。
同樣兩個控制手段:

  • input DStreams的數量

  • DStream transformation的重分配(repartition): 這裡將獲得一個全新的DStream,其parallelism等級可能增加、減少,或者保持原樣。在DStream中每個返回的RDD都有指定的N個分區。DStream由一系列的RDD組成,DStream.repartition則是通過RDD.repartition做到。
    因此,repartition是從processing parallelism分隔read parallelism的主要途徑。在這裡,我們可以設置processing tasks的數量,也就是說設置處理過程中所有core的數量。間接上,我們同樣設置了投入machines/NICs的數量。

一個DStream轉換相關是 union。這個方法同樣在StreamingContext中,它將從多個DStream中返回一個統一的DStream,它將擁有相同的類型和滑動時間。union會將多個 DStreams壓縮到一個 DStreams或者RDD中,但是需要注意的是,這裡的parallelism並不會發生改變。

你的用例將決定你如何分區。如果你的用例是CPU密集型的,你希望對test topic進行5 read parallelism讀取。也就是說,每個消費者進程使用5個receiver,但是卻可以將processing parallelism提升到20。

val ssc: StreamingContext = ???
val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)
val readParallelism = 5
val topics = Map("test" -> 1)
val kafkaDStreams = (1 to readParallelism).map { _ =>
KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
}
//> collection of five *input* DStreams = handled by five receivers/tasks
val unionDStream = ssc.union(kafkaDStreams)
//> single DStream
val processingParallelism = 20
val processingDStream = unionDStream(processingParallelism)
//> single DStream but now with 20 partitions


注意事項

在對Kafka進行讀寫上仍然存在一些含糊不清的問題,你可以在類似 Multiple Kafka Receivers and UnionHow to scale more consumer to Kafka stream mailing list的討論中發現。

  • Spark 1.1並不會恢復那些已經接收卻沒有進行處理的原始數據(查看)。因此,在某些情況下,你的Spark可能會丟失數據。Tathagata Das指出驅動恢復問題會在Spark的1.2版本中解決,現在已經提供Reliable Receiver 和Unreliable Receiver兩種Receiver。

  • 1.1版本中的Kafka連接器是基於Kafka的高級消費者API。這樣就會造成一個問題,Spark Streaming不可以依賴其自身的KafkaInputDStream將數據從Kafka中重新發送,從而無法解決下遊數據丟失問題(比如Spark服務器發生故障)。
    Dibyendu Bhattacharya 做到了使用簡單消費者API: kafka-spark-consumer.

  • 使用最新的Spark和Kafka,一些bugs已經在最新的Spark和Kafka中修復。

  • 在使用window操作時,window duration和sliding duration必須是DStream批處理的duration的整數倍。

  • 如果分配給應用的core的數量小於或者等於input DStream/receiver數量,則系統只接收數據, 沒有額外的core處理數據

  • 接上一條, 你在本地進行測試的時候,如果將master URL設置為「local」的話,則只有一個core運行任務,這明顯違反上一條, 只能接收數據,無法處理。

Kafak Topic 的分區和 Spark RDD的分區沒有任何關係。 它倆是分別設置的。

容錯

有兩種情況的機器失敗。

worker節點失敗

receiver接收到的消息在集群間有備份。如果只是一個節點失敗, Spark可以恢復。
但是如果是receiver所在的那個節點失敗,可能會有一點點數據丟失。 但是Receiver可以在其它節點上恢復啟動,繼續接收數據。

driver節點失敗

如果7*24工作的應用, 如果driver節點失敗,Spark Streaming也可以恢復。 Spark streaming定期的把元數據寫到HDFS中。 你需要設置checkpoint 文件夾。
為了支持恢復,必須遵循下面的處理:

  1. 當應用首次啟動時, 它會創建一個新的StreamingContext, 設置所有的流,然後啟動start().

  2. 當應用因失敗而恢復時, 它會從checkpoint文件中的checkpoint重建StreamingContext.

就像這樣:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {

val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams

...

ssc.checkpoint(checkpointDirectory) // set checkpoint directory

ssc

}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted

context. ...

// Start the context

context.start()

context.awaitTermination()


參考

  1. http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

  2. https://spark.apache.org/docs/1.2.0/streaming-kafka-integration.html

  3. https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html

關於作者@colobu

2000年畢業於中國科技大學,先後在清華同方,Motorola等公司工作,現在在一家創業型公司擔任PM & Engineer。
專注於以下四個領域:後端服務器的開發,大數據,前端開發,Android開發。


閱讀原文


關於作者:
最知名的Hadoop/Spark/Docker大數據技術基地,分享Hadoop技術內幕,Hadoop最新技術進展,發布Hadoop相關職位和求職信息,Hadoop技術交流聚會、講座以及會議等。

微信號:hadoop-123