繁花不似锦
对于您有趣的任务,只是为了好玩,我决定实现相当复杂但通用的解决方案来异步处理任何命令。命令在文件中提供cmds.txt,每行单个命令。目前仅支持两个命令save和。空格后可能包含第二个可选参数,要保存到的文件名(默认为)。exitsavesave.txt如果程序异常退出(未exit提供命令),则工作将保存到临时文件中save.txt.tmp。cmds.txt文件在单独的线程中处理,每秒检查一次文件,检查速度非常快因此不占用CPU,检查只是测试文件修改时间是否已更改。每个新命令都应添加到新行的文件末尾,不应删除已处理的行。在程序启动命令文件被清理。主线程只检查has_cmdsbool 变量(如果有新命令),它非常快并且可以经常完成,例如在处理最小的任务(如 10-20 毫秒)之后。没有互斥体,因此一切都运行得非常快。例如使用主线程在随机时间点生成任务处理结果并将结果存储到数组中。在保存命令中,此结果数组将保存为 JSON。程序将有关其所做操作的所有信息打印到控制台中,并包含时间戳。要测试程序,请执行以下操作:启动程序。它立即开始处理计算工作。在任何文本编辑器中打开cmds.txt。添加带有save字符串的新行。保存存档。程序应打印该save命令已被识别、处理并且工作已保存到文件save.txt。在编辑器中添加另一行save other.txt。保存存档程序应该打印它已将工作保存到save.txt。添加新行exit并保存。程序应该退出。再次尝试运行程序。尝试按Ctrl+C程序控制台。程序应该捕获此键盘中断并说明这一点,并将工作保存到临时文件save.txt.tmp并退出程序。import threading, random, os, json, time, tracebackcmds = []has_cmds = Falsecmds_fname = 'cmds.txt'save_fname = 'save.txt'save_fname_tmp = 'save.txt.tmp'def CurTimeStr(*, exact = False): from datetime import datetime return (datetime.now(), datetime.utcnow())[exact].strftime(('[%H:%M:%S]', '[%Y-%m-%d %H:%M:%S.%f UTC]')[exact])def Print(*pargs, **nargs): print(CurTimeStr(), *pargs, flush = True, **nargs) def AddCmd(c, *, processed = False): global cmds, has_cmds cmds.append({**{'processed': threading.Event()}, **c}) if processed: cmds[-1]['processed'].set() has_cmds = True return cmds[-1]def ExternalCommandsThread(): global cmds, has_cmds Print('Cmds thread started.') first, next_line, mtime = True, 0, 0. while True: try: if first: Print(f'Cleaning cmds file "{cmds_fname}".') with open(cmds_fname, 'wb') as f: pass first = False if os.path.exists(cmds_fname) and abs(os.path.getmtime(cmds_fname) - mtime) > 0.0001 and os.path.getsize(cmds_fname) > 0: Print(f'Updated cmds file "{cmds_fname}". Processing lines starting from {next_line + 1}.') with open(cmds_fname, 'r', encoding = 'utf-8-sig') as f: data = f.read() lines = list(data.splitlines()) try: mtime = os.path.getmtime(cmds_fname) for iline, line in zip(range(next_line, len(lines)), lines[next_line:]): line = line.strip() if not line: continue if line[0] not in ['[', '{', '"']: cmd = line.split() else: cmd = json.loads(line) pargs = [] if type(cmd) is list: cmd, *pargs = cmd cmd = {'cmd': cmd, 'pargs': pargs} assert 'cmd' in cmd, 'No "cmd" in command line!' c = cmd['cmd'] if c in ['save']: assert len(set(cmd.keys()) - {'cmd', 'fname', 'pargs'}) == 0 AddCmd({'cmd': 'save', 'fname': cmd.get('fname', (cmd['pargs'] or [save_fname])[0])}) elif c == 'exit': AddCmd({'cmd': 'exit'}) else: assert False, f'Unrecognized cmd "{c}"!' Print(f'Parsed cmd "{c}" on line {iline + 1}.') next_line = iline + 1 except (json.decoder.JSONDecodeError, AssertionError) as ex: traceback.print_exc() Print(f'Failed to parse cmds line {iline + 1} with text "{line}"!') except: raise for i, c in enumerate(cmds): if c is None: continue if not c['processed'].is_set(): has_cmds = True while not c['processed'].wait(10): Print(f'Timed out waiting for cmd "{c["cmd"]}" to be processed, continuing waiting!') Print(f'Processed cmd "{c["cmd"]}".') cmds[i] = None if c['cmd'] == 'exit': Print('Exit cmd. Cmds thread finishes.') return has_cmds = False time.sleep(1) except Exception as ex: traceback.print_exc() Print(f'Exception ^^^^^ in Cmds thread!') AddCmd({'cmd': 'exit'}) time.sleep(3)def Main(): global cmds, has_cmds Print('Main thread started.') threading.Thread(target = ExternalCommandsThread, daemon = False).start() results = [] def SaveWork(fname): with open(fname, 'w', encoding = 'utf-8') as f: f.write(json.dumps(results, ensure_ascii = False, indent = 4)) Print(f'Work saved to "{fname}".') def ProcessCmds(): # Returns False only if program should exit for c in cmds: if c is None or c['processed'].is_set(): continue if c['cmd'] == 'save': SaveWork(c['fname']) elif c['cmd'] == 'exit': Print('Exit cmd. Main thread finishes...') c['processed'].set() return False else: assert False, 'Unknown cmd "c["cmd"]"!' c['processed'].set() return True try: # Main loop of tasks processing for i in range(1000): for j in range(10): if has_cmds and not ProcessCmds(): # Very fast check if there are any commands return # Exit # Emulate small work of 0-200 ms long. time.sleep(random.random() * 0.2) # Store results of work in array results.append({'time': CurTimeStr(exact = True), 'i': i, 'j': j}) assert False, 'Main finished without exit cmd!' except BaseException as ex: traceback.print_exc() Print(f'Exception ^^^^^ in Main thread!') SaveWork(save_fname_tmp) AddCmd({'cmd': 'exit'}, processed = True) if __name__ == '__main__': Main()示例输出 1:[08:15:16] Main thread started.[08:15:16] Cmds thread started.[08:15:16] Cleaning cmds file "cmds.txt".[08:15:21] Updated cmds file "cmds.txt". Processing lines starting from 1.[08:15:21] Parsed cmd "save" on line 1.[08:15:21] Work saved to "save.txt".[08:15:21] Processed cmd "save".[08:15:31] Updated cmds file "cmds.txt". Processing lines starting from 2.[08:15:31] Parsed cmd "save" on line 2.[08:15:31] Work saved to "other.txt".[08:15:31] Processed cmd "save".[08:15:35] Updated cmds file "cmds.txt". Processing lines starting from 3.[08:15:35] Parsed cmd "exit" on line 3.[08:15:35] Exit cmd. Main thread finishes...[08:15:35] Processed cmd "exit".[08:15:35] Exit cmd. Cmds thread finishes.与上面的输出相对应的命令文件cmds.txt:savesave other.txtexit示例输出 2:[08:14:39] Main thread started.[08:14:39] Cmds thread started.[08:14:39] Cleaning cmds file "cmds.txt".Traceback (most recent call last): File "stackoverflow_64165394_processing_commands_in_prog.py", line 127, in Main time.sleep(random.random() * 0.2)KeyboardInterrupt[08:14:40] Exception ^^^^^ in Main thread![08:14:40] Work saved to "save.txt.tmp".[08:14:41] Processed cmd "exit".[08:14:41] Exit cmd. Cmds thread finishes.举个例子save.txt:[ { "time": "[2020-10-02 05:15:16.836030 UTC]", "i": 0, "j": 0 }, { "time": "[2020-10-02 05:15:16.917989 UTC]", "i": 0, "j": 1 }, { "time": "[2020-10-02 05:15:17.011129 UTC]", "i": 0, "j": 2 }, { "time": "[2020-10-02 05:15:17.156579 UTC]", "i": 0, "j": 3 }, ................