目前,我正在使用来自组织内 RabbitMQ 队列的消息。每天,我需要将所有收到的消息推送到一个csv中,该csv最终将作为Datawarehouse中的表登陆。
代码总是在监听队列,理想情况下,我希望将数据流式传输到csv。
#callback funtion on receiving messages
def onMessage(channel, method, properties, body):
print(body)
while True:
try:
#connect
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials))
channel = connection.channel()
channel.basic_consume(on_message_callback = onMessage, queue = queueName, auto_ack = True)
channel.start_consuming()
开始使用队列后收到的输出如下所示:这是收到的一行数据。它基本上返回一个json对象,但是b'{“metrics”:在使用json对象时需要删除。
b'{“metrics”:[{“ci_id”:“SPN-EQSHATA1”,“client_id”:“39956e6fdb256757567567567433333193a”,“name”:“deviceHealthScore”,“source_id”:“Global”,“source_management_platform”:“XXX”,“timestamp”:1582886099642,“unit”:client_id ci_id“configAssuranceScore”,“source_id”:“Global”,“source_management_platform”:“XXX”,“timestamp”:1582886099325,“unit”:“count”,“value”:“1.0”},{“ci_id”:”SPN-EQSHATA1“,”client_id“:”39956e6fdb25675756756743333193a“,”name“:”imageAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099325,”unit“:”count“,”count“,”value“:”1.0“},{”ci_id“:”SPN-EQSHATA1“,”client_id“:”39956e6fdb256757567567567433333193a“,”name“:”vulnerabilityAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099325,”unit“:”count“,”value“:”10.0“},{”ci_id“:”SPN-EQSHATA1“,”client_id“:”39956e6fdb256757567567433333193a“,”name“:”overallAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099642,”unit“:”count“,”value“:”5.5“}],”emr_published_on“:1582886099642}'
炎炎设计
相关分类