Go elasticsearch从传入的脉冲星插入批量数据

我必须使用 goelastic 库从即将到来的脉冲星中大量插入数据。但我有一个问题。

首先,pulsar 每部分批量发送 1000 个数据。然后当我插入松紧带时,有时会出现问题。附上这个问题。此问题会导致数据丢失。谢谢解答... ERROR: circuit_breaking_exception: [parent] Data too large, data for [indices:data/write/bulk[s]] would be [524374312/500mb], which is larger than the limit of [510027366/486.3mb], real usage: [524323448/500mb], new bytes reserved: [50864/49.6kb], usages [request=0/0b, fielddata=160771183/153.3mb, in_flight_requests=50864/49.6kb, model_inference=0/0b, eql_sequence=0/0b, accounting=6898128/6.5mb]

这部分是批量代码。


func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) {

    fmt.Println("------------------")

    bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{

        Index:      enum.IndexName,

        Client:     ElasticStruct.Client,

        FlushBytes: 10e+6,

    })

    if err != nil {

        panic(err)

    }

    start := time.Now().UTC()


    for _, x := range y {

        data, err := json.Marshal(x)

        if err != nil {

            panic(err)

        }

        err = bi.Add(

            context.Background(),

            esutil.BulkIndexerItem{

                Action: "index",


                Body: bytes.NewReader(data),


                OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {

                    i++

                },


                OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {

                    if err != nil {

                        log.Printf("ERROR: %s", err)

                    } else {

                        log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)

                    }

                },

            },

        )

        if err != nil {

            log.Fatalf("Unexpected error: %s", err)

        }

        x++


    }

    if err := bi.Close(context.Background()); err != nil {

        log.Fatalf("Unexpected error: %s", err)

    }

    


汪汪一只猫
浏览 188回答 1
1回答

紫衣仙女

Tldr;您的节点上似乎没有足够的 JVM 堆。您正在使用断路器以避免 Elasticsearch 内存不足 (OOM)。解决方案增加 JVM 内存,您会在此处找到一些文档来调整节点大小。较小的批量请求
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go