使用高郎持续检查 API 中是否存在数据变化

我正在尝试轮询一个API以保留流量数据的时间序列,并在发生更改时将该数据保存到postgres。


目前,我有一个这样的实现


//this needs to check the api for new information every X seconds

func Poll(req *http.Request, client *http.Client) ([]byte, error) {

    r := rand.New(rand.NewSource(99))

    c := time.Tick(10 * time.Second)

    for _ = range c {

        //Download the current contents of the URL and do something with it

        response, err := client.Do(req)

        data, _ := io.ReadAll(response.Body)


        if err != nil {

            return nil, err

        }

        return data, nil

        // add a bit of jitter

        jitter := time.Duration(r.Int31n(5000)) * time.Millisecond

        time.Sleep(jitter)

    }


}




func main() {


    client := &http.Client{

        Timeout: time.Second * 60 * 60 * 600,

    }

    url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"

    req, err := http.NewRequest("GET", url, nil)

    if err != nil {

        return err

    }

    req.Header.Set("Ocp-Apim-Subscription-Key", "xx")


    // response, err := client.Do(req)

    data, err := Poll(req, client)

    fmt.Println(string(data))


}

接下来,我将执行比较功能。


基本上,我正在尝试找出如何确保循环首先调用查询并返回适当的值。


我认为这个实现可能不是很好,我只是不确定如何真正正确地实现它。我能得到一些指点吗?


函数式编程
浏览 62回答 2
2回答

繁星淼淼

您的问题呈现了一个典型的生产者/消费者场景,因为您的 Poll() 函数正在生成由 main() 函数使用的响应数据(可能是在 postgres 中保存数据)。通过使用 go 例程和通道可以很好地解决此问题。轮询工作可以在一个 goroutine 中完成,该 goroutine 通过通道将响应数据传达给主函数。轮询工作时也可能出现错误(响应错误或 io 错误),因此也应将其传达给 main() 函数。首先定义一个新类型来保存轮询的数据和一个错误:type PollResponse struct {&nbsp; &nbsp; Data []byte&nbsp; &nbsp; Err error}在 Poll() 函数中,启动一个 go 例程来执行轮询工作,并返回一个通道来共享 go 例程之外的数据:func Poll(req *http.Request, client *http.Client) (ch chan PollResponse){&nbsp; &nbsp; ch = make(chan PollResponse) // Buffered channel is also good&nbsp; &nbsp; go func() {&nbsp; &nbsp; &nbsp; &nbsp; defer func() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; close(ch)&nbsp; &nbsp; &nbsp; &nbsp; }()&nbsp; &nbsp; &nbsp; &nbsp; r := rand.New(rand.NewSource(99))&nbsp; &nbsp; &nbsp; &nbsp; c := time.Tick(10 * time.Second)&nbsp; &nbsp; &nbsp; &nbsp; for range c {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; res, err := client.Do(req);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pollRes := PollResponse {}&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pollRes.Data, pollRes.Err = nil, err&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- pollRes&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pollRes.Data, pollRes.Err = io.ReadAll(res.Body)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ch <- pollRes&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if pollRes.Err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; jitter := time.Duration(r.Int31n(5000)) * time.Millisecond&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.Sleep(jitter)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }()&nbsp; &nbsp; return}最后在 main() 函数中,调用 Poll() 并读取通道以获得轮询响应:func main() {&nbsp; &nbsp; client := &http.Client{&nbsp; &nbsp; &nbsp; &nbsp; Timeout: time.Second * 60 * 60 * 600,&nbsp; &nbsp; }&nbsp; &nbsp; url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"&nbsp; &nbsp; req, err := http.NewRequest("GET", url, nil)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return&nbsp; &nbsp; }&nbsp; &nbsp; req.Header.Set("Ocp-Apim-Subscription-Key", "xx")&nbsp; &nbsp; pollCh := Poll(req, client)&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; for item := range pollCh {&nbsp; &nbsp; &nbsp; &nbsp; if item.Err == nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; fmt.Println(string(item.Data)) // or save it to postgres database&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp;&nbsp; &nbsp; }}

长风秋雁

在股票代码通道上的范围。在每次迭代中,获取数据,检查数据是否已更改并处理数据。关键点是从循环内部处理数据,而不是从函数返回数据。假设您具有以下函数:// procesChangedData updates the database with new// data from the API endpoint.func processChangedData(data []byte) error {&nbsp; &nbsp; // implement save to postgress}使用以下函数进行轮询:func Poll(client *http.Client) error {&nbsp; &nbsp; url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"&nbsp; &nbsp; // Use NewTicker instead of Tick so we can cleanup&nbsp; &nbsp; // ticker on return from the function.&nbsp; &nbsp; t := time.NewTicker(10 * time.Second)&nbsp; &nbsp; defer t.Stop()&nbsp; &nbsp; var prev []byte&nbsp; &nbsp; for _ = range t.C {&nbsp; &nbsp; &nbsp; &nbsp; // Create a new request objet for each request.&nbsp; &nbsp; &nbsp; &nbsp; req, err := http.NewRequest("GET", url, nil)&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; req.Header.Set("Ocp-Apim-Subscription-Key", "xx")&nbsp; &nbsp; &nbsp; &nbsp; resp, err := client.Do(req)&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Edit error handling to match application&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // requirements. I return an error here. Continuing&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the loop is also an option.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; data, err := io.ReadAll(resp.Body)&nbsp; &nbsp; &nbsp; &nbsp; // Ensure that body is closed before handling errors&nbsp; &nbsp; &nbsp; &nbsp; // below.&nbsp; &nbsp; &nbsp; &nbsp; resp.Body.Close()&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Edit error handling to match application&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // requirements. I return an error here. Continuing&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the loop is also an option.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; if resp.StatusCode != http.StatusOK {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Edit error handling to match application&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // requirements. I return an error here. Continuing&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the loop is also an option.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return fmt.Errorf("bad status %d", resp.StatusCode)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; if bytes.Equal(data, prev) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; continue&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; prev = data&nbsp; &nbsp; &nbsp; &nbsp; if err := processChangedData(data); err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // Edit error handling to match application&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // requirements. I return an error here. Continuing&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // the loop is also an option.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; panic("unexpected break from loop")}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java