猿问

无法使用 Flink Table API 打印 CSV 文件

我正在尝试在 NetBeans 中读取一个包含 34 个字段的 CSV 文件,并把内容打印到控制台。但是,我只能打印出表的模式(schema),因为我所用的这个 Flink 版本中,配合 CsvReader 使用时没有可用的打印选项。


请查看代码,帮我看看哪里需要改正。我本来想使用内置的 CsvReader API,但发现它不支持超过 22 个字段,所以转而使用 Table API。我还尝试在 Flink 1.5.1 中使用 CsvTableSource,但语法上一直没能成功:.field("%CPU", Types.FLOAT()) 一直报“无法识别的符号”(cannot find symbol)错误,指向 Float 类型。我的主要目标是读取 CSV 文件,然后把数据发送到 Kafka 主题;但在此之前,我想先确认文件确实被读取了,可到目前为止还没有成功。


import org.apache.flink.table.api.Table;

import  org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.TableEnvironment;

import org.apache.flink.table.sources.CsvTableSource;

import org.apache.flink.types.Row;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSink;

import org.apache.flink.table.api.Types;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.table.sinks.CsvTableSink;

import org.apache.flink.table.sinks.TableSink;

import org.apache.flink.table.api.java.Slide;



public class CsvReader {

  public static void main(String[] args) throws Exception {


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);

CsvTableSource csvTableSource = new CsvTableSource("/home/merlin/Experiments/input_container/container_data1.csv",

            new String[] { "%CPU", "MEM", "VSZ", "RSS", "timestamp",

    "OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",

    "io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary", "num_ctx_switches_involuntary",

    "mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty", "mem_uss", "mem_pss",

    "mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",

    "cpu_time_children_system", "container_nr_sleeping", "container_nr_running",

    "container_nr_stopped", "container_nr_uninterruptible","container_nr_iowait" },



皈依舞
浏览 277回答 2
2回答

狐的传说

这段代码:package example.flink;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.sources.CsvTableSource;import org.apache.flink.types.Row;public class TestFlink {&nbsp; &nbsp; public static void main(String[] args) {&nbsp; &nbsp; &nbsp; &nbsp; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();&nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);&nbsp; &nbsp; &nbsp; &nbsp; CsvTableSource csvTableSource = CsvTableSource&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .builder()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .path("container_data.csv")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("%CPU", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("MEM", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("VSZ", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("RSS", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("timestamp", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("OOM_Score", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("io_read_count", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("io_write_count", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("io_read_bytes", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("io_write_bytes", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("io_read_chars", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
.field("io_write_chars", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("num_fds", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("num_ctx_switches_voluntary", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("num_ctx_switches_involuntary", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_rss", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_vms", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_shared", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_text", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_lib", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_data", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_dirty", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_uss", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_pss", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("mem_swap", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("num_threads", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("cpu_time_user", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("cpu_time_system", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("cpu_time_children_user", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("cpu_time_children_system", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("container_nr_sleeping", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("container_nr_running", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("container_nr_stopped", Types.FLOAT)&nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("container_nr_uninterruptible", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .field("container_nr_iowait", Types.FLOAT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .fieldDelimiter(",")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .lineDelimiter("\n")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ignoreFirstLine()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .ignoreParseErrors()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .commentPrefix("%")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .build();&nbsp; &nbsp; &nbsp; &nbsp; // name your table source&nbsp; &nbsp; &nbsp; &nbsp; tEnv.registerTableSource("container", csvTableSource);&nbsp; &nbsp; &nbsp; &nbsp; Table table = tEnv.scan("container");&nbsp; &nbsp; &nbsp; &nbsp; DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);&nbsp; &nbsp; &nbsp; &nbsp; // define the sink as common print on console here&nbsp; &nbsp; &nbsp; &nbsp; stream.print();&nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; env.execute();&nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; e.printStackTrace();&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}使用这些库(有些可能是多余的):compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.1'compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.5.1'compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.1'compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.1'compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.5.1'compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.5.1'compile group: 'org.apache.flink', name: 'flink-runtime_2.11', version: '1.5.1'至少正在运行。我不确定它是否提供了您需要的输出,但它几乎与您在最新编辑中的输出完全相同,但它正在 IDE 
中运行。这有帮助吗?如果您的分隔符仍然是空格,请记住更改 .fieldDelimiter(",")

慕神8447489

您必须先将 Table 转换为 DataStream 才能打印它。最简单的方法是将其转换为 DataStream<Row>(其中 result 是要打印的 Table),如下所示:

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
// print the stream & execute the program
stream.print();
env.execute();

有关更多详细信息,请参阅文档。
随时随地看视频慕课网APP

相关分类

Java
我要回答