如何将从 RabbitMQ 队列收到的 JSON 转换为 CSV?

目前,我正在使用来自组织内 RabbitMQ 队列的消息。每天,我需要将所有收到的消息推送到一个csv中,该csv最终将作为Datawarehouse中的表登陆。


代码总是在监听队列,理想情况下,我希望将数据流式传输到csv。


#callback funtion on receiving messages

def onMessage(channel, method, properties, body):

    print(body)


while True:

    try:

        #connect

        credentials = pika.PlainCredentials(username, password)

        connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials))



channel = connection.channel()

channel.basic_consume(on_message_callback = onMessage, queue = queueName, auto_ack = True)

        channel.start_consuming()

开始使用队列后收到的输出如下所示:这是收到的一行数据。它基本上返回一个json对象,但是b'{“metrics”:在使用json对象时需要删除。


b'{“metrics”:[{“ci_id”:“SPN-EQSHATA1”,“client_id”:“39956e6fdb256757567567567433333193a”,“name”:“deviceHealthScore”,“source_id”:“Global”,“source_management_platform”:“XXX”,“timestamp”:1582886099642,“unit”:client_id ci_id“configAssuranceScore”,“source_id”:“Global”,“source_management_platform”:“XXX”,“timestamp”:1582886099325,“unit”:“count”,“value”:“1.0”},{“ci_id”:”SPN-EQSHATA1“,”client_id“:”39956e6fdb25675756756743333193a“,”name“:”imageAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099325,”unit“:”count“,”count“,”value“:”1.0“},{”ci_id“:”SPN-EQSHATA1“,”client_id“:”39956e6fdb256757567567567433333193a“,”name“:”vulnerabilityAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099325,”unit“:”count“,”value“:”10.0“},{”ci_id“:”SPN-EQSHATA1“,”client_id“:”39956e6fdb256757567567433333193a“,”name“:”overallAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099642,”unit“:”count“,”value“:”5.5“}],”emr_published_on“:1582886099642}'


慕虎7371278
浏览 56回答 1
1回答

炎炎设计

b'...'只是意味着你得到了一个json模块可以愉快地处理的字节字符串。您将获得一个字典,对于键,它具有字典列表的值。该列表可以直接馈送数据帧。metrics这意味着您可以像以下方式一样简单地处理它:df = pd.DataFrame(json.loads(body)['metrics'])
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python