func createPipeline( filename string, fileSize, chunkCount int) <- chan int{ chunkSize := fileSize/chunkCount sortResults := [] <-chan int{} for i:= 0; i< chunkCount;i++{ file, err := os.Open(filename)//每循环一次就open一次file if err != nil{ panic(err) } file.Seek(int64(i*chunkSize),0) source := zzpipeline.ReaderSource( bufio.NewReader(file), chunkSize) sortResults = append(sortResults, zzpipeline.InMemSort(source)) //此处在一次循环结束时,数据已经读出并保存在sortResults中了, // 为什么后面紧接着file.Close()会导致整个数据都读不出来了呢? file.Close() //close与open都在循环体内 } return zzpipeline.MergeN(sortResults...) }
11分45秒的时候老师解释了,createPipeline只是创建了pipeline,也就是把一个文件分成了chunkCount个小快,各放上了一个channel,但是此时channel还是阻塞的,因为要等触发了ReadSource以后才这些通道才会开始不断传输。因此在CreatePipeline里面是不能file.close的,需要把这些file句柄返回出来,由外面close。老师说因为不想把这个示例搞的太复杂,所以就留下了这个坑。
若要加bufio就要把open移到循环体里来
func createPipeline(filename string, fileSize ,
chunkSize int) ( <-chan int){
readSize := fileSize/chunkSize
pipeline.Init()
sources := [] <-chan int{}
for i := 0; i < chunkSize; i++ {
file, err := os.Open(filename)
if err != nil{
panic(err)
}
file.Seek(int64(i*readSize),0)
source := pipeline.ReadSource(bufio.NewReader(file), readSize)
sort := pipeline.InMemSort(source)
sources = append(sources, sort)
fmt.Printf("n:%d\n", i)
}
return pipeline.MergeN(sources...)
}
func createPipeline(filename string, fileSize ,
chunkSize int) ( <-chan int, *os.File){
file, err := os.Open(filename)
if err != nil{
panic(err)
}
readSize := fileSize/chunkSize
sources := [] <-chan int{}
for i := 0; i < chunkSize; i++ {
source := pipeline.ReadSource(file, readSize)
sort := pipeline.InMemSort(source)
sources = append(sources, sort)
}
return pipeline.MergeN(sources...), file
} //这样就把file传出去close了,看了不点个赞吗?
这样加defer 不可靠还体现在:若是在循环后sleep个几秒钟,最终处理到560m左右的数据,file就会被关闭。如果在循环体里打印的话,可以发现这4次循环瞬间就完成了。而数据还没开始处理。所以不能在生产完管道后就立马关闭file,必须等所有事情处理完才能file.Close()。所以不能在函数内关闭,要把[]*File传到外面去关闭。
defer file.Close()并不可靠,它会运行一次只允许处理12k数据下一次处理8k数据,这样交替。但我们有800M 的数据。所以只能把file传到外面等事情做完之后,再close掉。
这要从生命周期来考虑,这个函数只是生成pipeline,管道还没开始运行,就把file关闭了,当然就取不出数据啊。
在一次循环结束时,数据并没有读出并保存在sortResults中。回头看看zzpipeline.ReaderSource就知道了, 它也只是搭了个管道。但是貌似go的编译器很聪明,改成defer file.Close()是可以的。
能想到的大概两个原因:
每次都开关是有性能损耗的,打开和关闭需要耗时;但这个并不是关键的。
每次需要存储读到的位置,下次打开时,还要从上次的地方开始读取;这会增加程序的复杂读,这里跟数据库的长连接情况方便查询不一样。 打开一次足够的。