猿问

MQTT和时间延迟

我有一个 python 脚本,我正在尝试使用它来处理 paho mqtt 和时间延迟。我研究了类似的问题,这些问题讨论了 paho mqtt,例如Controlling Program with MQTT and Python。我的python脚本如下:

这是我的脚本:

 import paho.mqtt.client as mqttClient

    import time

    import subprocess

    import os

    

    

    global lastprocessed, Connected

    lastprocessed = None

    Connected = False   #global variable for the state of the connection

    

    def on_connect(client, userdata, flags, rc):

        if rc == 0:

            print("Connected to broker")

            global Connected

            Connected = True                #Signal connection

        else:

            print("Connection failed")

    

    

    def on_message(client, userdata, message):

        global lastprocessed

        if message.payload.decode() == "hello":

            lastprocessed = time.time()

    

        if lastprocessed and time.time() - lastprocessed < 20:

            print("good")


broker_address= "192.168.1.111"  #Broker address

port = 1883                         #Broker port

user = "abcde"                    #Connection username

password = "12345"            #Connection password


client = mqttClient.Client("Python")               #create new instance

client.username_pw_set(user, password=password)    #set username and password

client.on_connect= on_connect                      #attach function to callback

client.on_message= on_message                      #attach function to callback

client.connect(broker_address,port,60) #connect

client.subscribe("home/OpenMQTTGateway/433toMQTT") #subscribe

client.loop_forever() #then keep listening forever

我上面的代码发生的事情是,只要在有效负载中收到“hello”,它就会打印“good”,但如果在有效负载中收到任何其他内容,包括“hello”,它会继续打印“good”。它没有考虑 20 秒的时间。我不确定为什么?

我想要实现的目标如下:

  1. 当开始执行 python 脚本时,脚本应该打印“bad”。

  2. 这应该停止,直到在有效负载中收到“hello”。

  3. 一旦收到“hello”,“good”应该打印 20 秒,在这 20 秒内,主题中收到的任何其他消息都应该被忽略,包括“hello”。

  4. 20 秒后,脚本应该继续打印“坏”,但只打印一次,循环继续。


慕妹3146593
浏览 455回答 1
1回答

开满天机

