没有收到来自频道的消息

编辑:

在我添加了一小部分我正在使用的文件(7 GB)并尝试运行该程序后,我可以看到:


fatal error: all goroutines are asleep - deadlock!


goroutine 1 [chan receive]:

main.main()

    /media/developer/golang/manual/examples/sp/v2/sp.v2.go:71 +0x4a9

exit status 2

情况:

我是 GO 的新手,所以如果我的问题真的很简单,我很抱歉。


我正在尝试流式传输xml文件、拆分文档,然后在不同的 GO 例程中解析它们。


我正在使用的 XML 文件示例:


<?xml version="1.0" encoding="UTF-8"?>

<osm version="0.6" generator="CGImap 0.0.2">

    <relation id="56688" user="kmvar" uid="56190" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">

        <member type="node" ref="294942404" role=""/>

        <member type="node" ref="364933006" role=""/>

        <tag k="name" v="Küstenbus Linie 123"/>

        <tag k="network" v="VVW"/>

        <tag k="route" v="bus"/>

        <tag k="type" v="route"/>

    </relation>

    <relation id="98367" user="jdifh" uid="92834" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">

        <member type="node" ref="294942404" role=""/>

        <member type="way" ref="4579143" role=""/>

        <member type="node" ref="249673494" role=""/>

        <tag k="name" v="Küstenbus Linie 123"/>

        <tag k="network" v="VVW"/>

        <tag k="operator" v="Regionalverkehr Küste"/>

        <tag k="ref" v="123"/>

    </relation>

    <relation id="72947" user="schsu" uid="92374" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">

        <member type="node" ref="294942404" role=""/>

        <tag k="name" v="Küstenbus Linie 123"/>

        <tag k="type" v="route"/>

    </relation>

</osm>

它应该打印每个结构,但我什么也没看到。该程序只是挂起。

我测试了 streamer 和 splitter(只是在将消息发送到通道之前添加fmt.Println(rs)到函数parseRelation中)。我可以看到结构。因此,问题在于发送和接收消息。

问题:

我不知道如何解决这个问题。尝试更改频道中消息的类型(从RSstring)并且每次只发送一个字符串。但它也没有帮助(我什么也看不到)


呼如林
浏览 99回答 2
2回答

呼唤远方

