Spark Streaming + Kafka 实时计算例子（一）：结果写回 Kafka

大数据 | 评论 | 708 阅读 | 阅读模式
摘要

Spark Streaming + Kafka 的 Demo：进行实时计算，并将结果写回 Kafka。

 

Spark Streaming + Kafka 实时计算例子（一）：结果写回 Kafka

 

本例使用 Spark Streaming 与 Kafka 进行实时计算：从 Kafka 读取 OBD 数据并进行实时统计，再将统计结果写回 Kafka。

import java.util.Properties
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage
import kafka.serializer.StringDecoder

//case class logjson(userId:String, day:String, begintime:String, endtime:String, data: String)
/**
 * A single OBD sample inside the `data` array of an incoming JSON message.
 *
 * Every field is kept as a raw `String`; no numeric conversion is done here.
 * Field names must match the JSON keys exactly, because instances are built
 * by json4s `extract` (via [[logjson]]), which maps keys to constructor
 * parameters by name.
 */
case class obddate(
    data: String,
    rotate_speed: String,
    vehicle_speed: String,
    oil_temperature: String,
    water_temperature: String,
    turbine_pressure: String,
    battery_voltage: String,
    throttle_percentage: String,
    intake_temperature: String,
    environment_temperature: String,
    time: String
)

/**
 * Top-level shape of one incoming JSON message, as extracted by json4s.
 *
 * @param userId    key the streaming job aggregates on
 * @param timestamp raw timestamp string, carried through unparsed
 * @param data      the OBD samples contained in this message; the job counts them
 */
case class logjson(
    userId: String,
    timestamp: String,
    data: Array[obddate]
)

object KafkaDirector {
  // NonFatal lets the parser fall back to a sentinel record without swallowing
  // fatal JVM errors or thread interruption.
  import scala.util.control.NonFatal

  /**
   * Entry point of the streaming job.
   *
   * The job works as follows:
   *  - Reads string records from the Kafka topic "test" with the direct stream API.
   *  - Parses each space-separated token as a [[logjson]] message.
   *  - Keeps a running per-userId total of OBD samples via `updateStateByKey`.
   *  - Prints that state every batch and writes each `(userId, total)` pair back to
   *    the Kafka topic "count_result".
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    // Spark configuration and a streaming context with 3-second batches.
    val conf = new SparkConf().setAppName("Kafka_director").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    // updateStateByKey below needs a checkpoint directory to persist running state.
    ssc.checkpoint("hdfs://172.18.245.105:8020/checkpoint/test")

    // Topics to consume (another topic used previously: testcardata).
    val topics = Set("test")
    // Broker address(es) for the direct (receiver-less) Kafka stream.
    val kafkaPrams = Map[String, String]("metadata.broker.list" -> "172.18.245.105:9092")
    // Keys and values are both decoded with Kafka's StringDecoder.
    val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaPrams, topics)

    // Running total: this batch's values for a key plus the stored total (0 the first time).
    val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
      (curVal, preVal) => Some(curVal.sum + preVal.getOrElse(0))

    // Core computation: (userId, sample count) per message, accumulated across batches.
    val result = data.map(_._2)
      // NOTE(review): splitting on spaces breaks any JSON that contains spaces;
      // such fragments end up as ("err", 1). Confirm producers emit compact JSON.
      .flatMap(_.split(" "))
      .map(parseRecord)
      .updateStateByKey(updateFunc)

    // Print the current state on the driver console every batch.
    result.print()

    // Write every (userId, total) pair back to Kafka.
    result.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // The producer is built here, inside the partition closure (on the executor),
        // instead of being captured from the driver.
        val producerProperties = new Properties()
        producerProperties.put("serializer.class", "kafka.serializer.StringEncoder")
        // NOTE(review): output broker (.104) differs from the input broker (.105) -- confirm intended.
        producerProperties.put("metadata.broker.list", "172.18.245.104:9092")
        producerProperties.put("request.required.acks", "1")
        val config: ProducerConfig = new ProducerConfig(producerProperties)
        // Kafka's own (legacy Scala) Producer; a Spark-side Kafka producer would also work.
        val producer = new Producer[String, String](config)
        try {
          partitionOfRecords.foreach { record =>
            // Every message uses the same constant key; a real job would probably key by userId.
            producer.send(new KeyedMessage("count_result", "key", record.toString()))
          }
        } finally {
          // Always release the producer's connections, even if a send fails.
          producer.close()
        }
      }
    }

    // Start the streaming computation and block until it is stopped.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses one JSON message into `(userId, number of entries in its data array)`.
   *
   * Any non-fatal failure (malformed JSON, missing or mistyped fields) yields the
   * sentinel `("err", 1)`. Bad input is therefore counted under the key "err" instead
   * of failing the batch.
   *
   * @param value one raw JSON string
   * @return `(userId, data.size)` on success, otherwise `("err", 1)`
   */
  private def parseRecord(value: String): (String, Int) =
    try {
      // json4s default formats drive the case-class extraction.
      implicit val formats: DefaultFormats.type = DefaultFormats
      val formatedJson = parse(value).extract[logjson]
      (formatedJson.userId, formatedJson.data.size)
    } catch {
      case NonFatal(_) => ("err", 1)
    }
}