// ReadSource starts a goroutine that decodes consecutive 8-byte big-endian
// records from reader and sends each one, converted to int, on the returned
// channel. The channel is closed when the reader is exhausted or a read fails.
//
// A trailing fragment shorter than 8 bytes is discarded rather than decoded.
// The channel is unbuffered, so the goroutine stays blocked on its send if the
// consumer stops receiving before the channel is closed.
func ReadSource(reader io.Reader) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		buffer := make([]byte, 8)
		for {
			// io.Reader is allowed to return fewer bytes than requested, so a
			// single Read call cannot be trusted to deliver a whole record.
			// io.ReadFull keeps reading until all 8 bytes have arrived.
			if _, err := io.ReadFull(reader, buffer); err != nil {
				// io.EOF is a clean end of input, io.ErrUnexpectedEOF a
				// truncated final record, anything else a read failure; in
				// every case no further complete record is available.
				return
			}
			out <- int(binary.BigEndian.Uint64(buffer))
		}
	}()
	return out
}

// WriteSink receives values from in until it is closed and writes each one to
// writer as an 8-byte big-endian record.
//
// The function has no error return, so a failed write cannot be reported to
// the caller. After the first failed write no further records are written
// (avoiding gaps in the output), but the channel is still drained so the
// sending goroutine is not left blocked.
func WriteSink(writer io.Writer, in <-chan int) {
	// One scratch buffer is reused for every record; io.Writer implementations
	// must not retain the slice passed to Write.
	buffer := make([]byte, 8)
	failed := false
	for v := range in {
		if failed {
			continue
		}
		binary.BigEndian.PutUint64(buffer, uint64(v))
		if _, err := writer.Write(buffer); err != nil {
			failed = true
		}
	}
}

// RandomSource starts a goroutine that sends count values obtained from
// rand.Int on the returned channel and then closes it. A count of zero or
// less produces a channel that is closed without sending anything.
func RandomSource(count int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < count; i++ {
			out <- rand.Int()
		}
	}()
	return out
}
func main() { const filename = "large.in" const n = 100000000 file, err := os.Create(filename) if err != nil { panic(err) } defer file.Close() p := pipeline.RandomSource(n) writer := bufio.NewWriter(file) pipeline.WriteSink(writer, p) writer.Flush() file, err = os.Open(filename) if err != nil { panic(err) } defer file.Close() p = pipeline.ReadSource(bufio.NewReader(file)) count := 0 for v := range p { fmt.Println(v) count++ if count >= 100 { break } } }
更多节点:使用 bufio 这个库包装一下读写,加快运行速度。