package pipeline
import (
"encoding/binary"
"io"
"sort"
)
func ArraySource(a ...int) <-chan int {
out := make(chan int)
go func() {
for _, v := range a{
out <- v
}
close(out)
}()
return out
}
func InMemSort(in <-chan int) <-chan int {
out := make(chan int)
go func() {
// Read into memory
var a []int
for v := range in {
a = append(a, v)
}
// Sort
sort.Ints(a)
// Output
for _, v := range a {
out <- v
}
close(out)
}()
return out
}
func Merge(in1, in2 <-chan int) <-chan int {
out := make(chan int)
go func() {
v1, ok1 := <-in1
v2, ok2 := <-in2
for ok1 || ok2 {
if !ok2 || (ok1 && v1 <= v2) {
out <- v1
v1, ok1 = <-in1
} else {
out <- v2
v2, ok2 = <-in2
}
}
close(out)
}()
return out
}package main
import (
"ccmouse/gointro/pipeline"
"fmt"
)
func main() {
p := pipeline.Merge(
pipeline.InMemSort(
pipeline.ArraySource(3, 2, 6, 7, 4)),
pipeline.InMemSort(
pipeline.ArraySource(7, 4, 0, 3, 2, 13, 8)))
for v := range p {
fmt.Println(v)
}
}
一些节点
数组数据源节点- channel 的关闭及检测
内部排序节点
归并节点
外部排序pipeline
一些节点1