首先,让我们解决这个问题:您不能逐行解析 XML。您很幸运,您的文件恰好是每行一个标签,但这不能想当然。您必须解析整个 XML 文档。通过逐行处理,您试图将<tag>and<member>推入为<relation>. 相反,使用xml.NewDecoder并让它为您处理文件。package mainimport (    "encoding/xml"    "fmt"    "os"    "log")type Osm struct {    XMLName     xml.Name    `xml:"osm"`    Relations   []Relation  `xml:"relation"`}type Relation struct {    XMLName     xml.Name    `xml:"relation"`    ID          string      `xml:"id,attr"`    User        string      `xml:"user,attr"`    Uid         string      `xml:"uid,attr"`    Members     []Member    `xml:"member"`    Tags        []Tag       `xml:"tag"`}type Member struct {    XMLName     xml.Name    `xml:"member"`    Ref         string      `xml:"ref,attr"`    Type        string      `xml:"type,attr"`    Role        string      `xml:"role,attr"`}type Tag struct {    XMLName     xml.Name    `xml:"tag"`    Key         string      `xml:"k,attr"`    Value       string      `xml:"v,attr"`}func main() {    reader, err := os.Open("test.xml")    if err != nil {        log.Fatal(err)    }    defer reader.Close()    decoder := xml.NewDecoder(reader)    osm := &Osm{}    err = decoder.Decode(&osm)    if err != nil {        log.Fatal(err)    }    fmt.Println(osm)}Osm其他结构类似于您期望的 XML 模式。decoder.Decode(&osm)应用该架构。答案的其余部分将仅涵盖通道和 goroutines 的使用。XML 部分将被删除。如果你添加一些调试语句,你会发现它parseRelation从未被调用,这意味着它channel是空的并且fmt.Println(<- channel)等待一个永远不会关闭的空通道。所以一旦你完成处理,关闭通道。  for {    p2Rc, err := reader.ReadSlice('\n')    ...  }  close(channel)现在我们得到{ [] []}5973974 次。for i := 0; i < 5973974; i++ {  fmt.Println(<- channel)}这是尝试从通道读取 5973974 次。这违背了渠道的意义。相反,使用range.for thing := range channel {    fmt.Println(thing)}现在至少它完成了!但是有一个新问题。如果它真的找到了一个东西,比如如果你改成if p2Rc[2] == 114 {,if p2Rc[2] == 32 {你会得到一个panic: send on closed channel. 这是因为parseRelation与阅读器并行运行,并且可能会在主要阅读代码完成并关闭通道后尝试写入。在关闭频道之前,您必须确保使用该频道的每个人都已完成。要解决此问题,需要进行相当大的重新设计。下面是一个简单程序的示例,该程序从文件中读取行,将它们放入通道,并让工作人员从该通道读取。func main() {    reader, err := os.Open("test.xml")    if err != nil {        log.Fatal(err)    }    defer reader.Close()    // Use the simpler bufio.Scanner    scanner := bufio.NewScanner(reader)    // A buffered channel for input    lines := make(chan string, 10)    // Work on the lines    go func() {        for line := range lines {            fmt.Println(line)        }    }()    // Read lines into the channel    for scanner.Scan() {        lines <- scanner.Text()    }    if err := scanner.Err(); err != nil {        log.Fatal(err)    }    // When main exits, channels gracefully close.}这很好用,因为main它很特殊并且在退出时会清理通道。但是如果 reader 和 writer 都是 goroutines 呢?// A buffered channel for inputlines := make(chan string, 10)// Work on the linesgo func() {    for line := range lines {        fmt.Println(line)    }}()// Read lines into the channelgo func() {    for scanner.Scan() {        lines <- scanner.Text()    }    if err := scanner.Err(); err != nil {        log.Fatal(err)    }}()空的。main在 goroutines 可以完成他们的工作之前退出并关闭通道。我们需要一种方法让我们知道main要等到处理完成。有几种方法可以做到这一点。一个是与另一个通道同步处理。// A buffered channel for inputlines := make(chan string, 10)// A channel for main to wait fordone := make(chan bool, 1)// Work on the linesgo func() {    for line := range lines {        fmt.Println(line)    }    // Indicate the worker is done    done <- true}()// Read lines into the channelgo func() {    // Explicitly close `lines` when we're done so the workers can finish    defer close(lines)    for scanner.Scan() {        lines <- scanner.Text()    }    if err := scanner.Err(); err != nil {        log.Fatal(err)    }}()// main will wait until there's something to read from `done`<-done现在main将触发 reader 和 worker goroutines 并缓冲等待done. 阅读器将填充lines直到完成阅读,然后将其关闭。同时,工作人员将在完成读取后读取lines和写入。done另一种选择是使用sync.WaitGroup。// A buffered channel for inputlines := make(chan string, 10)var wg sync.WaitGroup// Note that there is one more thing to wait forwg.Add(1)go func() {    // Tell the WaitGroup we're done    defer wg.Done()    for line := range lines {        fmt.Println(line)    }}()// Read lines into the channelgo func() {    defer close(lines)    for scanner.Scan() {        lines <- scanner.Text()    }    if err := scanner.Err(); err != nil {        log.Fatal(err)    }}()// Wait until everything in the WaitGroup is donewg.Wait()和以前一样,main启动 reader 和 worker goroutine,但现在它在启动 worker 之前将 1 添加到 WaitGroup。然后等待wg.Wait()返回。阅读器与以前一样工作,lines完成后关闭通道。工作人员现在wg.Done()在完成递减 WaitGroup 的计数并允许wg.Wait()返回时调用。每种技术都有优点和缺点。done更灵活,链条更好,如果你能把头绕在它身上会更安全。WaitGroups 更简单,更容易理解,但要求每个 goroutine 共享一个变量。如果我们想添加到这个处理链中,我们可以这样做。假设我们有一个读取行的 goroutine,一个在 XML 元素中处理它们,一个对元素执行某些操作。// A buffered channel for inputlines := make(chan []byte, 10)elements := make(chan *RS)var wg sync.WaitGroup// Worker goroutine, turn lines into RS structswg.Add(1)go func() {    defer wg.Done()    defer close(elements)    for line := range lines {        if line[2] == 32 {            fmt.Println("Begin")            fmt.Println(string(line))            fmt.Println("End")            rs := &RS{}            xml.Unmarshal(line, &rs)            elements <- rs        }    }}()// File readergo func() {    defer close(lines)    for scanner.Scan() {        lines <- scanner.Bytes()    }    if err := scanner.Err(); err != nil {        log.Fatal(err)    }}()// Element readerwg.Add(1)go func() {    defer wg.Done()    for element := range elements {        fmt.Println(element)    }}()wg.Wait()这会产生空结构,因为您正试图将单独的 XML 行放入表示完整标记的结构中<relationship>。但它演示了如何向链中添加更多工人。

芜湖不芜

我不知道这是否对您有帮助,但您的条件if p2Rc[2]==114永远不会满足,然后您继续并开始收听频道。从不接收输入。我认为这里的主要问题是这个(代码)是做什么的?上述条件属于该过程的何处?如果这很清楚,我可以更新一个更好的回应。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go