
Need to stream large QueryRunner results to a file, but they seem to be stored in memory

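I'm trying to build a Java application that can stream very large result sets from arbitrary SQL SELECT queries into JSONL files, specifically through SQL Server, but ideally it should work with any JDBC DataSource. In Python this is easy: treat the SQL client's results as a generator and call json.dumps() on each row. In this Java code, however, everything seems to be loaded into memory before anything is written out, which usually causes heap space and garbage collection errors. The queries I need to run are very large and can return up to 10 GB of raw data. Execution time is not the main concern, as long as it works every time.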
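To make the Python comparison concrete: a rough Java analogue of a generator over query results is an Iterator that moves the JDBC ResultSet cursor one row per next() call, so only the current row is copied into a Map. This is a sketch using only java.sql and java.util; the class name ResultSetRowIterator is hypothetical, SQLExceptions are wrapped in IllegalStateException because Iterator methods cannot throw checked exceptions, and the caller is assumed to keep the Statement and Connection open until iteration ends and to close them afterwards. Whether the driver itself streams rows or buffers them internally still depends on the driver and its settings.

```java
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical helper: a lazy, generator-like view of a ResultSet.
// Only the row currently being returned is copied into a Map.
class ResultSetRowIterator implements Iterator<Map<String, Object>> {
    private final ResultSet rs;
    private boolean advanced; // cursor already moved for the pending row
    private boolean hasRow;   // result of the most recent rs.next()

    ResultSetRowIterator(ResultSet rs) {
        this.rs = rs;
    }

    @Override
    public boolean hasNext() {
        // Once rs.next() has returned false, advanced stays true,
        // so the cursor is never moved past the end again.
        if (!advanced) {
            try {
                hasRow = rs.next();
            } catch (SQLException e) {
                // Iterator cannot throw checked exceptions, so wrap the SQLException
                throw new IllegalStateException(e);
            }
            advanced = true;
        }
        return hasRow;
    }

    @Override
    public Map<String, Object> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        advanced = false; // the following hasNext() will move the cursor again
        try {
            ResultSetMetaData md = rs.getMetaData();
            Map<String, Object> row = new LinkedHashMap<>(); // keeps column order
            for (int i = 1; i <= md.getColumnCount(); i++) {
                row.put(md.getColumnLabel(i), rs.getObject(i));
            }
            return row;
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A loop such as `Iterator<Map<String, Object>> rows = new ResultSetRowIterator(rs); while (rows.hasNext()) writer.writeRow(rows.next());` would then hold roughly one row map at a time on the heap, assuming the driver does not buffer the whole result set itself.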


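I've tried calling flush() after every row (which is absurd), and that seems to help with small datasets but not with large ones. Can anyone suggest a strategy I can use to achieve this easily?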


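In my SQL client class, I use Apache DbUtils' QueryRunner and MapListHandler to create a List of Maps, which gives me the flexibility I need (compared with the more traditional Java approaches that require specifying the schema and types):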


public List<Map<String, Object>> query(String queryText) {
    try {
        // The SQL Server driver class is SQLServerDriver (JDBC 4 drivers also self-register)
        DbUtils.loadDriver("com.microsoft.sqlserver.jdbc.SQLServerDriver");

        // this function just sets up all the connection properties. Omitted for clarity
        DataSource ds = this.initDataSource();

        StatementConfiguration sc = new StatementConfiguration.Builder().fetchSize(10000).build();
        QueryRunner queryRunner = new QueryRunner(ds, sc);

        // MapListHandler copies every row into a List before query() returns,
        // so the whole result set ends up on the heap regardless of fetchSize
        MapListHandler handler = new MapListHandler();
        return queryRunner.query(queryText, handler);
    } catch (Exception e) {
        logger.error(e.getMessage());
        e.printStackTrace();
        return null;
    }
}

JsonLOutputWriter class:


JsonLOutputWriter(String filename) {
    GsonBuilder gsonBuilder = new GsonBuilder();
    gsonBuilder.serializeNulls();
    this.gson = gsonBuilder.create();
    try {
        this.writer = new PrintWriter(new File(filename), ENCODING);
    } catch (FileNotFoundException | UnsupportedEncodingException e) {
        e.printStackTrace();
    }
}

// Serializes one row as a single JSON line
void writeRow(Map<String, Object> row) {
    this.writer.println(this.gson.toJson(row));
}

void flush() {
    this.writer.flush();
}

Main method:


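JsonLOutputWriter writer = new JsonLOutputWriter(outputFile);
// client.query() has already materialized every row by the time this loop starts
for (Map<String, Object> row : client.query(inputSql)) {
    writer.writeRow(row);
}
writer.flush();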


繁星点点滴滴

一只甜甜圈

Basically, this can't be done with DbUtils out of the box. I got rid of QueryRunner and MapListHandler, because the handler creates an ArrayList. Instead of a pull-based approach I took a push-based one: I created a very similar class, MyQueryRunner (CustomQueryRunner in the code below), which takes a MyRowHandler (RowHandler below) and, instead of returning a collection, iterates over the ResultSet and calls my output function. I'm sure there are more elegant ways to do this, such as returning some kind of row buffer, but this is the 80/20 solution I needed, and it works for large datasets.

Row handler:

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import org.apache.commons.dbutils.BasicRowProcessor;
import org.apache.commons.dbutils.RowProcessor;

public class RowHandler {
    private static final RowProcessor ROW_PROCESSOR = new BasicRowProcessor();
    private final JsonLOutputWriter writer;

    public RowHandler(JsonLOutputWriter writer) {
        this.writer = writer;
    }

    // Writes each row as soon as it is read; nothing is collected in memory
    int handle(ResultSet rs) throws SQLException {
        int count = 0;
        while (rs.next()) {
            writer.writeRow(this.handleRow(rs));
            count++;
        }
        return count;
    }

    protected Map<String, Object> handleRow(ResultSet rs) throws SQLException {
        return ROW_PROCESSOR.toMap(rs);
    }
}

Query runner:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.apache.commons.dbutils.AbstractQueryRunner;
import org.apache.commons.dbutils.StatementConfiguration;

class CustomQueryRunner extends AbstractQueryRunner {
    private final RowHandler rh;

    CustomQueryRunner(DataSource ds, StatementConfiguration stmtConfig, RowHandler rh) {
        super(ds, stmtConfig);
        this.rh = rh;
    }

    int query(String sql) throws SQLException {
        Connection conn = this.prepareConnection();
        return this.query(conn, true, sql);
    }

    private int query(Connection conn, boolean closeConn, String sql, Object... params)
            throws SQLException {
        if (conn == null) {
            throw new SQLException("Null connection");
        }
        PreparedStatement stmt = null;
        ResultSet rs = null;
        int count = 0;
        try {
            stmt = this.prepareStatement(conn, sql);
            this.fillStatement(stmt, params);
            rs = this.wrap(stmt.executeQuery());
            // Push each row to the handler instead of building a collection
            count = rh.handle(rs);
        } catch (SQLException e) {
            this.rethrow(e, sql, params);
        } finally {
            try {
                close(rs);
            } finally {
                close(stmt);
                if (closeConn) {
                    close(conn);
                }
            }
        }
        return count;
    }
}
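The same push-based idea can also be sketched with plain JDBC, without subclassing AbstractQueryRunner: read the ResultSet row by row and hand each row map to a callback. The helper name ResultSetStreamer.forEachRow is hypothetical, and unlike DbUtils' BasicRowProcessor.toMap, the maps it builds use case-sensitive column labels. It does not close the ResultSet, so the caller stays responsible for closing the ResultSet, Statement and Connection.

```java
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: pushes each row to a callback instead of collecting a List.
final class ResultSetStreamer {
    private ResultSetStreamer() {}

    // Reads rs row by row, passes each row map to sink, and returns the row count.
    // The ResultSet is not closed here.
    static int forEachRow(ResultSet rs, Consumer<Map<String, Object>> sink) throws SQLException {
        ResultSetMetaData md = rs.getMetaData();
        int columns = md.getColumnCount();
        int count = 0;
        while (rs.next()) {
            Map<String, Object> row = new LinkedHashMap<>(); // keeps column order
            for (int i = 1; i <= columns; i++) {
                row.put(md.getColumnLabel(i), rs.getObject(i));
            }
            // Unless the sink keeps a reference, the map is garbage once accept() returns
            sink.accept(row);
            count++;
        }
        return count;
    }
}
```

With the question's writer, a call could look like `try (ResultSet rs = stmt.executeQuery()) { int n = ResultSetStreamer.forEachRow(rs, writer::writeRow); }`, so the application holds at most one row map at a time rather than the whole result set (plus whatever the JDBC driver buffers).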