我正在尝试使用 Netbeans 在控制台上读取一个包含 34 个字段的文件。但是,我所能打印的只是模式。因为在与 csvreader 一起使用的这个特定版本的 Flink 中缺少打印选项。
请查看代码并帮助我了解应该更正的地方。我本来会使用CSVReader内置 API,但事实证明它不支持超过 22 个字段,因此求助于使用 Table API。还尝试使用CsvTableSource1.5.1 版 Flink,但语法不走运。由于.field("%CPU", Types.FLOAT())不断给出类型浮点数的错误无法识别的符号。我的主要目标是能够读取 CSV 文件然后发送到 Kafka 主题,但在此之前我想检查文件是否已读取,但还没有运气。
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.Types;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.api.java.Slide;
public class CsvReader {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = new CsvTableSource("/home/merlin/Experiments/input_container/container_data1.csv",
new String[] { "%CPU", "MEM", "VSZ", "RSS", "timestamp",
"OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",
"io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary", "num_ctx_switches_involuntary",
"mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty", "mem_uss", "mem_pss",
"mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",
"cpu_time_children_system", "container_nr_sleeping", "container_nr_running",
"container_nr_stopped", "container_nr_uninterruptible","container_nr_iowait" },
狐的传说
慕神8447489
相关分类