golang 并发问题

allMap存储的是一个任务列表,KEY标记了这个任务类型,Value对应的是任务的参数,现在我需要并发处理这些任务。开发过程中使用了如下两种方法,效果并不好,感觉自己没有领会到golang并发处理的思想;下面是我的几点体会和疑惑,希望得到各位大神的指导。
方式一
//allMap中存储了任务列表
//Task定义如下
typeTaskstruct{
Paramsinterface{}
ResultChanchan[]byte
//Wg*sync.WaitGroup
}
Params是参数,ResultChan是处理完毕之后,将结果写入到ResultChan中;
//并发处理任务
forkey,value:=rangeallMap{
gofunc(kstring,vinterface{}){
log.Debug("k:",k)
ifk==tools.REQUEST_BAOJIE{
//A
log.Debug("baojieelemlen:",len(value))
one_task=&service.Task{
Params:v,
ResultChan:make(chan[]byte,len(value)),
//Wg:new(sync.WaitGroup),
}
//B
log.Debugf("1one_task:%+v",one_task)
//AddTask函数逻辑会处理one_task,处理完毕之后,将结果写入到one_task结构体的ResultChan字段;
service.AddTask(one_task)
}elseifk==tools.REQUEST{
}
}(key,value)
}
//C
log.Debugf("2one_task:%+v",one_task)
//接收结果
gofunc(){
foritem:=rangeone_task.ResultChan{
log.Debug("ReceivedataFromResultChan:",string(item))
}
log.Debug("Process",tools.REQUEST_BAOJIE,"end")
}()
这种方式的弊端,太依赖程序执行的先后顺序了,测试的过程中,发现当C发生在A和B之前时,会使接收结果goroutinue访问ResultChan成员发生奔溃,因为此时ResultChan还没有申请空间。
方案一解决方案:service.AddTask(one_task)函数再加一个参数,chan<-interface{},AddTask处理完之后,将结果写入到这个通道里面,接收结果协程监听该通道,然后读取结果。
方式二
延迟并发时机
fork,v:=rangeallMap{
//gofunc(kstring,vinterface{}){
log.Debug("k:",k)
ifk==tools.REQUEST{
//A
log.Debug("baojieelemlen:",len(v))
one_task=&service.Task{
Params:v,
ResultChan:make(chan[]byte,len(v)),
//Wg:new(sync.WaitGroup),
}
//B
log.Debugf("1one_task:%+v",one_task)
goservice.AddTask(one_task)
}elseifk==tools.REQUEST_TCP{
}
//}(key,value)
}
//C
log.Debugf("2one_task:%+v",one_task)
//接收结果
gofunc(){
foritem:=rangeone_task.ResultChan{
log.Debug("ReceivedataFromResultChan:",string(item))
}
log.Debug("Process",tools.REQUEST_BAOJIE,"end")
}()
这样,就保证了C必须发生在A、B之后,这样一来,ResultChan一定先初始化了,等待AddTask后面的协程往里面写入数据,接收结果协程就会读取出来。
问题1
问题来了,既然方式一存在问题,那么方式二中是否在效率上有何弊端呢?
我这样写并发的逻辑是否有问题?
问题2
这种思想是否可取
vartaskTask;
//提交任务线程
forkey,value:=rangeallMap{
task:=Task{
params:value,
result:make(chaninterface{},len(value)),//value是一个list
}
goprocessOneByOne(key,value)//这种方式是不是开启了很多协程?len(allmap)
}
//取结果
forresult:=rangetask.result{
//getresultfromchann
//todo
}
``
##问题3
计划使用一个全局的chan,processOneByOne业务函数处理完毕之后,将结果写到该chan中,然后监听这个chann,从chann中获取结果
处理流程大致:
demo.go
funcTodoWork(){
gofunc(){
forkey,value:=rangeallMap{
processOneByOne(key,value)
}
}()
foritem:=rangetask.ResultChan{
//问题一、这里如何保证item就是上面那个keyvalue的结果,而不是其他的KEY、value对应的结果
//问题二、当TodoWork在多进程环境下面时,是否存在竞争问题?
println(item)
}
}
task.go
var(
ResultChanchaninterface{}
)
funcinit(){
ResultChan=make(chaninterface{},100)
}
funcprocessOneByOne(keystring,valueinterface{}){
//处理任务
//....
//写入结果
//问题三、怎么关闭ResultChan,如果不关闭,是不是goroutine泄漏问题啊?
ResultChan<-"HelloWorld"
}
###问题描述
###问题出现的环境背景及自己尝试过哪些方法
###相关代码
//请把代码文本粘贴到下方(请勿用图片代替代码)
###你期待的结果是什么?实际看到的错误信息又是什么?
###题目描述
###题目来源及自己的思路
###相关代码
//请把代码文本粘贴到下方(请勿用图片代替代码)
###你期待的结果是什么?实际看到的错误信息又是什么?
###问题描述
###问题出现的环境背景及自己尝试过哪些方法
###相关代码
//请把代码文本粘贴到下方(请勿用图片代替代码)
###你期待的结果是什么?实际看到的错误信息又是什么?
慕莱坞森
浏览 564回答 2
2回答

胡说叔叔

想太多了,怕同时起多了协程,可以定义协程长度,更甚至采用协程池。简单写个实现:typetaskstruct{namestrigparamsinterface{}result[]byte}vargt=make(chanint,10)//同时只允许10个task一起执行vartkr=make(chantask,10)gofunc(){for_,t:=rangeallTaskMap{gt
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

JavaScript