猿问

Pool 只执行一个线程而不是 4 个,我如何使它无限?

所以我正在开发一个小的 Python 工具来对应用程序的 API 进行压力测试。


我有一个使用线程的非常好的脚本,但后来我读到它需要手动编码来维护 n 个并发线程(意思是,旧线程完成后立即启动新线程),以及这里的建议:如何开始旧线程结束后的新线程?就是使用ThreadPool,我尝试如下:


def test_post():

    print "Executing in " + threading.currentThread().getName() + "\n"

    time.sleep(randint(1, 3))

    return randint(1, 5), "Message"



if args.send:

    code, content = post()

    print (code, "\n")

    print (content)

elif args.test:

    # Create new threads

    print threads

    results_list = []

    pool = ThreadPool(processes=threads)

    results = pool.apply_async(test_post())

    pool.close()  # Done adding tasks.

    pool.join()  # Wait for all tasks to complete.

    # results = list(pool.imap_unordered(

    #     test_post(), ()

    # ))

    # thread_list = []

    # while threading.activeCount() <= threads:

    #     thread = LoadTesting(threadID=free_threads, name="Thread-" + str(threading.activeCount()), counter=1)

    #     thread.start()

    #     thread_list.append(thread)

    print "Exiting Main Thread" + "\n"

else:

    print ("cant get here!")

当我调用脚本时,我得到一致的输出,例如:


4


在主线程中执行


退出主线程


我不知道为什么......正如你在注释掉的块中看到的那样,我尝试了不同的方法,但它仍然只执行一次。


我的目标是让脚本循环运行,始终运行 n 个线程。在test_post(分别,post)函数返回的HTTP响应代码和内容-我想以后使用该打印/停止时响应代码是不是200 OK。


胡子哥哥
浏览 202回答 1
1回答

qq_笑_17

您的第一个问题是您已经在MainThreadwith 调用中调用了您的函数:pool.apply_async(test_post())...而不是test_post作为在工作线程中执行的调用的参数传递:pool.apply_async(test_post)OP:我有一个非常好的使用线程的脚本,但后来我读到它需要手动编码来维护 n 个并发线程(意思是,旧线程完成后立即启动新线程)......您需要区分工作单元(作业、任务)和线程。首先使用池的重点是重用执行器,无论是线程还是进程。在实例化池时已经创建了工作线程,只要您不关闭池,所有初始线程都会保持活动状态。所以你不关心重新创建线程,你只需调用现有池的池方法,只要你有一些要分发的工作。Pool 接受这个作业(一个池方法调用)并从中创建任务。这些任务被放在一个无界队列中。每当工作人员完成任务时,它都会阻塞地尝试get()从这样的inqueue.OP:Pool 只执行一个线程而不是 4 个......我尝试了不同的方法,但它仍然只执行一次。pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)...是一个单一调用、单一任务的生产工作。如果您想要多次执行func,则必须pool.apply_async()多次调用,或者使用映射池方法,例如pool.map(func, iterable, chunksize=None)...,它将一个函数映射到一个可迭代对象上。pool.apply_async是非阻塞的,这就是为什么它是“异步的”。它立即返回一个AsyncResult您可以(阻塞地)调用.wait()或调用.get()的对象。通过评论很明显,要没完没了并立即对已完成的任务(个体经营产生的输入流)的替代品...和程序应停止在一个KeyboardInterrupt或者当结果不具有一定的价值。您可以使用 -callback参数在任何旧apply_async任务完成后立即安排新任务。困难在于在 MainThread 的同时如何防止整个脚本过早结束,同时保持它对 KeyboardInterrupt 的响应。让 MainThread 在循环中休眠使其仍然可以立即对 KeyboardInterrupt 做出反应,同时防止提前退出。如果结果应该停止程序,您可以让回调终止池。然后 MainThread 只需要在他的睡眠循环中包含对池状态的检查。import timefrom random import randint, choicefrom itertools import countfrom datetime import datetimefrom threading import current_threadfrom multiprocessing.pool import ThreadPooldef test_post(post_id):&nbsp; &nbsp; time.sleep(randint(1, 3))&nbsp; &nbsp; status_code = choice([200] * 9 + [404])&nbsp; &nbsp; return "{} {} Message no.{}: {}".format(&nbsp; &nbsp; &nbsp; &nbsp; datetime.now(), current_thread().name, post_id, status_code&nbsp; &nbsp; ), status_codedef handle_result(result):&nbsp; &nbsp; msg, code = result&nbsp; &nbsp; print(msg)&nbsp; &nbsp; if code != 200:&nbsp; &nbsp; &nbsp; &nbsp; print("terminating")&nbsp; &nbsp; &nbsp; &nbsp; pool.terminate()&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; pool.apply_async(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; test_post, args=(next(post_cnt),), callback=handle_result&nbsp; &nbsp; &nbsp; &nbsp; )if __name__ == '__main__':&nbsp; &nbsp; N_WORKERS = 4&nbsp; &nbsp; post_cnt = count()&nbsp; &nbsp; pool = ThreadPool(N_WORKERS)&nbsp; &nbsp; # initial distribution&nbsp; &nbsp; for _ in range(N_WORKERS):&nbsp; &nbsp; &nbsp; &nbsp; pool.apply_async(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; test_post, args=(next(post_cnt),), callback=handle_result&nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; while pool._state == 0:&nbsp; # check if pool is still alive&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(1)&nbsp; &nbsp; except KeyboardInterrupt:&nbsp; &nbsp; &nbsp; &nbsp; print(" got interrupt")带有键盘中断的示例输出:$> python2 scratch.py2019-02-15 18:46:11.724203 Thread-4 Message no.3: 2002019-02-15 18:46:12.724713 Thread-2 Message no.1: 2002019-02-15 18:46:13.726107 Thread-1 Message no.0: 2002019-02-15 18:46:13.726292 Thread-3 Message no.2: 2002019-02-15 18:46:14.724537 Thread-4 Message no.4: 2002019-02-15 18:46:14.726881 Thread-2 Message no.5: 2002019-02-15 18:46:14.727071 Thread-1 Message no.6: 200^C got interrupt由于不需要的返回值而终止的示例输出:$> python2 scratch.py2019-02-15 18:44:19.966387 Thread-3 Message no.0: 2002019-02-15 18:44:19.966491 Thread-4 Message no.1: 2002019-02-15 18:44:19.966582 Thread-1 Message no.3: 2002019-02-15 18:44:20.967555 Thread-2 Message no.2: 2002019-02-15 18:44:20.968562 Thread-3 Message no.4: 404terminating请注意,在您的场景中,您还可以apply_async更频繁地调用N_WORKERS-times 为您的初始分发提供一些缓冲区以减少延迟。
随时随地看视频慕课网APP

相关分类

Python
我要回答