猿问

Mongodb 不会使用游标检索具有 200 万条记录的集合中的所有文档

我有 2,000,000 条记录的集合


> db.events.count();                                     │

2000000             

我使用 golang mongodb 客户端连接到数据库


package main


import (

    "go.mongodb.org/mongo-driver/bson"

    "go.mongodb.org/mongo-driver/mongo"

    "go.mongodb.org/mongo-driver/mongo/options"

)


func main() {

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    defer cancel()

    client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27888").SetAuth(options.Credential{

        Username: "mongoadmin",

        Password: "secret",

    }))


    if err != nil {

        panic(err)

    }


    defer func() {

        if err = client.Disconnect(ctx); err != nil {

            panic(err)

        }

    }()



    collection := client.Database("test").Collection("events")


    var bs int32 = 10000

    var b = true

    cur, err := collection.Find(context.Background(), bson.D{}, &options.FindOptions{

        BatchSize: &bs, NoCursorTimeout: &b})

    if err != nil {

        log.Fatal(err)

    }

    defer cur.Close(ctx)


    s, n := runningtime("retrive db from mongo and publish to kafka")

    count := 0

    for cur.Next(ctx) {

        var result bson.M

        err := cur.Decode(&result)

        if err != nil {

            log.Fatal(err)

        }


        bytes, err := json.Marshal(result)

        if err != nil {

            log.Fatal(err)

        }

        count++


        msg := &sarama.ProducerMessage{

            Topic: "hello",

            // Key:   sarama.StringEncoder("aKey"),

            Value: sarama.ByteEncoder(bytes),

        }

        asyncProducer.Input() <- msg

    }



但是该程序仅检索大约 600,000 条记录,而不是每次运行该程序时的 2,000,000 条。


$ go run main.go

done

count = 605426

nErrors = 0

2020/09/18 11:23:43 End:         retrive db from mongo and publish to kafka took 10.080603336s

我不知道为什么?我想检索所有 2,000,000 条记录。谢谢你的帮助。


RISEBY
浏览 135回答 1
1回答

慕娘9325324

您获取结果的循环可能会提前结束,因为您使用相同的ctx上下文来迭代具有 10 秒超时的结果。这意味着如果检索和处理 200 万条记录(包括连接)时间超过 10 秒,上下文将被取消,因此游标也会报告错误。请注意,设置FindOptions.NoCursorTimeout为true只是为了防止光标因不活动而超时,它不会覆盖使用的上下文的超时。使用另一个上下文来执行查询并迭代结果,一个没有超时的上下文,例如context.Background().另请注意,要构造 的选项find,请使用辅助方法,因此它可能看起来像这样简单而优雅:options.Find().SetBatchSize(10000).SetNoCursorTimeout(true)所以工作代码:ctx2 := context.Background()cur, err := collection.Find(ctx2, bson.D{},&nbsp; &nbsp; options.Find().SetBatchSize(10000).SetNoCursorTimeout(true))// ...for cur.Next(ctx2) {&nbsp; &nbsp; // ...}// Also check error after the loop:if err := cur.Err(); err != nil {&nbsp; &nbsp; log.Printf("Iterating over results failed: %v", err)}
随时随地看视频慕课网APP

相关分类

Go
我要回答