Python 在特定时间启动进程并在特定时间结束

这是一个示例 python 代码,每 5 秒运行一个进程,直到凌晨 1 点。


def run_check():

  mythrd =threading.Timer(5.0, run_check)

  mythrd.start()

  time = datetime.now()

  domystuff()


  if time.hour == 1:

    mythrd.cancel()


run_check()

我想在特定时间启动线程。所以我改变了这个代码如下,


mythrd = None


def run_check():

  while (True):


    now = datetime.now

    if now().hour == 16 and now().minute == 30 and now().second == 30:

      mythrd.start()

      time.sleep(1)

    if mythrd.is_alive():

      domystuff()


    if datetime.now().hour == 16 and datetime.now().minute == 31:

        mythrd.cancel()


mythrd =threading.Timer(5.0, run_check)

run_check()

早些时候,我将mythrd对象保留在run_check函数中。如果我在run_check函数内部初始化,它将将该对象作为一个新对象。它会再次触发。所以我将mythrd函数定义为一个全局对象。但它一直在运行。我想在特定时间开始该过程并在特定时间结束它。我的代码有什么问题?


蛊毒传说
浏览 411回答 2
2回答

MMMHUHU

这段代码有几个问题。首先,当在线程内部循环并且没有条件命中时,添加睡眠以节省 CPU 使用。其次,由于线程是非确定性的,您不能在确切的时间戳上触发条件。例如,线程可能会在“热”时间间隔内跳过它的执行,并且条件永远不会触发。因此,您可以改为检查当前时间戳是否已超过某个时间戳(以及超过了多少)和/或以其他方式比较时间戳。但无论如何,在您的代码中 mythrd 启动一个线程函数,它将递归调用自己......这就是问题所在。mythrd =threading.Timer(5.0, run_check)进而mythrd.start()&nbsp; # will start threading.Timer(5.0, run_check)因此,当条件now().hour == 16 and now().minute == 30 and now().second == 30触发时,线程将运行自身的另一个实例。这个实例可能会运行另一个实例等等......您的意思是实现一个触发线程,执行run_check任务,然后启动另一个线程执行实际工作?这是一个解决方案:from threading import Thread, Timerimport timeimport datetimekeep_checking = Truekeep_working = Truedef worker_thread():&nbsp; &nbsp; while keep_working:&nbsp; &nbsp; &nbsp; &nbsp; pass #do my stuff here and remember to use an eventual sleepdef time_checker():&nbsp; &nbsp; while keep_checking:&nbsp; &nbsp; &nbsp; &nbsp; now = datetime.now()&nbsp; &nbsp; &nbsp; &nbsp; if now.hour == 16 and now.minute == 30 and (&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;now.second >= 30 and now.second < 31) and (&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;not mythrd.isAlive()):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mythrd.start()&nbsp; &nbsp; &nbsp; &nbsp; elif now.minute >= 31 and mythrd.isAlive():&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; keep_working = False # and let it die instead to kill it&nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(1) # no operationmythrd = threading.Thread(target=worker_thread)time_checker() # might turn this into a thread as well请记住,您需要定义线程开启和关闭的确切时间间隔。有条件地涵盖所有这些情况并采取相应的行动。随着您的项目变得更加复杂,稍后可能需要互斥锁。

PIPIONE

