I’ve been traipsing round the countryside (near and far) on entirely too many customer visits, which, coupled with the return of my sinus troubles and a massive allergy, has done little towards either mood or creativity–hence the radio silence over the past couple of weeks.
Most of my travel time was put to moderately good use by reading books, and catching up on TV shows has been an adequate way to zone out and nurse my headaches. A few other things have snuck their way in, though, including instilling some sanity into writing MQTT handlers. While on one of my depressingly repetitive train trips, I picked up a Trie class, tweaked it a bit, and hacked up a quick bottle-like approach to use with hbmqtt:
    class MQTTRouter(Trie):
        """Route MQTT topics to handler functions, bottle-style."""

        def __init__(self):
            # '#' is the MQTT multi-level wildcard
            super(MQTTRouter, self).__init__(wildcard='#')

        def dispatch(self, path):
            try:
                status, func, kwargs = self.get(path)
                # discard any entry keyed on the empty string
                kwargs.pop('', None)
                if status:
                    return func(**kwargs)
            except Exception as e:
                # quick hack: report the error instead of raising it
                print(e)

    router = MQTTRouter()

    def route(path):
        """Decorator for wrapping handlers"""
        def inner(func):
            router.insert(path, func)
            # return the handler so the decorated name still refers to it
            return func
        return inner

    # some example handlers, with parameters and wildcards
    @route("$SYS/:internal/#")
    def system(internal):
        print("system " + internal)

    @route("/sensors/:sensor/temperature")
    def temperature(sensor):
        print(sensor)

    # test invocations
    router.dispatch("$SYS/counter/foo")
    router.dispatch("/sensors/room/temperature")
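The Trie class itself isn't shown above, so the snippet won't run on its own. As a rough sketch of what the router relies on, here is a minimal stand-in. Everything in it is an assumption inferred from how the router calls it: `insert(path, value)`, a `get(path)` that returns a `(status, value, kwargs)` tuple, `:name` segments that capture one topic level, and a trailing `#` that swallows the remaining levels. The class that was actually picked up and tweaked may well differ.

```python
class Trie:
    """Hypothetical minimal trie keyed on '/'-separated topic levels."""

    def __init__(self, wildcard="#", separator="/", param_prefix=":"):
        self.wildcard = wildcard
        self.separator = separator
        self.param_prefix = param_prefix
        self.root = {"children": {}, "value": None}

    def insert(self, path, value):
        """Store value under path, creating one node per topic level."""
        node = self.root
        for part in path.split(self.separator):
            node = node["children"].setdefault(
                part, {"children": {}, "value": None})
        node["value"] = value

    def get(self, path):
        """Return (True, value, captured_params) or (False, None, {})."""
        return self._match(self.root, path.split(self.separator), {})

    def _match(self, node, parts, params):
        if not parts:
            if node["value"] is not None:
                return True, node["value"], params
            return False, None, {}
        head, rest = parts[0], parts[1:]
        children = node["children"]
        # 1. a literal level wins over any pattern
        if head in children:
            found = self._match(children[head], rest, params)
            if found[0]:
                return found
        # 2. a ':name' level captures this one level as a keyword argument
        for key, child in children.items():
            if key.startswith(self.param_prefix):
                name = key[len(self.param_prefix):]
                found = self._match(child, rest, {**params, name: head})
                if found[0]:
                    return found
        # 3. the wildcard matches this level and everything after it
        #    (simplification: it needs at least one level to match)
        wild = children.get(self.wildcard)
        if wild is not None and wild["value"] is not None:
            return True, wild["value"], params
        return False, None, {}


trie = Trie()
trie.insert("/sensors/:sensor/temperature", "temperature handler")
print(trie.get("/sensors/room/temperature"))
# (True, 'temperature handler', {'sensor': 'room'})
```

Matching tries literal levels first, then named parameters, then the wildcard, so a more specific route is preferred over a catch-all registered alongside it.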
I’ve yet to tie this in with asyncio, but I can see it happening when allergy season passes. If it ever does, considering how unstable the weather is now…