mqtt.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. demeter mqtt
  5. name:connect.py
  6. author:rabin
  7. """
  8. from demeter.core import *
  9. import paho.mqtt.client as mqtt
  10. #from gevent import monkey; monkey.patch_all()
  11. #import gevent
  12. class Connect(object):
  13. def __init__(self, act):
  14. act.connect = self
  15. self.client = mqtt.Client()
  16. self.client.on_connect = self.connect
  17. state = hasattr(act, 'message')
  18. if state:
  19. self.client.on_message = act.message
  20. self.client.connect(Demeter.config['mqtt']['host'], Demeter.config['mqtt']['port'], int(Demeter.config['mqtt']['timeout']))
  21. if state:
  22. self.client.loop_forever()
  23. def __del__(self):
  24. pass
  25. def getClient(self):
  26. return self.client
  27. def connect(self, client, userdata, flags, rc):
  28. #print("Connected with result code "+str(rc))
  29. #client.subscribe("sensor/#")
  30. sub = Demeter.config['mqtt']['sub'].split(',')
  31. for value in sub:
  32. client.subscribe(value + "/#")
  33. """
  34. gevent.joinall([
  35. gevent.spawn(self.subscribe, client, 'sensor/#'),
  36. gevent.spawn(self.subscribe, client, 'pic/#'),
  37. gevent.spawn(self.subscribe, client, 'msg/#'),
  38. ])
  39. """
  40. @staticmethod
  41. def subscribe(client, key):
  42. client.subscribe(key)
  43. def handle(self, key, value):
  44. Demeter.record(key, value)
  45. class Pub(object):
  46. def __init__(self):
  47. Connect(self)
  48. def __del__(self):
  49. pass
  50. def push(self, key, msg):
  51. self.connect.getClient().publish(key,msg)
  52. class Sub(object):
  53. def __init__(self):
  54. Connect(self)
  55. def __del__(self):
  56. pass
  57. def message(self, client, userdata, msg):
  58. #print(msg.topic+" "+str(msg.payload))
  59. self.connect.handle(msg.topic, str(msg.payload))