Go中的“显式关闭”mgo MongoDB

我有一个运行 10 个 Web 服务(同类)的 Docker 集群。他们都在使用 MongoDB,其中包括数据持久性。


这是在main()服务启动时从 调用的代码:


// Init establishes a connection with MongoDB instance.

func Init(mongoURL string) *mgo.Session {

    mongo, err := mgo.Dial(mongoURL)

    misc.PanicIf(err)


    // make sure we are strongly consistent

    mongo.SetMode(mgo.Strong, true)


    // update global state

    db = mongo

    Entries = db.DB("").C("entries")

    Channels = db.DB("").C("channels")

    Settings = db.DB("").C("settings")

    Metadata = db.DB("").C("metadata")


    // only use this on first load, to confirm the settings are there

    // every refresh should be done via UpdateGlobalSettings (thread-safe)

    GlobalSettings = &GlobalSettingsStruct{}

    GlobalSettings.Init()


    return mongo

}

所以基本上API和工人只需使用全局变量,例如Entries,Settings等等。


运行一段时间后,该服务停止正常工作。每蒙戈动作,比如Entries.find(...)回报的错误:Closed Explicitly。


这意味着什么?我应该定期刷新 mongoDB 连接还是应该关闭它并重新建立与每个请求的连接?


该应用程序以性能为导向,因此尽管 mongo 连接已关闭,但一切仍在运行,因为一切都在内存或集群缓存上运行。我不想做一些延迟处理的愚蠢事情。


心有法竹
浏览 299回答 3
3回答

12345678_0001

当sess.SetPoolLimit(50)不使用时mgo,在 10.000 个并发连接等压力下会发生许多错误。当我限制池错误就消失了。我在下面为这个问题创建了一个测试用例源代码,所以你可以很容易地在自己的机器上测试它。你对这种行为有什么建议吗,我想听听。package mainimport (&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "time"&nbsp; &nbsp; // you can also use original go-mgo/mgo here as well&nbsp; &nbsp; mgo "github.com/globalsign/mgo"&nbsp; &nbsp; "github.com/globalsign/mgo/bson")// TODO: put some records into db first://// use testapi// db.competitions.insert([//&nbsp; &nbsp;{game_id: 1, game_name: "foo"},//&nbsp; &nbsp;{game_id: 2, game_name: "bar"},//&nbsp; &nbsp;{game_id: 3, game_name: "jazz"}// ])// NOTE: you might want to increase this depending on your machine power//&nbsp; &nbsp; &nbsp; &nbsp;mine is://&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;MacBook (Retina, 12-inch, Early 2015)//&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;1,2 GHz Intel Core M//&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;8 GB 1600 MHz DDR3const ops = 10000type m bson.Mfunc main() {&nbsp; &nbsp; sess, err := mgo.DialWithTimeout("localhost", time.Second)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; panic(err)&nbsp; &nbsp; }&nbsp; &nbsp; defer sess.Close()&nbsp; &nbsp; // NOTE: without this setting there are many errors&nbsp; &nbsp; //&nbsp; &nbsp; &nbsp; &nbsp;see the end of the file&nbsp; &nbsp; // setting pool limit prevents most of the timeouts&nbsp; &nbsp; // sess.SetPoolLimit(50)&nbsp; &nbsp; // sess.SetSocketTimeout(60 * time.Second)&nbsp; &nbsp; sess.SetMode(mgo.Monotonic, true)&nbsp; &nbsp; time.Sleep(time.Second)&nbsp; &nbsp; done := make(chan bool, ops)&nbsp; &nbsp; for i := 0; i < ops; i++ {&nbsp; &nbsp; &nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; defer func() { done <- true }()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; result, err := fetchSomething(sess)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("ERR: %s\n", err)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Printf("RESULT: %+v\n", result)&nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; }&nbsp; &nbsp; for i := 0; i < ops; i++ {&nbsp; &nbsp; &nbsp; &nbsp; <-done&nbsp; &nbsp; }}func fetchSomething(sess *mgo.Session) ([]m, error) {&nbsp; &nbsp; s := sess.Copy()&nbsp; &nbsp; defer s.Close()&nbsp; &nbsp; result := []m{}&nbsp; &nbsp; group := m{&nbsp; &nbsp; &nbsp; &nbsp; "$group": m{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "_id": m{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "id":&nbsp; &nbsp;"$game_id",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "name": "$game_name",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; }&nbsp; &nbsp; project := m{&nbsp; &nbsp; &nbsp; &nbsp; "$project": m{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "_id":&nbsp; "$_id.id",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "name": "$_id.name",&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; }&nbsp; &nbsp; sort := m{&nbsp; &nbsp; &nbsp; &nbsp; "$sort": m{&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "_id": 1,&nbsp; &nbsp; &nbsp; &nbsp; },&nbsp; &nbsp; }&nbsp; &nbsp; err := col(s, "competitions").Pipe([]m{group, project, sort}).All(&result)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return nil, err&nbsp; &nbsp; }&nbsp; &nbsp; return result, nil}// col is a helper for selecting a columnfunc col(sess *mgo.Session, name string) *mgo.Collection {&nbsp; &nbsp; return sess.DB("testapi").C(name)}/*ERRORS WITHOUT sess.SetPoolLimit(50)$ go run socket_error.goRESULT: [map[name:foo _id:1] map[_id:2 name:bar] map[_id:3 name:jazz]]ERR: read tcp 127.0.0.1:52918->127.0.0.1:27017: read: connection reset by peerERR: write tcp 127.0.0.1:52084->127.0.0.1:27017: write: broken pipeRESULT: []RESULT: []ERR: write tcp 127.0.0.1:53627->127.0.0.1:27017: write: broken pipeERR: write tcp 127.0.0.1:53627->127.0.0.1:27017: write: broken pipeRESULT: []ERR: write tcp 127.0.0.1:53627->127.0.0.1:27017: write: broken pipeRESULT: []ERR: write tcp 127.0.0.1:53627->127.0.0.1:27017: write: broken pipeRESULT: []ERR: write tcp 127.0.0.1:53627->127.0.0.1:27017: write: broken pipeRESULT: []ERR: write tcp 127.0.0.1:53627->127.0.0.1:27017: write: broken pipeRESULT: []RESULT: []ERR: read tcp 127.0.0.1:53627->127.0.0.1:27017: read: connection reset by peerRESULT: []ERR: resetNonce: write tcp 127.0.0.1:53625->127.0.0.1:27017: write: broken pipeRESULT: []RESULT: [map[name:foo _id:1] map[_id:2 name:bar] map[_id:3 name:jazz]]ERR: resetNonce: write tcp 127.0.0.1:54591->127.0.0.1:27017: write: broken pipeRESULT: []......*/

蛊毒传说

那没问题。而是用于Session.Copy创建新的连接实例,而不是直接使用返回的会话。golang mongodb 驱动包里面有连接池。

慕哥9229398

首先尝试在 mgo 中启用调试模式以获取有关正在发生的事情的更多信息。我想服务器在一段时间不活动后断开连接。在任何情况下,通常的方法是在每个请求处理(使用中间件)开始时做一个 mgoDial然后Copy连接。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go