我已经为您实现了这个解决方案:from datetime import datetime as dtimport threadingimport timeclass test(object):&nbsp; &nbsp; def __init__(self):&nbsp; &nbsp; &nbsp; &nbsp; self.run_permission = False&nbsp; &nbsp; &nbsp; &nbsp; self.start_time = [16,30,30] # Hours, Minutes, Seconds&nbsp; &nbsp; &nbsp; &nbsp; self.end_time = [18,41,00] # Hours, Minutes, Seconds&nbsp; &nbsp; &nbsp; &nbsp; self.timer_sched_time = 5 # seconds&nbsp; &nbsp; &nbsp; &nbsp; threading.Thread(name="time_checker", target=self.check_time, args=(self.start_time, self.end_time,)).start()&nbsp; &nbsp; &nbsp; &nbsp; threading.Thread(name="scheduled_job", target=self.Timer, args=(self.timer_sched_time, )).start()&nbsp; &nbsp; &nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(10)&nbsp; &nbsp; def timer_job(self, unix_time, human_time):&nbsp; &nbsp; &nbsp; &nbsp; print "Unix time: %s&nbsp; Human time: %s \n" % (unix_time, human_time)&nbsp; &nbsp; def Timer(self, delay):&nbsp; &nbsp; &nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time_remaining = delay-time.time()%delay&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(time_remaining)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; unix_time = str(int(round(time.time()*1000)))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; human_time = str(dt.now())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(self.run_permission):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.timer_job(unix_time, human_time)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; except Exception, ex:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raise ex&nbsp; &nbsp; def check_time(self, start_execution_time, end_execution_time):&nbsp; &nbsp; &nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; now = dt.now()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(now.hour >= start_execution_time[0] and now.minute >= start_execution_time[1] and now.second >= start_execution_time[2]):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.run_permission = True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(now.hour >= end_execution_time[0] and now.minute >= end_execution_time[1] and now.second >= end_execution_time[2]):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.run_permission = Falsetest()首先导入您需要的库:from datetime import datetime as dtimport threadingimport time创建一个名为 test 的类:class test(object):定义__init__,当您使用test()以下命令调用它时,它将在创建类 test 时执行:def __init__(self):&nbsp; &nbsp; &nbsp; &nbsp; self.run_permission = False&nbsp; &nbsp; &nbsp; &nbsp; self.start_time = [16,30,30] # Hours, Minutes, Seconds&nbsp; &nbsp; &nbsp; &nbsp; self.end_time = [18,41,00] # Hours, Minutes, Seconds&nbsp; &nbsp; &nbsp; &nbsp; self.timer_sched_time = 5 # seconds&nbsp; &nbsp; &nbsp; &nbsp; threading.Thread(name="time_checker", target=self.check_time, args=(self.start_time, self.end_time,)).start()&nbsp; &nbsp; &nbsp; &nbsp; threading.Thread(name="scheduled_job", target=self.Timer, args=(self.timer_sched_time, )).start()&nbsp; &nbsp; &nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(10)该变量run_permission是一个布尔值,用作检查是否可以在计时器中执行作业的标志。该start_time变量中设置的开始时间check_time是写程序run_permission变量,True如果时间主要或等于start_time该end_time变量设定的停止时间check_time是写程序run_permission变量设置为false如果时间主要或等于end_time该timer_sched_time变量设置定时器的延迟,计算睡眠时间需要延迟,它每 5 秒授予完美的时间执行:17.00.00 - 17.00.05 - 17.00.10 - 17.00.15 等等这两行启动check_time作业和Timer作业的线程:threading.Thread(name="time_checker", target=self.check_time, args=(self.start_time, self.end_time,)).start()threading.Thread(name="scheduled_job", target=self.Timer, args=(self.timer_sched_time, )).start()这两行保持主线程运行,但睡眠主线程以减少资源消耗:while True:&nbsp; &nbsp; time.sleep(10)该函数timer_job仅以 Unix 格式和人类可读格式打印时间:def timer_job(self, unix_time, human_time):&nbsp; &nbsp; &nbsp; &nbsp; print "Unix time: %s&nbsp; Human time: %s \n" % (unix_time, human_time)要翻译 unix 格式,您可以使用epoch 转换器网站。该Timer函数计算睡眠时间以将执行授予完美的时间,以 unix 和人类格式获取时间戳并将其传递给timer_job如果run_permission变量设置为True:def Timer(self, delay):&nbsp; &nbsp; &nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time_remaining = delay-time.time()%delay&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(time_remaining)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; unix_time = str(int(round(time.time()*1000)))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; human_time = str(dt.now())&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(self.run_permission):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.timer_job(unix_time, human_time)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; except Exception, ex:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; raise excheck_time 函数使用参数start_execution_time 并将end_execution_time变量设置run_permission为TrueorFalse是否遵守时间执行条件(因此如果遵守小时/分钟/秒范围):def check_time(self, start_execution_time, end_execution_time):&nbsp; &nbsp; &nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; now = dt.now()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(now.hour >= start_execution_time[0] and now.minute >= start_execution_time[1] and now.second >= start_execution_time[2]):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.run_permission = True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if(now.hour >= end_execution_time[0] and now.minute >= end_execution_time[1] and now.second >= end_execution_time[2]):&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; self.run_permission = False如前所述test(),在文件末尾调用测试类并使用该__init__函数初始化程序。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python