func ReadSource(reader io.Reader) <-chan int {
out := make(chan int)
go func() {
buffer := make([]byte, 8)
for {
n, err := reader.Read(buffer)
if n > 0 {
v := int(binary.BigEndian.Uint64(buffer))
out <- v
}
if err != nil {
break
}
}
close(out)
}()
return out
}
func WriteSink(writer io.Writer, in <-chan int) {
for v := range in {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(v))
writer.Write(buffer)
}
}
func RandomSource(count int) <-chan int{
out := make(chan int)
go func() {
for i := 0; i < count; i++ {
out <- rand.Int()
}
close(out)
}()
return out
}func main() {
const filename = "large.in"
const n = 100000000
file, err := os.Create(filename)
if err != nil {
panic(err)
}
defer file.Close()
p := pipeline.RandomSource(n)
writer := bufio.NewWriter(file)
pipeline.WriteSink(writer, p)
writer.Flush()
file, err = os.Open(filename)
if err != nil {
panic(err)
}
defer file.Close()
p = pipeline.ReadSource(bufio.NewReader(file))
count := 0
for v := range p {
fmt.Println(v)
count++
if count >= 100 {
break
}
}
}
更多节点,使用bufferio这个库包装一下加快运行速度