博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SparkStreaming(9):实例-Streaming整合Spark SQL,进行wordcount功能
阅读量:4280 次
发布时间:2019-05-27

本文共 2320 字,大约阅读时间需要 7 分钟。

1.功能实现

     综合Spark Streaming和Spark SQL,进行word count的统计。核心理解DStream和RDD相互操作,需要通过使用foreachRDD这个API。

2.代码

package Sparkimport org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.{Seconds, StreamingContext, Time}/**  * spark streaming 整合spark sql完成词频统计操作  * https://github.com/apache/spark/blob/v2.1.0/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala  */object SqlNetworkWordCount {  def main(args: Array[String]): Unit = {    val sparkConf=new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")    /***      * 创建StreamingContext需要sparkConf和batch interval      */    val ssc=new StreamingContext(sparkConf,Seconds(5))    val lines = ssc.socketTextStream("bigdata.ibeifeng.com", 6789)    val words = lines.flatMap(_.split(" "))    // Convert RDDs of the words DStream to DataFrame and run SQL query    words.foreachRDD { (rdd: RDD[String], time: Time) =>      // Get the singleton instance of SparkSession      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)      import spark.implicits._      // Convert RDD[String] to RDD[case class] to DataFrame      val wordsDataFrame = rdd.map(w => Record(w)).toDF()      // Creates a temporary view using the DataFrame      wordsDataFrame.createOrReplaceTempView("words")      // Do word count on table using SQL and print it      val wordCountsDataFrame =      spark.sql("select word, count(*) as total from words group by word")      println(s"========= $time =========")      wordCountsDataFrame.show()    }    ssc.start()    ssc.awaitTermination()  }  /** Case class for converting RDD to DataFrame */  case class Record(word: String)  /** Lazily instantiated singleton instance of SparkSession */  object SparkSessionSingleton {    @transient  private var instance: SparkSession = _    def getInstance(sparkConf: SparkConf): SparkSession = {      if (instance == null) {        instance = SparkSession          .builder          .config(sparkConf)          .getOrCreate()      }      instance    }  }}

3.测试

(1)打开nc输入

[root@bigdata hadoop-2.7.3]#  nc -lk 678920180808,ww20180808,ww20180808,ww

(2)结果:

========= 1537287530000 ms =========+-----------+-----+|       word|total|+-----------+-----+|20180808,ww|    3|+-----------+-----+

 

转载地址:http://ztygi.baihongyu.com/

你可能感兴趣的文章
edison intel sst audio驱动注册流程
查看>>
sp<>,wp<>
查看>>
Linux ALSA 声卡驱动之一:ALSA架构简介
查看>>
Linux ALSA 声卡驱动之二:声卡的创建
查看>>
Linux ALSA 声卡驱动之三:PCM设备的创建
查看>>
Linux ALSA 声卡驱动之四:Control设备的创建
查看>>
Linux ALSA 声卡驱动之五:移动设备中的ALSA(ASoc)
查看>>
Linux ALSA 声卡驱动之六:ASoc架构中的Machine
查看>>
Linux ALSA 声卡驱动之七:ASoc中的Codec
查看>>
Linux ALSA 声卡驱动之八:ASoc中的Platform
查看>>
ALSA声卡驱动中的DAPM详解之一:kcontrol
查看>>
ALSA声卡驱动中的DAPM详解之二:widget-具备路径和电源管理信息的kcontrol
查看>>
ALSA声卡驱动中的DAPM详解之三:如何定义各种widget
查看>>
ALSA声卡驱动中的DAPM详解之四:在驱动程序中初始化并注册widget和route
查看>>
ALSA声卡驱动中的DAPM详解之五:建立widget之间的连接关系
查看>>
ALSA声卡驱动中的DAPM详解之六:精髓所在,牵一发而动全身
查看>>
ALSA声卡驱动中的DAPM详解之七:dapm事件机制(dapm event)
查看>>
Android Audio System之一:AudioTrack如何与AudioFlinger交换音频数据
查看>>
Android Audio System之二:AudioFlinger
查看>>
Android Audio System之三:AudioPolicyService和AudioPolicyManager
查看>>