猿问

golang生产者消费者接收到的消息数

我在 golang 中编写了生产者-消费者模式。读取多个 csv 文件并处理记录。我正在一口气读取 csv 文件的所有记录。


我想以包括所有 csv 文件在内的总记录的 5% 的间隔记录处理完成的百分比。例如,我有 3 个 csv 要处理,每个有 20、30、50 行/记录(因此总共要处理 100 条记录)想要在处理 5 条记录时记录进度。


func processData(inputCSVFiles []string) {

    producerCount := len(inputCSVFiles)

    consumerCount := producerCount


    link := make(chan []string, 100)

    wp := &sync.WaitGroup{}

    wc := &sync.WaitGroup{}


    wp.Add(producerCount)

    wc.Add(consumerCount)


    for i := 0; i < producerCount; i++ {

        go produce(link, inputCSVFiles[i], wp)

    }


    for i := 0; i < consumerCount; i++ {

        go consume(link, wc)

    }

    wp.Wait()

    close(link)

    wc.Wait()

    fmt.Println("Completed data migration process for all CSV data files.")

}


func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {

    defer wg.Done()

    records := readCsvFile(filePath)

    totalNumberOfRecords := len(records)

    for _, record := range records {

        link <- record

    }

}


func consume(link <-chan []string, wg *sync.WaitGroup) {

    defer wg.Done()

    for record := range link {

        // process csv record

    }

}


开满天机
浏览 103回答 1
1回答

浮云间

我使用了原子变量和计数器通道,其中消费者将在处理记录时推送计数,其他 goroutine 将从通道中读取并计算总处理记录百分比。var progressPercentageStep float64 = 5.0var totalRecordsToProcess int32func processData(inputCSVFiles []string) {&nbsp; &nbsp; &nbsp; &nbsp; producerCount := len(inputCSVFiles)&nbsp; &nbsp; &nbsp; &nbsp; consumerCount := producerCount&nbsp; &nbsp; &nbsp; &nbsp; link := make(chan []string, 100)&nbsp; &nbsp; &nbsp; &nbsp; counter := make(chan int, 100)&nbsp; &nbsp; &nbsp; &nbsp; defer close(counter)&nbsp; &nbsp; &nbsp; &nbsp; wp := &sync.WaitGroup{}&nbsp; &nbsp; &nbsp; &nbsp; wc := &sync.WaitGroup{}&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; wp.Add(producerCount)&nbsp; &nbsp; &nbsp; &nbsp; wc.Add(consumerCount)&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; for i := 0; i < producerCount; i++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go produce(link, inputCSVFiles[i], wp)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; go progressStats(counter)&nbsp; &nbsp; &nbsp; &nbsp; for i := 0; i < consumerCount; i++ {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; go consume(link, wc)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; wp.Wait()&nbsp; &nbsp; &nbsp; &nbsp; close(link)&nbsp; &nbsp; &nbsp; &nbsp; wc.Wait()&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; func produce(link chan<- []string, filePath string, wg *sync.WaitGroup) {&nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; records := readCsvFile(filePath)&nbsp; &nbsp; &nbsp; &nbsp; atomic.AddInt32(&totalRecordsToProcess, int32(len(records)))&nbsp; &nbsp; &nbsp; &nbsp; for _, record := range records {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; link <- record&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; func consume(link <-chan []string,counter chan<- int, wg *sync.WaitGroup) {&nbsp; &nbsp; &nbsp; &nbsp; defer wg.Done()&nbsp; &nbsp; &nbsp; &nbsp; for record := range link {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // process csv record&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; counter <- 1&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp;&nbsp;func progressStats(counter <-chan int) {&nbsp; &nbsp; var feedbackThreshold = progressPercentageStep&nbsp; &nbsp; for count := range counter {&nbsp; &nbsp; &nbsp; &nbsp; totalRemaining := atomic.AddInt32(&totalRecordsToProcess, -count)&nbsp; &nbsp; &nbsp; &nbsp; donePercent := 100.0 * processed / totalRemaining&nbsp; &nbsp; &nbsp; &nbsp; // log progress&nbsp; &nbsp; &nbsp; &nbsp; if donePercent >= feedbackThreshold {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Printf("Progress ************** Total Records: %d, Processed Records : %d, Processed Percentage: %.2f **************\n", totalRecordsToProcess, processed, donePercent)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; feedbackThreshold += progressPercentageStep&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}
随时随地看视频慕课网APP

相关分类

Go
我要回答