为 Hbase 连接器 Kafka 使用无模式 JSON 转换器

我正在为 kafka 使用 hbase sink 连接器(https://github.com/mravi/kafka-connect-hbase)。所以我尝试在事件解析器类中使用它的 JsonConverter 来实现这个连接器,如下所示。


{

  "name": "test-hbase",

  "config": {

    "connector.class": "io.svectors.hbase.sink.HBaseSinkConnector",

    "tasks.max": "1",

    "topics": "hbase_connect",

    "zookeeper.quorum": "xxxxx.xxxx.xx.xx,xxxxx.xxxx.xx.xx,xxxxx.xxxx.xx.xx",

    "event.parser.class": "io.svectors.hbase.parser.JsonEventParser",

    "hbase.hbase_connect.rowkey.columns": "id",

    "hbase.hbase_connect.family": "col1",

  }

}

这是我运行的 kafka connect 分布式属性:


key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false

value.converter.schemas.enable=false


internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

问题是当我尝试生成没有模式的 JSON 消息时,连接器抛出null pointer exception如下:


 [2018-12-10 16:45:06,607] ERROR WorkerSinkTask{id=hbase_connect-0}

 Task threw an uncaught and unrecoverable exception.

 Task is being killed and will not recover until manually restarted.

 (org.apache.kafka.connect.runtime.WorkerSinkTask:515)

 java.lang.NullPointerException

at io.svectors.hbase.util.ToPutFunction.apply(ToPutFunction.java:78)

at io.svectors.hbase.sink.HBaseSinkTask.lambda$4(HBaseSinkTask.java:105)

    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)

    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)

    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)

    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)

    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)

    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)

这是我使用的消息:


{"id": "9","name": "wis"}

对这个错误有什么建议吗?


慕标5832272
浏览 211回答 1
1回答

慕工程0101907

这是 schema less json 的连接器的问题。它已在此处的版本中修复:https ://github.com/nishutayal/kafka-connect-hbase/issues/5
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java