Spark Streaming + Kafka Real-Time Computation Example (Part 2): Writing Results Back to Redis

Summary

Spark Streaming + Kafka real-time computation, writing the computed results back to Redis.

This Spark Streaming + Kafka real-time computation example writes its results to a Redis database rather than back to Kafka. The complete, testable code follows (you will also need a separate data producer that continuously generates input; a minimal sketch of one is given after the listing).

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s._
import org.json4s.jackson.JsonMethods._
import kafka.serializer.StringDecoder
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object OpRedis {

  val poolConfig = new JedisPoolConfig()
  // Redis connection pool; lazy so each executor JVM creates it on first use
  // rather than trying to serialize a connection from the driver
  private lazy val Jpool = new JedisPool(poolConfig, "172.18.245.104", 6379, 3000, "ruanke123")

  def getJedis(): Jedis = Jpool.getResource
}

//case class logjson(userId:String, day:String, begintime:String, endtime:String, data: String)
case class obddate(data:String,rotate_speed:String,vehicle_speed:String,
                   oil_temperature:String,water_temperature:String,
                   turbine_pressure:String,battery_voltage:String,
                   throttle_percentage:String,intake_temperature:String,
                   environment_temperature:String,time:String)

case class logjson(userId:String, timestamp:String, data:Array[obddate])
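// Illustrative shape of one incoming message matching the case classes above
// (field names come from logjson/obddate; the concrete values are made up):
// {"userId":"u001","timestamp":"1546300800","data":[{"data":"obd","rotate_speed":"2100",
//   "vehicle_speed":"65","oil_temperature":"90","water_temperature":"85","turbine_pressure":"1.2",
//   "battery_voltage":"12.6","throttle_percentage":"30","intake_temperature":"25",
//   "environment_temperature":"20","time":"1546300800"}]}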

object KafkaDirector {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    // Build the SparkConf and StreamingContext (3-second batches)
    val conf = new SparkConf().setAppName("Kafka_director").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    // Checkpoint directory on HDFS, required by updateStateByKey for cumulative counts
    ssc.checkpoint("hdfs://172.18.245.105:8020/checkpoint/test")
    // Topic set to consume
    val topics = Set("test") //testcardata
    // Broker metadata: IP address and port
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "172.18.245.105:9092")
    // Type parameters: input key/value types and their decoder types
    val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val updateFunc = (curVal: Seq[Int], preVal: Option[Int]) => {
      // Sum the counts that arrived in the current batch
      val total = curVal.sum
      // Previously accumulated value; defaults to 0 the first time a key is seen
      val previous = preVal.getOrElse(0)
      // Some(...) is the new state returned for this key
      Some(total + previous)
    }
    // Compute the result; this is the actual processing logic

    val result = data.map(_._2).flatMap(_.split(" "))
      .map(value =>
        try {
          // Implicit conversion: use json4s's default formats for extraction
          implicit val formats: DefaultFormats.type = DefaultFormats
          val json = parse(value)
          // Extract the JSON object into the case class
          val formatedJson = json.extract[logjson]
          val itemCount = formatedJson.data.size
          (formatedJson.userId, itemCount)
        } catch {
          // Count malformed records under the "err" key instead of failing the batch
          case e: Exception =>
            ("err", 1)
        })
      .updateStateByKey(updateFunc)

    // Print to the console
    result.print()
    // Write the results to Redis
    result.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // One connection per partition, taken from the pool
        val jedis: Jedis = OpRedis.getJedis()
        try {
          partitionOfRecords.foreach { result =>
            // Key: userId, value: accumulated record count
            jedis.set(result._1, result._2.toString)
          }
        } finally {
          // Return the connection to the pool even if a write fails
          jedis.close()
        }
      }
    }
    // Start the streaming job
    ssc.start()
    ssc.awaitTermination()
  }
}
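
For testing you also need the separate data producer mentioned at the top. The sketch below is one minimal way to write it, assuming the same Kafka 0.8-era kafka.producer API used in the consumer above; the object name TestDataProducer, the payload values, and the one-second send interval are all illustrative, not part of the original code.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object TestDataProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker as in the consumer's kafkaParams
    props.put("metadata.broker.list", "172.18.245.105:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))

    // One record matching the logjson/obddate schema; values are made up.
    // Note: no spaces inside the payload, because the consumer splits batches on spaces.
    val message = """{"userId":"u001","timestamp":"1546300800","data":[{"data":"obd","rotate_speed":"2100","vehicle_speed":"65","oil_temperature":"90","water_temperature":"85","turbine_pressure":"1.2","battery_voltage":"12.6","throttle_percentage":"30","intake_temperature":"25","environment_temperature":"20","time":"1546300800"}]}"""

    // Send one message per second to the "test" topic consumed above
    while (true) {
      producer.send(new KeyedMessage[String, String]("test", message))
      Thread.sleep(1000)
    }
  }
}

Once both programs are running, you can check the accumulated counts directly in Redis, for example with redis-cli: GET u001 (using whatever userId your producer sends) should return the growing record count for that user.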