老师,请问,为什么不能循环一次,open一次,读完数据并存储,close一次,再循环?我实验过,确实失败了

来源:3-5 完整外部排序

helloei

2018-08-20 19:24

func createPipeline(
   filename string,
   fileSize, chunkCount int)  <- chan int{
   chunkSize := fileSize/chunkCount

   sortResults := [] <-chan int{}
   for i:= 0; i< chunkCount;i++{

      file, err := os.Open(filename)//每循环一次就open一次file
      if err != nil{
         panic(err)
      }

      file.Seek(int64(i*chunkSize),0)

      source := zzpipeline.ReaderSource(
         bufio.NewReader(file), chunkSize)

      sortResults = append(sortResults,
         zzpipeline.InMemSort(source))
      //此处在一次循环结束时,数据已经读出并保存在sortResults中了,
      // 为什么后面紧接着file.Close()会导致整个数据都读不出来了呢?
      file.Close()  //close与open都在循环体内

   }
   return zzpipeline.MergeN(sortResults...)
}


写回答 关注

7回答

  • 慕粉329807310
    2020-10-28 09:16:12

    11分45秒的时候老师解释了,createPipeline只是创建了pipeline,也就是把一个文件分成了chunkCount个小快,各放上了一个channel,但是此时channel还是阻塞的,因为要等触发了ReadSource以后才这些通道才会开始不断传输。因此在CreatePipeline里面是不能file.close的,需要把这些file句柄返回出来,由外面close。老师说因为不想把这个示例搞的太复杂,所以就留下了这个坑。

  • helloei
    2019-06-18 23:31:02

    若要加bufio就要把open移到循环体里来

    func createPipeline(filename string, fileSize ,

    chunkSize int) ( <-chan int){

    readSize := fileSize/chunkSize

    pipeline.Init()

    sources := [] <-chan int{}

    for i := 0; i < chunkSize; i++ {

    file, err := os.Open(filename)

    if err != nil{

    panic(err)

    }

    file.Seek(int64(i*readSize),0)

    source := pipeline.ReadSource(bufio.NewReader(file), readSize)

    sort := pipeline.InMemSort(source)

    sources = append(sources, sort)

    fmt.Printf("n:%d\n", i)

    }

    return pipeline.MergeN(sources...)

    }


  • helloei
    2019-06-18 19:52:37

    func createPipeline(filename string, fileSize ,

    chunkSize int) ( <-chan int, *os.File){

    file, err := os.Open(filename)

    if err != nil{

    panic(err)

    }

    readSize := fileSize/chunkSize


    sources := [] <-chan int{}

    for i := 0; i < chunkSize; i++ {


    source := pipeline.ReadSource(file, readSize)

    sort := pipeline.InMemSort(source)

    sources = append(sources, sort)

    }


    return pipeline.MergeN(sources...), file

    } //这样就把file传出去close了,看了不点个赞吗?


  • helloei
    2018-10-04 15:42:13

    这样加defer 不可靠还体现在:若是在循环后sleep个几秒钟,最终处理到560m左右的数据,file就会被关闭。如果在循环体里打印的话,可以发现这4次循环瞬间就完成了。而数据还没开始处理。所以不能在生产完管道后就立马关闭file,必须等所有事情处理完才能file.Close()。所以不能在函数内关闭,要把[]*File传到外面去关闭。

  • helloei
    2018-10-04 15:15:16
    defer file.Close()并不可靠,它会运行一次只允许处理12k数据下一次处理8k数据,这样交替。但我们有800M
    的数据。所以只能把file传到外面等事情做完之后,再close掉。
  • helloei
    2018-10-04 13:28:07

    这要从生命周期来考虑,这个函数只是生成pipeline,管道还没开始运行,就把file关闭了,当然就取不出数据啊。

    在一次循环结束时,数据并没有读出并保存在sortResults中。回头看看zzpipeline.ReaderSource就知道了,
     它也只是搭了个管道。但是貌似go的编译器很聪明,改成defer file.Close()是可以的。
  • tokumi
    2018-08-22 16:11:29

    能想到的大概两个原因:

    1. 每次都开关是有性能损耗的,打开和关闭需要耗时;但这个并不是关键的。

    2. 每次需要存储读到的位置,下次打开时,还要从上次的地方开始读取;这会增加程序的复杂读,这里跟数据库的长连接情况方便查询不一样。  打开一次足够的。

搭建并行处理管道,感受GO语言魅力

通过搭建并行数据处理管道,展示go语言在并发编程方面的优势

19375 学习 · 78 问题

查看课程

相似问题