手记

Spark 整合 MongoDB

  • 环境准备
    1. mongodb下载
    2. 解压安装
    3. 启动mongodb服务
      $MONGODB_HOME/bin/mongod --fork --dbpath=/root/data/mongodb/ --logpath=/root/data/log/mongodb/mongodb.log 
  • pom依赖
     <dependency>
      <groupId>org.mongodb.spark</groupId>
      <artifactId>mongo-spark-connector_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
  • 实例代码
    object ConnAppTest {
    def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ConnAppTest")
      .config("spark.mongodb.input.uri", "mongodb://192.168.31.136/testDB.testCollection") // 指定mongodb输入
      .config("spark.mongodb.output.uri", "mongodb://192.168.31.136/testDB.testCollection") // 指定mongodb输出
      .getOrCreate()
    // 生成测试数据
    val documents = spark.sparkContext.parallelize((1 to 10).map(i => Document.parse(s"{test: $i}")))
    // 存储数据到mongodb
    MongoSpark.save(documents)
    // 加载数据
    val rdd = MongoSpark.load(spark)
    // 打印输出
    rdd.show
    }
    }
2人推荐
随时随地看视频
慕课网APP