在 os.Create() 后将 CSV 文件加载到 bigquery 中不加载数据

我正在尝试运行以下流程:

  1. 从某处获取数据

  2. 创建新的本地 CSV 文件,将数据写入该文件

  3. 将 CSV 上传到 Bigquery

  4. 删除本地文件

但它似乎加载了空数据。这是代码:

func (c *Client) Do(ctx context.Context) error {

    bqClient, err := bigquerypkg.NewBigQueryUtil(ctx, "projectID", "datasetID")

    if err != nil {

        return err

    }


    data, err := c.GetSomeData(ctx)

    if err != nil {

        return err

    }


    file, err := os.Create("example.csv")

    if err != nil {

        return err

    }

    defer file.Close()

    // also file need to be delete


    writer := csv.NewWriter(file)

    defer writer.Flush()


    timestamp := time.Now().UTC().Format("2006-01-02 03:04:05.000000000")

    for _, d := range data {

        csvRow := []string{

            d.ID,

            d.Name,

            timestamp,

        }

        err = writer.Write(csvRow)

        if err != nil {

            log.Printf("error writing data to CSV: %v\n", err)

        }

    }


    source := bigquery.NewReaderSource(file)

    source.Schema = bigquery.Schema{

        {Name: "id", Type: bigquery.StringFieldType},

        {Name: "name", Type: bigquery.StringFieldType},

        {Name: "createdAt", Type: bigquery.TimestampFieldType},

    }

    if _, err = bqClient.LoadCsv(ctx, "tableID", source); err != nil {

        return err

    }


    return nil

}

LoadCSV()看起来像这样:


func (c *Client) LoadCsv(ctx context.Context, tableID string, src bigquery.LoadSource) (string, error) {

    loader := c.bigQueryClient.Dataset(c.datasetID).Table(tableID).LoaderFrom(src)

    loader.WriteDisposition = bigquery.WriteTruncate

    job, err := loader.Run(ctx)

    if err != nil {

        return "", err

    }

    status, err := job.Wait(ctx)

    if err != nil {

        return job.ID(), err

    }


    if status.Err() != nil {

        return job.ID(), fmt.Errorf("job completed with error: %v", status.Err())

    }


    return job.ID(), nil

}

运行后,bigquery 会创建架构但没有数据。如果我要更改os.Create()为os.Open()并且文件已经存在,那么一切正常。就像加载CSV时文件数据还没有写入(?)是什么原因?


守着一只汪
浏览 95回答 1
1回答

慕无忌1623718

我在这里看到的问题是您没有将文件句柄的光标倒回到文件的开头。因此,下一次读取将在文件末尾,并且将是 0 字节读取。这就解释了为什么文件中似乎没有内容。https://pkg.go.dev/os#File.Seek可以为您处理。实际上,这Flush无关紧要,因为您使用相同的文件句柄来读取文件而不是写入文件,因此即使没有刷新,您也会看到自己写入的字节。如果文件由不同的进程打开或重新打开,则不会出现这种情况。编辑:OP声称在他们的情况下这种冲洗是必要的,我不能提供不同意的证据。冲洗也不会伤害任何东西。示范:package mainimport (    "fmt"    "io"    "os")func main() {    f, err := os.CreateTemp("", "data.csv")    if err != nil {        panic(err)    } else {        defer f.Close()        defer os.Remove(f.Name())    }    fmt.Fprintf(f, "hello, world")    fmt.Fprintln(os.Stderr, "Before rewind: ")    if _, err := io.Copy(os.Stderr, f); err != nil {        panic(err)    }    f.Seek(0, io.SeekStart)    fmt.Fprintln(os.Stderr, "\nAfter rewind: ")    if _, err := io.Copy(os.Stderr, f); err != nil {        panic(err)    }    fmt.Fprintln(os.Stderr, "\n")}% go run t.goBefore rewind:After rewind:hello, world
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go