带有 flask 或 aiohttp 服务器的 asyncio 应用程序

我改进了去年的应用程序。一开始有两个不同的 python 应用程序——第一个用于统计数据,第二个——带有 GET 请求的网络服务器 gunicorn+flask。(这两种服务都在 centos 中)Statistics 进行计数并将所有内容存储在 Postgres 中。Web 服务器连接到该 Postgres 数据库并响应 GET 请求。


在重写的版本中,我使用 pandas 框架进行了所有统计,现在我想将这两个应用程序合并为一个。我使用 asyncio 来获取数据和计数统计。一切正常,现在我要添加 Web 服务器以响应 GET。


部分代码:


import asyncio

from contextlib import closing

import db_cl, tks_db

from formdf_cl import FormatDF


getinfofromtks = tks_db.TKS() # Class object to connect to third party database

formatdf = FormatDF() # counting class object, that stores some data

dbb = db_cl.MyDatabase('mydb.ini') # Class object to connect to my database



async def get_some_data():

    # getting information from third party database every 5 seconds.

    await asyncio.sleep(5)

    ans_inc, ans_out = getinfofromtks.getdf()

    return ans_inc, ans_out # two huge dataframes in pandas



async def process(ans_inc, ans_out):

    # counting data on CPU

    await asyncio.sleep(0)

    formatdf.leftjoin(ans_inc, ans_out)

    # storing statistics in my Database

    dbb.query_database('INSERT INTO statistic (timestamp, outgoing, incoming, stats) values (%s, %s,%s, %s)',

                       formatdf.make_count())

    dbb.commit_query()



async def main():

    while True:

        ans_inc, ans_out = await get_some_data()  # blocking, get data from third party database

        asyncio.ensure_future(process(ans_inc, ans_out))  # computing




if __name__ == "__main__":

    with closing(asyncio.get_event_loop()) as event_loop:

        event_loop.run_until_complete(main())

现在我希望将 http 服务器添加为线程应用程序(使用 flask 或 aiohttp),它将使用类对象“formatdf”中的参数响应 GET 请求。包含这些功能的最佳方式是什么?


ITMISS
浏览 220回答 2
2回答

胡说叔叔

我设法添加了一个 http 服务器作为协程。首先我尝试使用 aiohttp,但最终我找到了 Quart(与 Flask 相同,但它使用 Asyncio)。在 Quart 上运行 http 服务器的示例代码:import quartfrom quart import requestimport jsonimport timeapp = quart.Quart(__name__)def resp(code, data):    return quart.Response(        status=code,        mimetype="application/json",        response=to_json(data)    )def to_json(data):    return json.dumps(data) + "\n"@app.route('/')def root():    return quart.redirect('/api/status2')@app.errorhandler(400)def page_not_found(e):    return resp(400, {})@app.errorhandler(404)def page_not_found(e):    return resp(400, {})@app.errorhandler(405)def page_not_found(e):    return resp(405, {})@app.route('/api/status2', methods=['GET'])def get_status():    timestamp = request.args.get("timestamp")    delay = request.args.get("delay")    if timestamp:        return resp(200, {"time": time.time()})    elif delay:        return resp(200, {"test is:": '1'})    else:        return resp(200, {"", "ask me about time"})if __name__ == "__main__":    app.run(debug=True, host='0.0.0.0', port=5000)为了将此代码添加为协同程序,我使用await asyncio.gather()并使用了 app.run_task 而不是 app.run。像这样更改了我的问题中的代码:async def launcher_main():    while True:        ans_inc, ans_out = await get_some_data()        asyncio.ensure_future(process(ans_inc, ans_out))async def main():    await asyncio.gather(launcher_main(),                         restapi_quart.app.run_task(debug=True, host='0.0.0.0', port=5000))剩下的最后一个问题是使“formatdf”类对象的可用参数到我的 http 服务器。我已经实现了Tests.restapi_quart.app.config["formatdf"] = formatdf向 process(...) 函数添加行。从 quart 调用它:elif button:    return resp(200, {"ok": app.config["formatdf"].attr})

精慕HU

我只需要为我的应用程序解决这个问题。这是在现有异步应用程序中运行 aiohttp 服务器的方法。https://docs.aiohttp.org/en/stable/web_lowlevel.htmlimport asynciofrom aiohttp import webasync def web_server():    print(f'Configuring Web Server')    app = web.Application()    app.add_routes([web.get('/hello', web_hello)])    runner = web.AppRunner(app)    await runner.setup()    site = web.TCPSite(runner)        print(f'Starting Web Server')    await site.start()    print(f'Web Server Started')    #run forever and ever    await asyncio.sleep(100*3600)async def web_hello(request):    return web.Response(text="Hello World")async def main():    tasks = []    #Web Server    tasks.append(asyncio.create_task(web_server()))    results = await asyncio.gather(*tasks)if __name__ == '__main__':    asyncio.run(main())
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python