猿问

如何让 Flask 网页(路由)在另一个网页(路由)上后台运行

所以我正在创建这个应用程序,它的一部分是一个网页,其中交易算法正在使用实时数据进行自我测试。所有这些都有效,但问题是如果我离开(退出)网页,它就会停止。我想知道如何让它无限期地在后台运行,因为我希望算法继续做它的事情。


这是我想在后台运行的路线。


@app.route('/live-data-source')

def live_data_source():

    def get_live_data():

        live_options = lo.Options()

        while True:

            live_options.run()

            live_options.update_strategy()

            trades = live_options.get_all_option_trades()

            trades = trades[0]

            json_data = json.dumps(

                {'data': trades})

            yield f"data:{json_data}\n\n"

            time.sleep(5)


    return Response(get_live_data(), mimetype='text/event-stream')

我研究过多线程,但不太确定这是否适合这项工作。我对烧瓶还是个新手,所以这是一个可怜的问题。如果您需要更多信息,请发表评论。


梦里花落0921
浏览 146回答 1
1回答

三国纷争

您可以按照以下方式进行操作 - 下面是 100% 有效的示例。请注意,在生产中使用 Celery 来执行此类任务,或者自己编写另一个守护程序应用程序(另一个进程),并借助消息队列(例如 RabbitMQ)或公共数据库的帮助从 http 服务器向其提供任务。如果对下面的代码有任何疑问,请随时提问,这对我来说是很好的练习:from flask import Flask, current_appimport threadingfrom threading import Thread, Eventimport timefrom random import randintapp = Flask(__name__)# use the dict to store events to stop other treads# one event per thread !app.config["ThreadWorkerActive"] = dict()def do_work(e: Event):&nbsp; &nbsp; """function just for another one thread to do some work"""&nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; if e.is_set():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break&nbsp; # can be stopped from another trhead&nbsp; &nbsp; &nbsp; &nbsp; print(f"{threading.current_thread().getName()} working now ...")&nbsp; &nbsp; &nbsp; &nbsp; time.sleep(2)&nbsp; &nbsp; print(f"{threading.current_thread().getName()} was stoped ...")@app.route("/long_thread", methods=["GET"])def long_thread_task():&nbsp; &nbsp; """Allows to start a new thread"""&nbsp; &nbsp; th_name = f"Th-{randint(100000, 999999)}"&nbsp; # not really unique actually&nbsp; &nbsp; stop_event = Event()&nbsp; # is used to stop another thread&nbsp; &nbsp; th = Thread(target=do_work, args=(stop_event, ), name=th_name, daemon=True)&nbsp; &nbsp; th.start()&nbsp; &nbsp; current_app.config["ThreadWorkerActive"][th_name] = stop_event&nbsp; &nbsp; return f"{th_name} was created!"@app.route("/stop_thread/<th_id>", methods=["GET"])def stop_thread_task(th_id):&nbsp; &nbsp; th_name = f"Th-{th_id}"&nbsp; &nbsp; if th_name in current_app.config["ThreadWorkerActive"].keys():&nbsp; &nbsp; &nbsp; &nbsp; e = current_app.config["ThreadWorkerActive"].get(th_name)&nbsp; &nbsp; &nbsp; &nbsp; if e:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; e.set()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; current_app.config["ThreadWorkerActive"].pop(th_name)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return f"Th-{th_id} was asked to stop"&nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return "Sorry something went wrong..."&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; return f"Th-{th_id} not found"@app.route("/", methods=["GET"])def index_route():&nbsp; &nbsp; text = ("/long_thread - create another thread.&nbsp; "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "/stop_thread/th_id - stop thread with a certain id.&nbsp; "&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; f"Available Threads: {'; '.join(current_app.config['ThreadWorkerActive'].keys())}")&nbsp; &nbsp; return textif __name__ == '__main__':&nbsp; &nbsp; app.run(host="0.0.0.0", port=9999)
随时随地看视频慕课网APP

相关分类

Python
我要回答