我使用loop_start()andloop_stop()而不是loop_forever()然后使用 between start,stop我可以创建自己的循环来检查消息和打印文本。我使用变量state来控制代码是在第一个hello( state = "start") 之前还是它得到hello,现在它必须检查时间并重复文本“好” ( state = "hello") 或者它有 20 秒之后hello并且它没有打印任何内容 (state = "other")在内部on_message,我只hello在收到消息时才将状态更改为hello,而旧状态则不同hello。import paho.mqtt.client as mqttClientimport time# --- functions ---def on_connect(client, userdata, flags, rc):&nbsp; &nbsp; #global state&nbsp; &nbsp; global connected&nbsp; &nbsp; if rc == 0:&nbsp; &nbsp; &nbsp; &nbsp; print("Connected to broker")&nbsp; &nbsp; &nbsp; &nbsp; connected = True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; print("Connection failed")def on_message(client, userdata, message):&nbsp; &nbsp; global state&nbsp; &nbsp; global last_processed&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; if message.payload.decode() == "hello":&nbsp; &nbsp; &nbsp; &nbsp; if state != 'hello':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = 'hello'&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; last_processed = time.time()&nbsp; &nbsp; # OR&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; #if state != 'hello':&nbsp; &nbsp; #&nbsp; &nbsp; if message.payload.decode() == "hello":&nbsp; &nbsp; #&nbsp; &nbsp; &nbsp; &nbsp; state = 'hello'&nbsp; &nbsp; #&nbsp; &nbsp; &nbsp; &nbsp; last_processed = time.time()# --- main ---broker_address = "192.168.1.111"&nbsp; # broker addressport = 1883&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# broker portuser = "abcde"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # connection usernamepassword = "12345"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # connection password# ---client = mqttClient.Client("Python")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# create new instanceclient.username_pw_set(user, password=password)&nbsp; &nbsp; # set username and passwordclient.on_connect= on_connect&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # attach function to callbackclient.on_message= on_message&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # attach function to callbackclient.connect(broker_address, port, 60)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# connectclient.subscribe("home/OpenMQTTGateway/433toMQTT") # subscribe# --- main loop ---last_processed = Noneconnected = False&nbsp; &nbsp; &nbsp; # signal connectionstate = 'start' # 'start', 'hello', 'other', 'not connected'&nbsp; &nbsp;# ---client.loop_start()try:&nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp; if not connected:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('not connected')&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; if state == 'start':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('bad')&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; if state == 'hello':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if last_processed and time.time() - last_processed < 20:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print("good", time.time() - last_processed, end='\r') # '\r` to write it in the same line.&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('bad')&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = 'other'&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; last_processed = None&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; if state == 'other':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pass&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; time.sleep(1.0) # to slow down example&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except KeyboardInterrupt:&nbsp; &nbsp; print('KeyboardInterrupt')&nbsp; &nbsp;&nbsp;client.loop_stop()编辑:如果您只需要bad一次hello,那么您可以bad在循环之前先打印,然后hello在收到消息时打印,然后time.sleep(20)在打印bad和更改状态之前打印。import paho.mqtt.client as mqttClientimport time# --- functions ---def on_connect(client, userdata, flags, rc):&nbsp; &nbsp; #global state&nbsp; &nbsp; global connected&nbsp; &nbsp; if rc == 0:&nbsp; &nbsp; &nbsp; &nbsp; print("Connected to broker")&nbsp; &nbsp; &nbsp; &nbsp; connected = True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; print("Connection failed")def on_message(client, userdata, message):&nbsp; &nbsp; global state&nbsp; &nbsp; message = message.payload.decode()&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; if state != 'hello':&nbsp; &nbsp; &nbsp; &nbsp; if message == 'hello':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = 'hello'&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('good') # once when start `hello`&nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('msg:', message)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# --- main ---broker_address = "192.168.1.111"&nbsp; # broker addressport = 1883&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# broker portuser = "abcde"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # connection usernamepassword = "12345"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # connection password# ---client = mqttClient.Client("Python")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# create new instanceclient.username_pw_set(user, password=password)&nbsp; &nbsp; # set username and passwordclient.on_connect= on_connect&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # attach function to callbackclient.on_message= on_message&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # attach function to callbackclient.connect(broker_address, port, 60)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# connectclient.subscribe("home/OpenMQTTGateway/433toMQTT") # subscribe# --- main loop ---connected = False&nbsp; &nbsp; &nbsp; # signal connectionstate = 'other' # 'hello'# ---client.loop_start()print('bad') # once at starttry:&nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; if state == 'hello':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; time.sleep(20)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('bad')&nbsp; # once when end `hello`&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = 'other'&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; time.sleep(1.0) # to slow down example&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except KeyboardInterrupt:&nbsp; &nbsp; print('KeyboardInterrupt')&nbsp; &nbsp;&nbsp;client.loop_stop()使用threading.Timerinstead of time.sleep()because sleep()blocks loop 会很有用,如果你想做更多的事情,它就没那么有用了。import paho.mqtt.client as mqttClientimport threading# --- functions ---def on_connect(client, userdata, flags, rc):&nbsp; &nbsp; #global state&nbsp; &nbsp; global connected&nbsp; &nbsp; if rc == 0:&nbsp; &nbsp; &nbsp; &nbsp; print("Connected to broker")&nbsp; &nbsp; &nbsp; &nbsp; connected = True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; print("Connection failed")def on_message(client, userdata, message):&nbsp; &nbsp; global state&nbsp; &nbsp; message = message.payload.decode()&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; if state != 'hello':&nbsp; &nbsp; &nbsp; &nbsp; if message == 'hello':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = 'hello'&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('good') # once when start `hello`&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; threading.Timer(20, end_hello).start()&nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('msg:', message)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;def end_hello():&nbsp; &nbsp; global state&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; print('bad')&nbsp; # once when end `hello`&nbsp; &nbsp; state = 'other'# --- main ---broker_address = "192.168.1.111"&nbsp; # broker addressport = 1883&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# broker portuser = "abcde"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # connection usernamepassword = "12345"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # connection password# ---client = mqttClient.Client("Python")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# create new instanceclient.username_pw_set(user, password=password)&nbsp; &nbsp; # set username and passwordclient.on_connect= on_connect&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # attach function to callbackclient.on_message= on_message&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # attach function to callbackclient.connect(broker_address, port, 60)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# connectclient.subscribe("home/OpenMQTTGateway/433toMQTT") # subscribe# --- main loop ---connected = False&nbsp; &nbsp; &nbsp; # signal connectionstate = 'other' # 'hello'# ---print('bad') # once at startclient.loop_forever()最终你仍然可以检查循环时间import paho.mqtt.client as mqttClientimport time# --- functions ---def on_connect(client, userdata, flags, rc):&nbsp; &nbsp; #global state&nbsp; &nbsp; global connected&nbsp; &nbsp; if rc == 0:&nbsp; &nbsp; &nbsp; &nbsp; print("Connected to broker")&nbsp; &nbsp; &nbsp; &nbsp; connected = True&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; print("Connection failed")def on_message(client, userdata, message):&nbsp; &nbsp; global state&nbsp; &nbsp; global last_processed&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; message = message.payload.decode()&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; if state != 'hello':&nbsp; &nbsp; &nbsp; &nbsp; if message == 'hello':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = 'hello'&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; last_processed = time.time()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('good') # once when start `hello`&nbsp; &nbsp; &nbsp; &nbsp; else:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('msg:', message)&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;# --- main ---broker_address = "192.168.1.111"&nbsp; # broker addressport = 1883&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# broker portuser = "abcde"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # connection usernamepassword = "12345"&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # connection password# ---client = mqttClient.Client("Python")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# create new instanceclient.username_pw_set(user, password=password)&nbsp; &nbsp; # set username and passwordclient.on_connect= on_connect&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # attach function to callbackclient.on_message= on_message&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # attach function to callbackclient.connect(broker_address, port, 60)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;# connectclient.subscribe("home/OpenMQTTGateway/433toMQTT") # subscribe# --- main loop ---last_processed = Noneconnected = False&nbsp; &nbsp; &nbsp; # signal connectionstate = 'other' # 'hello'# ---client.loop_start()print('bad') # once at starttry:&nbsp; &nbsp; while True:&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; if state == 'hello':&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if time.time() >= last_processed + 20:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; print('bad')&nbsp; # once when end `hello`&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; state = 'other'&nbsp; &nbsp;&nbsp;&nbsp; &nbsp; &nbsp; &nbsp; time.sleep(1.0) # to slow down example&nbsp; &nbsp; &nbsp; &nbsp;&nbsp;except KeyboardInterrupt:&nbsp; &nbsp; print('KeyboardInterrupt')&nbsp; &nbsp;&nbsp;client.loop_stop()
随时随地看视频慕课网APP

相关分类

Python
我要回答