我正在尝试运行以下流程:
从某处获取数据
创建新的本地 CSV 文件,将数据写入该文件
将 CSV 上传到 Bigquery
删除本地文件
但它似乎加载了空数据。这是代码:
func (c *Client) Do(ctx context.Context) error {
bqClient, err := bigquerypkg.NewBigQueryUtil(ctx, "projectID", "datasetID")
if err != nil {
return err
}
data, err := c.GetSomeData(ctx)
if err != nil {
return err
}
file, err := os.Create("example.csv")
if err != nil {
return err
}
defer file.Close()
// also file need to be delete
writer := csv.NewWriter(file)
defer writer.Flush()
timestamp := time.Now().UTC().Format("2006-01-02 03:04:05.000000000")
for _, d := range data {
csvRow := []string{
d.ID,
d.Name,
timestamp,
}
err = writer.Write(csvRow)
if err != nil {
log.Printf("error writing data to CSV: %v\n", err)
}
}
source := bigquery.NewReaderSource(file)
source.Schema = bigquery.Schema{
{Name: "id", Type: bigquery.StringFieldType},
{Name: "name", Type: bigquery.StringFieldType},
{Name: "createdAt", Type: bigquery.TimestampFieldType},
}
if _, err = bqClient.LoadCsv(ctx, "tableID", source); err != nil {
return err
}
return nil
}
LoadCSV()看起来像这样:
func (c *Client) LoadCsv(ctx context.Context, tableID string, src bigquery.LoadSource) (string, error) {
loader := c.bigQueryClient.Dataset(c.datasetID).Table(tableID).LoaderFrom(src)
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(ctx)
if err != nil {
return "", err
}
status, err := job.Wait(ctx)
if err != nil {
return job.ID(), err
}
if status.Err() != nil {
return job.ID(), fmt.Errorf("job completed with error: %v", status.Err())
}
return job.ID(), nil
}
运行后,bigquery 会创建架构但没有数据。如果我要更改os.Create()为os.Open()并且文件已经存在,那么一切正常。就像加载CSV时文件数据还没有写入(?)是什么原因?
慕无忌1623718
相关分类