本文共 2320 字,大约阅读时间需要 7 分钟。
本文结合 Spark Streaming 和 Spark SQL 实现词频统计（word count）。关键在于理解 DStream 与 RDD 之间的交互：DStream 由一系列 RDD 组成，需要通过 foreachRDD 这个 API 取出每个批次的 RDD，再把它转换成 DataFrame 交给 Spark SQL 处理。
package Spark

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/**
 * Word count that combines Spark Streaming with Spark SQL: every micro-batch of
 * words read from a socket is turned into a DataFrame and counted with a SQL query.
 *
 * Adapted from
 * https://github.com/apache/spark/blob/v2.1.0/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
 *
 * Usage: SqlNetworkWordCount [host] [port]
 * When omitted, host defaults to "bigdata.ibeifeng.com" and port to 6789.
 */
object SqlNetworkWordCount {

  /** Socket host used when no host argument is given. */
  private val DefaultHost = "bigdata.ibeifeng.com"

  /** Socket port used when no port argument is given. */
  private val DefaultPort = 6789

  def main(args: Array[String]): Unit = {
    val host = args.headOption.getOrElse(DefaultHost)
    // A non-numeric port fails fast with NumberFormatException.
    val port = args.lift(1).map(_.toInt).getOrElse(DefaultPort)

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    // A StreamingContext needs the SparkConf and a batch interval (5 seconds here).
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream(host, port)
    // split(" ") produces "" for leading/consecutive spaces and for empty lines;
    // drop those so they are not counted as a word.
    val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)

    // Convert each RDD of the words DStream to a DataFrame and run a SQL query on it.
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Reuse a single SparkSession instead of building one per batch.
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // RDD[String] -> RDD[Record] -> DataFrame with a single "word" column.
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register the DataFrame as a temporary view so SQL can query it.
      wordsDataFrame.createOrReplaceTempView("words")

      // Count words with SQL and print the result for this batch.
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /** One row of the "words" view; used to convert an RDD[String] into a DataFrame. */
  case class Record(word: String)

  /**
   * Lazily created, process-wide SparkSession.
   *
   * The session is built from the given SparkConf on the first call and returned
   * unchanged on later calls (later SparkConf arguments are ignored).
   * NOTE(review): not synchronized — assumes batches are processed one at a time
   * (the streaming default); confirm if concurrent jobs are ever enabled.
   */
  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }
  }
}
（1）用 nc 监听 6789 端口，并输入测试数据：
[root@bigdata hadoop-2.7.3]# nc -lk 6789
20180808,ww
20180808,ww
20180808,ww
（2）输出结果：
========= 1537287530000 ms =========
+-----------+-----+
|       word|total|
+-----------+-----+
|20180808,ww|    3|
+-----------+-----+

说明：程序按空格切分单词，而输入的每一行都不含空格，所以整行 "20180808,ww" 被当作一个单词。它在该批次中出现了 3 次，因此 total 为 3。
转载地址:http://ztygi.baihongyu.com/