该逻辑的代码片段会有所帮助。您可以使用Mongo Change Streams来做到这一点。例如,要查看集合的更改,请使用以下Collection.Watch()方法 -var collection *mongo.Collection// specify a pipeline that will only match "insert" events// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documentsmatchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)if err != nil { log.Fatal(err)}// print out all change stream events in the order they're received// see the mongo.ChangeStream documentation for more examples of using change streamsfor changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) // NewConsumer}然后创建一个新的消费者或者.SubscribeTopics()在你更新你的集合并且它符合你的标准时调用