在Java / Scala中以JSON格式发送流数据

我习惯使用python,并使用Scala Spark流媒体库处理实时Twitter流数据。现在,我可以作为字符串发送,但是,我的流服务需要JSON。有没有一种方法可以使我的代码轻松适应以JSON字典(而不是字符串)形式发送?


%scala


import scala.collection.JavaConverters._

import com.microsoft.azure.eventhubs._

import java.util.concurrent._


val namespaceName = "hubnamespace"

val eventHubName = "hubname"

val sasKeyName = "RootManageSharedAccessKey"

val sasKey = "key"

val connStr = new ConnectionStringBuilder()

            .setNamespaceName(namespaceName)

            .setEventHubName(eventHubName)

            .setSasKeyName(sasKeyName)

            .setSasKey(sasKey)


val pool = Executors.newFixedThreadPool(1)

val eventHubClient = EventHubClient.create(connStr.toString(), pool)


def sendEvent(message: String) = {

  val messageData = EventData.create(message.getBytes("UTF-8"))

  // CONVERT IT HERE?

  eventHubClient.get().send(messageData)

  System.out.println("Sent event: " + message + "\n")

}


import twitter4j._

import twitter4j.TwitterFactory

import twitter4j.Twitter

import twitter4j.conf.ConfigurationBuilder


val twitterConsumerKey = "key"

val twitterConsumerSecret = "key"

val twitterOauthAccessToken = "key"

val twitterOauthTokenSecret = "key"


val cb = new ConfigurationBuilder()

  cb.setDebugEnabled(true)

  .setOAuthConsumerKey(twitterConsumerKey)

  .setOAuthConsumerSecret(twitterConsumerSecret)

  .setOAuthAccessToken(twitterOauthAccessToken)

  .setOAuthAccessTokenSecret(twitterOauthTokenSecret)


val twitterFactory = new TwitterFactory(cb.build())

val twitter = twitterFactory.getInstance()


val query = new Query(" #happynewyear ")

query.setCount(100)

query.lang("en")

var finished = false

while (!finished) {

  val result = twitter.search(query)

  val statuses = result.getTweets()

  var lowestStatusId = Long.MaxValue

  for (status <- statuses.asScala) {

    if(!status.isRetweet()){

      sendEvent(status.getText())

    }

    lowestStatusId = Math.min(status.getId(), lowestStatusId)

    Thread.sleep(2000)

  }

  query.setMaxId(lowestStatusId - 1)

}


 eventHubClient.get().close()


扬帆大鱼
浏览 171回答 1
1回答

牧羊人nacy

Scala没有将字符串转换为Json的本地方法,您需要使用一个外部库。我建议使用Jackson。如果你使用gradle这个你可以添加这样的依赖性:compile("com.fasterxml.jackson.module:jackson-module-scala_2.12")。(使用适当的scala版本)然后,您可以像这样将数据对象简单地转换为JSON:val mapper = new ObjectMapper() with ScalaObjectMappermapper.registerModule(DefaultScalaModule)val json = valueToTree(messageData)我强烈建议您向Jackson投入精力,如果您使用JSON,将非常需要它。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java