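For my bachelor thesis, I am trying to send machine data (in this case, historical data sent with a Python script) to Kafka over an HTTP connection. I am using the Confluent Platform running in Docker on a Windows system.
With the Python script, I try to send the data to the REST Proxy. At first I got error responses about the data type, which I was able to resolve.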
import pandas as pd
import csv, os, json, requests, time, datetime, copy, sys
# Optional command-line arguments: bgrfc flag, input file path, batch size
if len(sys.argv) > 1:
    bgrfc_value = str(sys.argv[1])
else:
    print("No arguments for bgrfc given, defaulting to 'false'")
    bgrfc_value = 'false'
if len(sys.argv) > 2:
    filePath = str(sys.argv[2])
else:
    filePath = "path"
if len(sys.argv) > 3:
    batchSize = int(float(str(sys.argv[3])))
else:
    batchSize = 10
# Build skeleton JSON
basejson = {"message": {"meta" : "", "data": ""}}
#metajson = [{'meta_key' : 'sender', 'meta_value': 'OPCR'},
# {'meta_key' : 'receiver', 'meta_value': 'CAT'},
# {'meta_key' : 'message_type', 'meta_value': 'MA1SEK'},
# {'meta_key' : 'bgrfc', 'meta_value': bgrfc_value}]
#basejson['message']['meta'] = metajson
url = "http://127.0.0.1:8082/"
headers = {'Content-Type':'application/json','Accept':'application/json'}
def assign_timestamps(batch):
    newtimestamps = []
    oldtimestamps = []
    # Batch timestamps to list, add one newly generated timestamp per row
    for item in batch['tag_tsp'].values.tolist():
        newtimestamps.append(datetime.datetime.now())
        oldtimestamps.append(datetime.datetime.strptime(str(item), "%Y%m%d%H%M%S.%f"))
    # Sort old timestamps without sorting the original array to preserve variance
    temp = copy.deepcopy(oldtimestamps)
    temp.sort()
    mrtimestamp = temp[0]
    # Replicate variance of old timestamps into the new timestamps.
    # Iterate over the actual number of rows rather than batchSize, so a
    # batch shorter than batchSize does not raise an IndexError.
    for x in range(len(oldtimestamps)):
        diff = mrtimestamp - oldtimestamps[x]
        newtimestamps[x] = newtimestamps[x] - diff
        newtimestamps[x] = newtimestamps[x].strftime("%Y%m%d%H%M%S.%f")[:-3]
    # Switch old timestamps with new timestamps
    batch['tag_tsp'] = newtimestamps
    return batch
The script sends the data, but in response I get status code 500.
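For comparison, here is a minimal sketch of how a request to the REST Proxy's v2 produce endpoint is usually put together. It is not taken from the script above. It assumes the JSON embedded format and a topic called `machine-data`, which is a hypothetical name. The helper `build_produce_request` is also made up for illustration. The sketch only builds the request and does not send anything.

```python
import json

# Assumption: same host and port as the `url` variable in the script.
BASE_URL = "http://127.0.0.1:8082"
# Hypothetical topic name; the post does not say which topic is used.
TOPIC = "machine-data"


def build_produce_request(rows, base_url=BASE_URL, topic=TOPIC):
    """Return (url, headers, body) for a v2 produce request with JSON values.

    `rows` is a list of JSON-serializable dicts, one per Kafka record.
    """
    # The v2 API produces to a topic via POST /topics/<topic name>
    url = f"{base_url}/topics/{topic}"
    headers = {
        # Content type for the JSON embedded format of the v2 API
        "Content-Type": "application/vnd.kafka.json.v2+json",
        "Accept": "application/vnd.kafka.v2+json",
    }
    # Each record is wrapped as {"value": ...} inside a "records" list
    body = json.dumps({"records": [{"value": row} for row in rows]})
    return url, headers, body


# Made-up sample row, only to show the resulting shape
url, headers, body = build_produce_request(
    [{"tag_tsp": "20200101120000.000", "tag_value": 1.5}]
)
print(url)
print(body)
```

The three values could then be passed to `requests.post(url, headers=headers, data=body)`. Whether the plain `application/json` content type and the root URL are what cause the 500 here is an assumption that would need to be checked against the REST Proxy logs.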