手记

基于Kafka+SparkStreaming+HBase实时点击流案例


背景

Kafka实时接收由数据采集工具Flume或业务系统实时接口收集的数据,并作为消息缓冲组件为下游的实时计算框架提供可靠的数据支撑。Spark 1.3版本之后支持两种整合Kafka的机制(Receiver-based Approach 和 Direct Approach),具体细节请参考文章最后的官方文档链接。数据存储使用HBase。

实现思路

实现Kafka消息生产者模拟器

Spark-Streaming采用Direct Approach方式实时获取Kafka中数据

Spark-Streaming对数据进行业务计算后数据存储到HBase

本地虚拟机集群环境配置

由于笔者机器性能有限,hadoop/zookeeper/kafka集群都搭建在同一组虚拟机上,主机名分别为hadoop1、hadoop2、hadoop3;hbase为单节点,部署在hadoop1上。

缺点及不足

由于笔者技术有限,代码设计上存在部分缺陷,比如Spark-Streaming计算后将数据保存到HBase的逻辑性能很低,希望大家多提意见,以便笔者及时更正。

代码实现

Kafka消息模拟器

package clickstream

import java.util.{Properties, Random, UUID}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import org.codehaus.jettison.json.JSONObject

/**  * 

Created by 郭飞 on 2016/5/31.  

*/

/**
  * Simulates user click events and publishes them to Kafka as JSON strings.
  *
  * Every message contains a random `uid`, the send time in epoch milliseconds
  * (`event_time`, stored as a string), an `os_type` taken round-robin from a
  * fixed list, and a random `click_count`.
  */
object KafkaMessageGenerator {

  private val random = new Random()

  // Index of the entry of `os_type` returned last; -1 means nothing was returned yet.
  private var pointer = -1

  private val os_type = Array(
    "Android", "IPhone OS",
    "None", "Windows Phone")

  /** Returns a random click count: a whole number in [0, 9], widened to Double. */
  def click() : Double = {
    random.nextInt(10)
  }

  /** Returns the next OS type, walking through `os_type` in order and wrapping around at the end. */
  def getOsType() : String = {
    pointer = (pointer + 1) % os_type.length
    os_type(pointer)
  }

  /**
    * Sends one event roughly every 200 ms until the process is stopped.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val topic = "user_events"

    // Kafka broker list (host:port) of the local VM cluster. These are brokers, not ZooKeeper nodes.
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"

    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val producer = new Producer[String, String](new ProducerConfig(props))

    // The loop only ends through an exception or interrupt; release the producer's
    // network resources in that case instead of leaking them.
    try {
      while (true) {
        // prepare event data
        val event = new JSONObject()
        event
          .put("uid", UUID.randomUUID().toString) // random user id, stored explicitly as a string
          .put("event_time", System.currentTimeMillis.toString) // time the event occurred (epoch ms)
          .put("os_type", getOsType()) // device OS
          .put("click_count", click()) // number of clicks

        // produce event message
        producer.send(new KeyedMessage[String, String](topic, event.toString))
        println("Message sent: " + event)

        Thread.sleep(200)
      }
    } finally {
      producer.close()
    }
  }

}

Spark-Streaming主类

package clickstream

import kafka.serializer.StringDecoder

import net.sf.json.JSONObject

import org.apache.hadoop.hbase.client.{HTable, Put}

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

  * Created by 郭飞 on 2016/5/31.

  */

/**
  * Spark Streaming job that reads click events from Kafka with the direct approach,
  * sums `click_count` per `uid` inside each 5-second batch and stores the sums in HBase.
  *
  * Output: table "PageViewStream", row key = uid, column Stat:ClickStat = the batch sum
  * written with `Bytes.toBytes` on an Int. Each batch writes its own sum; values are not
  * accumulated across batches.
  */
object PageViewStream {

  /**
    * Starts the streaming job and blocks until it terminates.
    *
    * @param args optional; `args(0)` is the Spark master URL (defaults to "local[2]")
    */
  def main(args: Array[String]): Unit = {
    val masterUrl = args.headOption.getOrElse("local[2]")

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("PageViewStream")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations.
    // The topic must be the one KafkaMessageGenerator publishes to ("user_events");
    // subscribing to any other name means this job never receives an event.
    val topics = Set("user_events")

    // Kafka broker list (host:port) of the local VM cluster. These are brokers, not ZooKeeper nodes.
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"

    // Only the broker list is needed here; "serializer.class" is a producer setting.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Create a direct stream of (key, message) pairs
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Parse each message into (uid, click_count) and sum the clicks per user within the batch.
    // A message that is not valid JSON or lacks one of the two fields is dropped, so a single
    // bad record cannot fail the whole batch.
    val userClicks = kafkaStream.flatMap { case (_, payload) =>
      scala.util.Try {
        val event = JSONObject.fromObject(payload)
        (event.getString("uid"), event.getInt("click_count"))
      }.toOption
    }.reduceByKey(_ + _)

    userClicks.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // Everything HBase-related is created inside this closure (on the executor), so no
        // non-serializable object is captured. One table handle serves the whole partition
        // instead of one per record, and empty partitions open no connection at all.
        if (records.nonEmpty) {
          val hbaseConf = HBaseConfiguration.create()
          // Host name only: the ZooKeeper port is supplied by clientPort below
          // (9092 is the Kafka broker port, not ZooKeeper).
          hbaseConf.set("hbase.zookeeper.quorum", "hadoop1")
          hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
          hbaseConf.set("hbase.defaults.for.version.skip", "true")

          val statTable = new HTable(hbaseConf, TableName.valueOf("PageViewStream"))
          try {
            // Buffer the puts on the client (3 MB) and send them in bulk
            statTable.setAutoFlush(false, false)
            statTable.setWriteBufferSize(3 * 1024 * 1024)

            records.foreach { case (uid, clicks) =>
              val put = new Put(Bytes.toBytes(uid))
              put.add(Bytes.toBytes("Stat"), Bytes.toBytes("ClickStat"), Bytes.toBytes(clicks))
              statTable.put(put)
            }

            // Push whatever is still sitting in the write buffer
            statTable.flushCommits()
          } finally {
            // Always release the table handle, even when a write fails
            statTable.close()
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

}

Maven POM文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <!-- Maven build definition: Scala sources compiled with scala-maven-plugin and packaged as a shaded jar -->

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.guofei.spark</groupId>

  <artifactId>RiskControl</artifactId>

  <version>1.0-SNAPSHOT</version>

  <packaging>jar</packaging>

  <name>RiskControl</name>

  <url>http://maven.apache.org</url>

  <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  </properties>

  <dependencies>

    <!-- Spark core and Spark Streaming (artifacts built for Scala 2.10) -->

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-core_2.10</artifactId>

      <version>1.3.0</version>

    </dependency>

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-streaming_2.10</artifactId>

      <version>1.3.0</version>

    </dependency>

    <!-- Spark Streaming integration with Kafka (org.apache.spark.streaming.kafka.KafkaUtils) -->

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-streaming-kafka_2.10</artifactId>

      <version>1.3.0</version>

    </dependency>

    <!-- HBase integration -->

    <!-- NOTE(review): declared with type "pom", so this entry presumably contributes no jar of its own; confirm it is needed -->

    <dependency>

      <groupId>org.apache.hbase</groupId>

      <artifactId>hbase</artifactId>

      <version>0.96.2-hadoop2</version>

      <type>pom</type>

    </dependency>

    <!-- HBase dependencies -->

    <dependency>

      <groupId>org.apache.hbase</groupId>

      <artifactId>hbase-server</artifactId>

      <version>0.96.2-hadoop2</version>

    </dependency>

    <dependency>

      <groupId>org.apache.hbase</groupId>

      <artifactId>hbase-client</artifactId>

      <version>0.96.2-hadoop2</version>

    </dependency>

    <dependency>

      <groupId>org.apache.hbase</groupId>

      <artifactId>hbase-common</artifactId>

      <version>0.96.2-hadoop2</version>

    </dependency>

    <dependency>

      <groupId>commons-io</groupId>

      <artifactId>commons-io</artifactId>

      <version>1.3.2</version>

    </dependency>

    <dependency>

      <groupId>commons-logging</groupId>

      <artifactId>commons-logging</artifactId>

      <version>1.1.3</version>

    </dependency>

    <dependency>

      <groupId>log4j</groupId>

      <artifactId>log4j</artifactId>

      <version>1.2.17</version>

    </dependency>

    <dependency>

      <groupId>com.google.protobuf</groupId>

      <artifactId>protobuf-java</artifactId>

      <version>2.5.0</version>

    </dependency>

    <dependency>

      <groupId>io.netty</groupId>

      <artifactId>netty</artifactId>

      <version>3.6.6.Final</version>

    </dependency>

    <dependency>

      <groupId>org.apache.hbase</groupId>

      <artifactId>hbase-protocol</artifactId>

      <version>0.96.2-hadoop2</version>

    </dependency>

    <dependency>

      <groupId>org.apache.zookeeper</groupId>

      <artifactId>zookeeper</artifactId>

      <version>3.4.5</version>

    </dependency>

    <dependency>

      <groupId>org.cloudera.htrace</groupId>

      <artifactId>htrace-core</artifactId>

      <version>2.01</version>

    </dependency>

    <dependency>

      <groupId>org.codehaus.jackson</groupId>

      <artifactId>jackson-mapper-asl</artifactId>

      <version>1.9.13</version>

    </dependency>

    <dependency>

      <groupId>org.codehaus.jackson</groupId>

      <artifactId>jackson-core-asl</artifactId>

      <version>1.9.13</version>

    </dependency>

    <dependency>

      <groupId>org.codehaus.jackson</groupId>

      <artifactId>jackson-jaxrs</artifactId>

      <version>1.9.13</version>

    </dependency>

    <dependency>

      <groupId>org.codehaus.jackson</groupId>

      <artifactId>jackson-xc</artifactId>

      <version>1.9.13</version>

    </dependency>

    <dependency>

      <groupId>org.slf4j</groupId>

      <artifactId>slf4j-api</artifactId>

      <version>1.6.4</version>

    </dependency>

    <dependency>

      <groupId>org.slf4j</groupId>

      <artifactId>slf4j-log4j12</artifactId>

      <version>1.6.4</version>

    </dependency>

    <!-- Hadoop dependencies -->

    <dependency>

      <groupId>org.apache.hadoop</groupId>

      <artifactId>hadoop-client</artifactId>

      <version>2.6.4</version>

    </dependency>

    <dependency>

      <groupId>commons-configuration</groupId>

      <artifactId>commons-configuration</artifactId>

      <version>1.6</version>

    </dependency>

    <dependency>

      <groupId>org.apache.hadoop</groupId>

      <artifactId>hadoop-auth</artifactId>

      <version>2.6.4</version>

    </dependency>

    <dependency>

      <groupId>org.apache.hadoop</groupId>

      <artifactId>hadoop-common</artifactId>

      <version>2.6.4</version>

    </dependency>

    <!-- JSON libraries: json-lib for the net.sf.json import in PageViewStream, jettison for the org.codehaus.jettison import in KafkaMessageGenerator -->

    <dependency>

      <groupId>net.sf.json-lib</groupId>

      <artifactId>json-lib</artifactId>

      <version>2.4</version>

      <classifier>jdk15</classifier>

    </dependency>

    <dependency>

      <groupId>org.codehaus.jettison</groupId>

      <artifactId>jettison</artifactId>

      <version>1.1</version>

    </dependency>

    <!-- NOTE(review): jedis and commons-pool2 are not imported by KafkaMessageGenerator or PageViewStream; confirm they are still needed -->

    <dependency>

      <groupId>redis.clients</groupId>

      <artifactId>jedis</artifactId>

      <version>2.5.2</version>

    </dependency>

    <dependency>

      <groupId>org.apache.commons</groupId>

      <artifactId>commons-pool2</artifactId>

      <version>2.2</version>

    </dependency>

  </dependencies>

  <build>

    <sourceDirectory>src/main/scala</sourceDirectory>

    <testSourceDirectory>src/test/scala</testSourceDirectory>

    <plugins>

      <!-- Runs the Scala compiler for main and test sources (goals compile and testCompile) -->

      <plugin>

        <groupId>net.alchim31.maven</groupId>

        <artifactId>scala-maven-plugin</artifactId>

        <version>3.2.2</version>

        <executions>

          <execution>

            <goals>

              <goal>compile</goal>

              <goal>testCompile</goal>

            </goals>

            <configuration>

              <args>

                <arg>-make:transitive</arg>

                <arg>-dependencyfile</arg>

                <arg>${project.build.directory}/.scala_dependencies</arg>

              </args>

            </configuration>

          </execution>

        </executions>

      </plugin>

      <!-- Runs the shade goal in the package phase -->

      <plugin>

        <groupId>org.apache.maven.plugins</groupId>

        <artifactId>maven-shade-plugin</artifactId>

        <version>2.4.3</version>

        <executions>

          <execution>

            <phase>package</phase>

            <goals>

              <goal>shade</goal>

            </goals>

            <configuration>

              <filters>

                <!-- Excludes signature files (*.SF, *.DSA, *.RSA) of every artifact; presumably so the repackaged jar does not fail signature verification -->

                <filter>

                  <artifact>*:*</artifact>

                  <excludes>

                    <exclude>META-INF/*.SF</exclude>

                    <exclude>META-INF/*.DSA</exclude>

                    <exclude>META-INF/*.RSA</exclude>

                  </excludes>

                </filter>

              </filters>

            </configuration>

          </execution>

        </executions>

      </plugin>

    </plugins>

  </build>

</project>

执行Spark-Streaming程序报错

org.apache.spark.SparkException: Task not serializable

userClicks.foreachRDD(rdd => { 

rdd.foreachPartition(partitionOfRecords => { 

partitionOfRecords.foreach(

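这里面的代码所引用的外部对象(在Driver端创建、被闭包捕获的对象)必须是可序列化的

这里面的代码所引用的外部对象(在Driver端创建、被闭包捕获的对象)必须是可序列化的

这里面的代码所引用的外部对象(在Driver端创建、被闭包捕获的对象)必须是可序列化的;不可序列化的对象(如HTable、HBase的Configuration)应在闭包内部创建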

}) 

}) 

})

执行Maven打包报错,找不到依赖的jar包

error:not found: object kafka

ERROR import kafka.javaapi.producer.Producer

©著作权归作者所有:来自51CTO博客作者java架构师1的原创作品,如需转载,请注明出处,否则将追究法律责任


0人推荐
随时随地看视频
慕课网APP