并行执行 DynamoDB 查询(全局二级索引的 BatchGetItems)

当查询在 GSI 上运行时,这里的想法是并行运行多个 DynamoDB 查询。截至目前,BatchGetItems不支持查询索引,推荐的方法是并行查询数据。我正在使用带有 wg 的 go routines 来并行处理例程的执行。

该函数的输入是一个带有 ID 的字符串数组,输出是 ID 的属性。

在本地运行该函数时没有问题,但是在AWS-Lambda上运行该函数时,返回的数据不断增长;

IE; 输入 2 项应输出 2 项。如果函数在 AWS-Lambda 上测试,

  • 函数第一次返回 2 个项目

  • 第二次返回 4 个项目(相同的项目重复 2 次)

  • 第三次它返回 6 个项目(相同的项目重复 4 次)

等等。这是代码片段。每次运行 lambda 时,是否有什么没有正确处理让 lambda 输出额外的数据集?

package main


import (

    "context"

    "fmt"

    "os"

    "sync"

    "github.com/aws/aws-lambda-go/lambda"

    "github.com/aws/aws-sdk-go/aws"

    "github.com/aws/aws-sdk-go/aws/session"

    "github.com/aws/aws-sdk-go/service/dynamodb"

    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"

)


//Final Output Interface

var bulkOutput []interface{}


func exitWithError(err error) {

    fmt.Fprintln(os.Stderr, err)

    os.Exit(1)

}


//LambdaInputJSON input for the lambda handler

type LambdaInputJSON struct {

    Ids      []string `json:"ids,omitempty"`

}


//HandleRequest : Lambda entry point

func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {

    return DynamoDBBatchGetRecords(data), nil

}


func main() {

    lambda.Start(HandleRequest)

}


func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {


    var wg sync.WaitGroup

    var mutex = &sync.Mutex{}


    iterations := len(a.Ids)

    wg.Add(iterations)

    for i := 0; i < iterations; i++ {

        go QueryOutput(a.Ids[i], &wg, mutex)

    }


    wg.Wait()

    return bulkOutput


}


//QueryOutput GoRoutine

func QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {

    var outputData []interface{}

    defer wg.Done()

    sess, err := session.NewSession(&aws.Config{

        Region: aws.String("aws-region"),

    })

    if err != nil {

        exitWithError(fmt.Errorf("failed to make Query API call, %v", err))

    }


当年话下
浏览 203回答 1
1回答

繁花不似锦

根据文档,全局变量独立于您的 Lambda 函数的处理程序代码。这导致缓冲区随着时间的推移而增加。纠正后的参考粘贴在下面。package mainimport (    "context"    "fmt"    "os"    "sync"    "github.com/aws/aws-lambda-go/lambda"    "github.com/aws/aws-sdk-go/aws"    "github.com/aws/aws-sdk-go/aws/session"    "github.com/aws/aws-sdk-go/service/dynamodb"    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute")func exitWithError(err error) {    fmt.Fprintln(os.Stderr, err)    os.Exit(1)}//HandleRequest : Lambda entry pointfunc HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {    output := DynamoDBBatchGetRecords(data)    return output, nil}func main() {    lambda.Start(HandleRequest)}func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {    var dataOut []interface{}    var wg = &sync.WaitGroup{}    var mtx = &sync.Mutex{}    iterations := len(a.Ids)    wg.Add(iterations)    for i := 0; i < i; i++ {        go func(i int) {            defer wg.Done()            var outputData []interface{}            sess, err := session.NewSession(&aws.Config{                Region: aws.String("aws-region"),            })            if err != nil {                exitWithError(fmt.Errorf("failed to make Query API call, %v", err))            }            ddb := dynamodb.New(sess)            queryInput := &dynamodb.QueryInput{                Limit:            aws.Int64(1),                TableName:        aws.String("table"),                IndexName:        aws.String("index"),                ScanIndexForward: aws.Bool(false),                ConsistentRead: aws.Bool(false),                KeyConditions: map[string]*dynamodb.Condition{                    "index-column": {                        ComparisonOperator: aws.String("EQ"),                        AttributeValueList: []*dynamodb.AttributeValue{                            {                                S: aws.String(a.Ids[i]),                            },                        },                    },                },            }            output, err := ddb.Query(queryInput)            if err != nil {                exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))            }            err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)            if err != nil {                exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))            }            mtx.Lock()            dataOut = append(dataOut, outputData[0])            mtx.Unlock()        }(i)    }    wg.Wait()    return dataOut}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go