如何使用 confluent 的 REST 代理将 JSON 数据发送到 kafka?

对于我的学士论文,我尝试使用 http 连接向 kafka 发送机器数据(在这种情况下,使用 python 脚本发送的历史数据)。我正在使用在 Windows 系统上的 docker 中运行的融合平台。


使用 python 脚本,我尝试将数据发送到 REST 代理。起初,我收到了关于我能够解决的数据类型的错误响应。


import pandas as pd

import csv, os, json, requests, time, datetime, copy, sys


if len(sys.argv) > 1:

    bgrfc_value = str(sys.argv[1])

else:

    print("No arguments for bgrfc given, defaulting to 'false'")

    bgrfc_value = 'false'


if len(sys.argv) > 2:

    filePath = str(sys.argv[2])

else:

    filePath = "path"



if len(sys.argv) > 3:

    batchSize = int(float(str(sys.argv[3])))

else:

    batchSize = 10



# Build skeleton JSON

basejson = {"message": {"meta" : "", "data": ""}}

#metajson = [{'meta_key' : 'sender', 'meta_value': 'OPCR'},

#           {'meta_key' : 'receiver', 'meta_value': 'CAT'},

#            {'meta_key' : 'message_type', 'meta_value': 'MA1SEK'},

#            {'meta_key' : 'bgrfc', 'meta_value': bgrfc_value}]

#basejson['message']['meta'] = metajson

url = "http://127.0.0.1:8082/"

headers = {'Content-Type':'application/json','Accept':'application/json'}


def assign_timestamps(batch):

    newtimestamps = []

    oldtimestamps = []


    # Batch timestamps to list, add 10 newly generated timestamps to a list

    for item in batch['tag_tsp'].values.tolist():

        newtimestamps.append(datetime.datetime.now())

        oldtimestamps.append(datetime.datetime.strptime(str(item), "%Y%m%d%H%M%S.%f"))


    # Sort old timestamps without sorting the original array to preserve variance

    temp = copy.deepcopy(oldtimestamps)

    temp.sort()

    mrtimestamp = temp[0]


    # Replicate variance of old timestamps into the new timestamps

    for x in range(batchSize):

        diff = mrtimestamp - oldtimestamps[x]

        newtimestamps[x] = newtimestamps[x] - diff

        newtimestamps[x] = newtimestamps[x].strftime("%Y%m%d%H%M%S.%f")[:-3]


    # Switch old timestamps with new timestamps

    batch['tag_tsp'] = newtimestamps

    return batch



该脚本发送数据,但作为响应,我得到状态代码 500。


慕丝7291255
浏览 234回答 2
2回答

茅侃侃

您的标头值不正确。你需要设置Accept和Content-type两个头下面给出:&nbsp;Accept: application/vnd.kafka.v2+json&nbsp;Content-Type : application/vnd.kafka.json.v2+json此外,数据应按以下方式结构化:{"records":[{"value":{<Put your json record here>}}]}例如 :{"records":[{"value":{"foo":"bar"}}]}

烙印99

我相信您放入“value”的数据必须是字符串。像这样的事情会起作用:{"records":[{"value":"{'foo':'bar'}"}]}如果您在阅读主题时收到一条有趣的消息,请尝试使用 base64 编码对消息进行编码。编码后的原始 json 字符串应如下所示:{"records":[{"value":"eyJmb28iOiJiYXIifQ=="}]}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python