mqtt.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. # -*- coding: utf-8 -*-
  2. """
  3. demeter
  4. name:mqtt.py
  5. author:rabin
  6. """
  7. from demeter.core import *
  8. import paho.mqtt.client as mqtt
  9. #from gevent import monkey; monkey.patch_all()
  10. #import gevent
  11. class Connect(object):
  12. def __init__(self, act, topic=False):
  13. act.connect = self
  14. self.client = mqtt.Client()
  15. self.topic = topic
  16. state = hasattr(act, 'message')
  17. if state:
  18. self.client.on_connect = self.connectAndSub
  19. self.client.on_message = act.message
  20. else:
  21. self.client.on_connect = self.connect
  22. self.client.connect(Demeter.config['mqtt']['host'], int(Demeter.config['mqtt']['port']), int(Demeter.config['mqtt']['timeout']))
  23. if state:
  24. self.client.loop_forever()
  25. def __del__(self):
  26. pass
  27. def getClient(self):
  28. return self.client
  29. def connect(self, client, userdata, flags, rc):
  30. pass
  31. def connectAndSub(self, client, userdata, flags, rc):
  32. #print("Connected with result code "+str(rc))
  33. #client.subscribe("sensor/#")
  34. #sub = Demeter.config['mqtt']['sub'].split(',')
  35. sub = self.topic.split(',')
  36. for value in sub:
  37. client.subscribe(value + "/#")
  38. """
  39. gevent.joinall([
  40. gevent.spawn(self.subscribe, client, 'sensor/#'),
  41. gevent.spawn(self.subscribe, client, 'pic/#'),
  42. gevent.spawn(self.subscribe, client, 'msg/#'),
  43. ])
  44. """
  45. @staticmethod
  46. def subscribe(client, key):
  47. client.subscribe(key)
  48. def handle(self, key, value):
  49. service = Demeter.service('record', 'mqtt')
  50. service.push(key, value)
  51. class Pub(object):
  52. def __init__(self):
  53. Connect(self)
  54. def __del__(self):
  55. pass
  56. def push(self, key, msg, qos=0, retain=False, callback=False, param=False, feedback=False):
  57. result = self.connect.getClient().publish(key,payload=msg,qos=qos,retain=retain)
  58. self.callback = callback
  59. self.param = param
  60. if feedback == True and 'key' in self.param:
  61. self.connect.client.on_message = self.feedback
  62. self.connect.client.subscribe(self.param['key'])
  63. self.connect.client.loop_forever()
  64. elif qos in (1,2):
  65. self.connect.client.on_publish = self.publish
  66. self.connect.client.loop_forever()
  67. #else:
  68. #self.connect.client.disconnect()
  69. return result
  70. def publish(self, client, userdata, mid, msg='ok'):
  71. self.callback(self.param, client, userdata, mid, msg)
  72. self.connect.client.disconnect()
  73. def feedback(self, client, userdata, msg):
  74. if msg.topic == self.param['key']:
  75. self.publish(client, userdata, 0, msg.payload)
  76. class Sub(object):
  77. def __init__(self, topic=False):
  78. Connect(self, topic=topic)
  79. def __del__(self):
  80. pass
  81. def message(self, client, userdata, msg):
  82. #print(msg.topic+" "+str(msg.payload))
  83. #return
  84. self.connect.handle(msg.topic, str(msg.payload))