在 Golang 中使用 BigQuery Write API

我正在尝试使用新的Bigquery Storage API从 Golang 进行流式插入。根据此页面,我了解到此 API 取代了旧的流式插入 bigquery API。

但是,文档中的示例都没有显示如何实际插入行。为了创建 AppendRowsRequest,我得到了以下结果:

&storagepb.AppendRowsRequest{

    WriteStream: resp.Name,

    Rows: &storagepb.AppendRowsRequest_ProtoRows{

        ProtoRows: &storagepb.AppendRowsRequest_ProtoData{

            WriterSchema: nil, // protobuf schema??

            Rows: &storagepb.ProtoRows{

                SerializedRows: [][]byte{}, // serialized protocol buffer data??

            },

        },

    },

}

我应该在上面的 SerializedRows 字段中输入什么数据?

上面的storagepb.ProtoRows结构记录在这里。不幸的是,给出的只是协议缓冲区主概述页面的链接。

谁能给我一个使用新的 Bigquery Storage API 将行从 Golang 流式传输到 bigquery 的示例?



FFIVE
浏览 284回答 2
2回答

波斯汪

在上述答案的帮助下,我找到了一个工作示例,该示例可在 github 上找到: https ://github.com/alexflint/bigquery-storage-api-example主要代码是:const (    project = "myproject"    dataset = "mydataset"    table   = "mytable"    trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging)// the data we will stream to bigqueryvar rows = []*Row{    {Name: "John Doe", Age: 104},    {Name: "Jane Doe", Age: 69},    {Name: "Adam Smith", Age: 33},}func main() {    ctx := context.Background()    // create the bigquery client    client, err := storage.NewBigQueryWriteClient(ctx)    if err != nil {        log.Fatal(err)    }    defer client.Close()    // create the write stream    // a COMMITTED write stream inserts data immediately into bigquery    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),        WriteStream: &storagepb.WriteStream{            Type: storagepb.WriteStream_COMMITTED,        },    })    if err != nil {        log.Fatal("CreateWriteStream: ", err)    }    // get the stream by calling AppendRows    stream, err := client.AppendRows(ctx)    if err != nil {        log.Fatal("AppendRows: ", err)    }    // get the protobuf descriptor for our row type    var row Row    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())    if err != nil {        log.Fatal("NormalizeDescriptor: ", err)    }    // serialize the rows    var opts proto.MarshalOptions    var data [][]byte    for _, row := range rows {        buf, err := opts.Marshal(row)        if err != nil {            log.Fatal("protobuf.Marshal: ", err)        }        data = append(data, buf)    }    // send the rows to bigquery    err = stream.Send(&storagepb.AppendRowsRequest{        WriteStream: resp.Name,        TraceId:     trace, // identifies this client        Rows: &storagepb.AppendRowsRequest_ProtoRows{            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{                // protocol buffer schema                WriterSchema: &storagepb.ProtoSchema{                    ProtoDescriptor: descriptor,                },                // protocol buffer data                Rows: &storagepb.ProtoRows{                    SerializedRows: data, // serialized protocol buffer data                },            },        },    })    if err != nil {        log.Fatal("AppendRows.Send: ", err)    }    // get the response, which will tell us whether it worked    _, err = stream.Recv()    if err != nil {        log.Fatal("AppendRows.Recv: ", err)    }    log.Println("done")}上面“Row”结构的协议缓冲区定义是:syntax = "proto3";package tutorial;option go_package = ".;main";message Row {    string Name = 1;    int32 Age = 2;}您需要首先使用与协议缓冲区对应的模式创建一个 bigquery 数据集和表。请参阅上面链接的存储库中的自述文件以了解如何执行此操作。运行上面的代码后,数据在 bigquery 中显示如下:$ bq query 'select * from mydataset.mytable'Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE   +------------+-----+|    name    | age |+------------+-----+| John Doe   | 104 || Jane Doe   |  69 || Adam Smith |  33 |+------------+-----+感谢大家的帮助!

慕斯709654

我找到了一些关于将流写入表的文档[1] [2],但我不确定这是否是您要查找的内容。请记住, storage/apiv1beta2 当前处于 beta 状态,因此可能尚未实现或缺少有关它的文档。如果我附加的文档对您没有帮助,我们可以打开一个公共问题跟踪器来正确记录或实施行流。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go