手记

15 Spark Streaming源码解读之No Receivers彻底思考

Spark Streaming在企业级使用中,一般会使用no receiver的方式读取数据,对应kafka中的Direct方式,采用no receiver的方式可以提高数据读取效率并保证事务的一致性,看看在Spark Streaming中是怎样使用kafka的Direct方式

  1. 首先展示一个Demo,代码如下

object DirectKafkaWordCount {
  def main(args: Array[String]) {    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(brokers, topics) = args    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
  1. 这里使用了KafkaUtils.createDirectStream方法来创建了一个DirectKafkaInputDStream,createDirectStream的代码如下

def createDirectStream[
    K: ClassTag,    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,      kafkaParams: Map[String, String],      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](ssc, kafkaParams, fromOffsets, messageHandler)
}
  1. 首先实例化了一个KafkaCluster,KafkaCluster里面封装了Spark Streaming操作kafka Api的所有方法。重点来了,如果我们在其他项目中需要读取kafka的数据,KafkaCluster就是最好的参考文档了。接着看getFromOffsets(kc, kafkaParams, topics)方法,返回的fromOffsets 变量类型为 Map[TopicAndPartition, Long],TopicAndPartition是kafka里的对象,代表了topic和他的第几个分区,Long的值就是offset(数据偏移量),kafka是一个分布式消息队列,每一个topic可以有多个partition,多个partition可以提高kafka的吞吐量。

  2. 接着看DirectKafkaInputDStream内部代码,compute()方法代码如下

//这里没有使用receiver,而是直接包装了KafkaRDD进行数据读取override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {    // 获取每个topic中每个partition的offset结束偏移量,对应currentOffsets。代表了每个partition数据的开始和结束位置
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))    // KafkaRDD用来从kafka中读取数据,类似hadoopRDD从hdfs中读取数据,如果我们读取其他数据来源,也可以自定义RDD
    val rdd = KafkaRDD[K, V, U, T, R](context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    // Report the record number and metadata of this batch interval to InputInfoTracker.
    // offsetRanges 表示每次读取partition数据的范围
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
    val description = offsetRanges.filter { offsetRange =>      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map("offsets" -> offsetRanges.toList,StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)    // 更新currentOffsets,这次的结束位置就下次的开始位置
    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
}

主要是获取untilOffsets ,把currentOffsets和untilOffsets 传递到KafkaRDD中,这样KafkaRDD就知道读取哪些数据了。

  1. 看KafkaRDD中都有哪些内容,首先使用KafkaRDD的伴生对象去创建Kafka的实例,看一下Kafka的apply方法,代码如下

def apply[
    K: ClassTag,    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,    R: ClassTag](
      sc: SparkContext,      kafkaParams: Map[String, String],      fromOffsets: Map[TopicAndPartition, Long],      untilOffsets: Map[TopicAndPartition, LeaderOffset],      messageHandler: MessageAndMetadata[K, V] => R
    ): KafkaRDD[K, V, U, T, R] = {
    val leaders = untilOffsets.map { case (tp, lo) =>
        tp -> (lo.host, lo.port)
    }.toMap    // offsetRanges代表了数据偏移量的开始和结束位置
    val offsetRanges = fromOffsets.map { case (tp, fo) =>
        val uo = untilOffsets(tp)
        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }.toArray    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}

主要就是生成了一个offsetRanges,和leaders(数据所在的ip地址和端口),messageHandler也是一个很有意思的函数,一会再介绍

  1. 看KafkaRDD的代码,自定义一个RDD,最关键的三个方法分别是getPartitions、getPreferredLocations、compute。
    (1)首先看getPartitions,代码如下

override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
   }.toArray
}

每一个topic的每一个partition对应一个Spark中的partition,从这里我们可以评估一个kafka数据源(topic)需要多少个core资源来计算
(2)再看getPreferredLocations方法,代码如下

override def getPreferredLocations(thePart: Partition): Seq[String] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    // TODO is additional hostname resolution necessary here
    Seq(part.host)
}

getPreferredLocations决定了数据本地性,如果kafka中broker和Spark在同一个集群中,此时getPreferredLocations获取本地性就可以极大提高效率,因为没有了数据网络传输的成本
(3)最后看compute方法,代码如下

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))    if (part.fromOffset == part.untilOffset) {
      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      new KafkaRDDIterator(part, context)
    }
}

主要功能封装在KafkaRDDIterator中,代码如下

private class KafkaRDDIterator(part: KafkaRDDPartition,context: TaskContext) extends NextIterator[R] {

    context.addTaskCompletionListener{ context => closeIfNeeded() }

    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
      s"offsets ${part.fromOffset} -> ${part.untilOffset}")

    val kc = new KafkaCluster(kafkaParams)
    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(kc.config.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(kc.config.props)
      .asInstanceOf[Decoder[V]]
    val consumer = connectLeader
    var requestOffset = part.fromOffset
    var iter: Iterator[MessageAndOffset] = null

    // The idea is to use the provided preferred host, except on task retry attempts,
    // to minimize number of kafka metadata requests
    private def connectLeader: SimpleConsumer = {      if (context.attemptNumber > 0) {
        kc.connectLeader(part.topic, part.partition).fold(
          errs => throw new SparkException(s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
              errs.mkString("\n")),
          consumer => consumer
        )
      } else {
        kc.connect(part.host, part.port)
      }
    }    private def handleFetchErr(resp: FetchResponse) {      if (resp.hasError) {
        val err = resp.errorCode(part.topic, part.partition)        if (err == ErrorMapping.LeaderNotAvailableCode ||
          err == ErrorMapping.NotLeaderForPartitionCode) {
          log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
            s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
          Thread.sleep(kc.config.refreshLeaderBackoffMs)
        }        // Let normal rdd retry sort out reconnect attempts
        throw ErrorMapping.exceptionFor(err)
      }
    }    // 3、获取一批数据
    private def fetchBatch: Iterator[MessageAndOffset] = {      // 4、包装一个request请求对象
      val req = new FetchRequestBuilder()
        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
        .build()      // 5、使用SimpleConsumer对象发送fetch请求,返回response
      val resp = consumer.fetch(req)
      handleFetchErr(resp)      // kafka may return a batch that starts before the requested offset
      // 5、从返回的response中得到一个数据的迭代器
      resp.messageSet(part.topic, part.partition)
        .iterator
        .dropWhile(_.offset < requestOffset)
    }    override def close(): Unit = {      if (consumer != null) {
        consumer.close()
      }
    }    // 1、从读取数据开始看
    override def getNext(): R = {      if (iter == null || !iter.hasNext) {        // 2、获取一些数据,数据包装在迭代器中
        iter = fetchBatch
      }      if (!iter.hasNext) {        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {        // 6、不断读取数据
        val item = iter.next()        if (item.offset >= part.untilOffset) {          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset          // 7、将数据封装到MessageAndMetadata类中
          messageHandler(new MessageAndMetadata(part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
      }
    }
}

跟着KafkaRDDIterator里的步骤来看代码
(4)最后返回的消息被封装到了MessageAndMetadata中,那么messageHandler是个什么东东,我们顺着代码向上找,是在KafkaUtils的createDirectStream方法有定义的,代码如下

val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)

原来是取消息中的key和message,如果我们想取出其他参数可以自定义messageHandler函数,是不是有点意思。



作者:海纳百川_spark
链接:https://www.jianshu.com/p/7d71f34eb4c4


0人推荐
随时随地看视频
慕课网APP