波斯汪
在上述答案的帮助下,我得到了一个可以运行的示例,该示例可在 GitHub 上找到:https://github.com/alexflint/bigquery-storage-api-example

主要代码是:

```go
const (
	project = "myproject"
	dataset = "mydataset"
	table   = "mytable"
	trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
var rows = []*Row{
	{Name: "John Doe", Age: 104},
	{Name: "Jane Doe", Age: 69},
	{Name: "Adam Smith", Age: 33},
}

func main() {
	ctx := context.Background()

	// create the bigquery client
	client, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// create the write stream
	// a COMMITTED write stream inserts data immediately into bigquery
	resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		log.Fatal("CreateWriteStream: ", err)
	}

	// get the stream by calling AppendRows
	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	// get the protobuf descriptor for our row type
	var row Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	// serialize the rows
	var opts proto.MarshalOptions
	var data [][]byte
	for _, row := range rows {
		buf, err := opts.Marshal(row)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	// send the rows to bigquery
	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: resp.Name,
		TraceId:     trace, // identifies this client
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				// protocol buffer schema
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				// protocol buffer data
				Rows: &storagepb.ProtoRows{
					SerializedRows: data, // serialized protocol buffer data
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	// get the response, which will tell us whether it worked
	_, err = stream.Recv()
	if err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}

	log.Println("done")
}
```

上面 “Row” 结构的协议缓冲区(Protocol Buffers)定义是:

```proto
syntax = "proto3";
package tutorial;
option go_package = ".;main";

message Row {
	string Name = 1;
	int32 Age = 2;
}
```

您需要首先使用与协议缓冲区对应的模式(schema)创建一个 BigQuery 数据集和表。请参阅上面链接的存储库中的自述文件(README)以了解如何执行此操作。

运行上面的代码后,数据在 BigQuery 中显示如下:

```
$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+
```

感谢大家的帮助!