Golang - error when gzipping data from a MongoDB find cursor, writing it to a file and decompressing

I am iterating over a MongoDB cursor, gzipping the data and sending it to an S3 object. When I try to decompress the uploaded file with gzip -d, I get the following errors:


gzip: 9.log.gz: invalid compressed data--crc error

gzip: 9.log.gz: invalid compressed data--length error

The code I use to iterate, gzip and upload is given below:


// CursorReader struct acts as reader wrapper on top of mongodb cursor
type CursorReader struct {
    Csr *mongo.Cursor
}

// Read func reads the data from cursor and puts it into byte array
func (cr *CursorReader) Read(p []byte) (n int, err error) {
    dataAvail := cr.Csr.Next(context.TODO())
    if !dataAvail {
        n = 0
        err = io.EOF
        if cr.Csr.Close(context.TODO()) != nil {
            fmt.Fprintf(os.Stderr, "Error: MongoDB: getting logs: close cursor: %s", err)
        }
        return
    }
    var b bytes.Buffer
    w := gzip.NewWriter(&b)
    w.Write([]byte(cr.Csr.Current.String() + "\n"))
    w.Close()
    n = copy(p, []byte(b.String()))
    err = nil
    return
}

cursor, err := coll.Find(ctx, filter) // runs the find query and returns cursor
csrRdr := new(CursorReader)           // creates a new cursorreader instance
csrRdr.Csr = cursor                   // assigning the find cursor to cursorreader instance
_, err = s3Uploader.Upload(&s3manager.UploadInput{ // uploading the data to s3 in parts
    Bucket: aws.String("bucket"),
    Key:    aws.String("key"),
    Body:   csrRdr,
})

If the amount of data is small, I don't hit the problem, but with larger data I get the error. What I have debugged so far: I tried gzipping 1500 documents, each around 15 MB in size, and got the error. Even when I wrote the gzipped bytes directly to a local file instead of uploading, I got the same error.
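For reference, a quick way to check a locally written archive is to open it with Go's compress/gzip, which performs the same CRC and length verification that gzip -d does. A minimal sketch, assuming the local file is named 9.log.gz as in the error output above:

package main

import (
    "compress/gzip"
    "fmt"
    "io"
    "os"
)

// verifyGzip decompresses the whole file and returns the first error,
// which surfaces the same crc/length problems that `gzip -d` reports.
func verifyGzip(path string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

    zr, err := gzip.NewReader(f)
    if err != nil {
        return err
    }
    defer zr.Close()

    // discard the payload; we only care whether decoding succeeds
    _, err = io.Copy(io.Discard, zr)
    return err
}

func main() {
    if err := verifyGzip("9.log.gz"); err != nil {
        fmt.Fprintln(os.Stderr, "gzip check failed:", err)
    }
}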


1 Answer

Helenr

The problem seems to be the repeated call to gzip.NewWriter() in func (*CursorReader) Read([]byte) (int, error): you are allocating a new gzip.Writer for every call to Read. gzip compression is stateful, so you must use a single Writer instance for all of the writes.

Solution #1

A fairly straightforward fix is to read all the rows from the cursor, pass them through a single gzip.Writer and store the gzipped content in an in-memory buffer.

var cursor, _ = collection.Find(context.TODO(), filter)
defer cursor.Close(context.TODO())

// prepare a buffer to hold gzipped data
var buffer bytes.Buffer
var gz = gzip.NewWriter(&buffer)

for cursor.Next(context.TODO()) {
    if _, err := io.WriteString(gz, cursor.Current.String()); err != nil {
        // handle error somehow  ¯\_(ツ)_/¯
    }
}

// close the gzip stream before uploading so the footer is flushed into the buffer
gz.Close()

// you can now use buffer as io.Reader
// and it'll contain gzipped data for your serialized rows
_, err = s3Uploader.Upload(&s3manager.UploadInput{
    Bucket: aws.String("..."),
    Key:    aws.String("..."),
    Body:   &buffer,
})

Solution #2

Another solution is to use io.Pipe() and goroutines to create a stream that reads and gzips the data on demand rather than holding it in an in-memory buffer. This is useful when the data you are reading is very large and you cannot keep all of it in memory.

var cursor, _ = collection.Find(context.TODO(), filter)
defer cursor.Close(context.TODO())

// create pipe endpoints
reader, writer := io.Pipe()

// note: io.Pipe() returns a synchronous in-memory pipe;
// reads and writes block on one another,
// so make sure to go through the docs once.

// since reads and writes on a pipe block,
// we must move the writing to a background goroutine,
// else all our writes would block forever
go func() {
    // the order of defers here is important
    // see: https://stackoverflow.com/a/24720120/6611700
    // make sure the gzip stream is closed before the pipe
    // to ensure the data is flushed properly
    defer writer.Close()
    var gz = gzip.NewWriter(writer)
    defer gz.Close()

    for cursor.Next(context.Background()) {
        if _, err := io.WriteString(gz, cursor.Current.String()); err != nil {
            // handle error somehow  ¯\_(ツ)_/¯
        }
    }
}()

// you can now use reader as io.Reader
// and it'll contain gzipped data for your serialized rows
_, err = s3Uploader.Upload(&s3manager.UploadInput{
    Bucket: aws.String("..."),
    Key:    aws.String("..."),
    Body:   reader,
})
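As a small follow-up on the "handle error somehow" placeholders above: with the pipe-based variant, a write error can be propagated to the reading side via (*io.PipeWriter).CloseWithError, so the S3 upload fails instead of silently producing a truncated archive. This is only a sketch under the same assumptions as Solution #2 (cursor, s3Uploader, bucket and key come from the question; error handling of the Upload call itself is omitted):

reader, writer := io.Pipe()

go func() {
    gz := gzip.NewWriter(writer)
    for cursor.Next(context.Background()) {
        if _, err := io.WriteString(gz, cursor.Current.String()+"\n"); err != nil {
            // propagate the failure to the reader: the uploader's Read calls
            // will now return this error instead of seeing a short stream
            writer.CloseWithError(err)
            return
        }
    }
    // close the gzip stream first so the footer (crc + length) is written,
    // then close the pipe to signal EOF to the uploader
    if err := gz.Close(); err != nil {
        writer.CloseWithError(err)
        return
    }
    writer.Close()
}()

_, err = s3Uploader.Upload(&s3manager.UploadInput{
    Bucket: aws.String("bucket"),
    Key:    aws.String("key"),
    Body:   reader,
})

Closing the gzip writer before the pipe writer matters here: the gzip footer carries the CRC and uncompressed length that gzip -d validates.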