mqtt.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. demeter mqtt
  5. name:connect.py
  6. author:rabin
  7. """
  8. from demeter.core import *
  9. import paho.mqtt.client as mqtt
  10. #from gevent import monkey; monkey.patch_all()
  11. #import gevent
  12. class Connect(object):
  13. def __init__(self, act):
  14. act.connect = self
  15. self.client = mqtt.Client()
  16. state = hasattr(act, 'message')
  17. if state:
  18. self.client.on_connect = self.connectAndSub
  19. self.client.on_message = act.message
  20. else:
  21. self.client.on_connect = self.connect
  22. self.client.connect(Demeter.config['mqtt']['host'], Demeter.config['mqtt']['port'], int(Demeter.config['mqtt']['timeout']))
  23. if state:
  24. self.client.loop_forever()
  25. def __del__(self):
  26. pass
  27. def getClient(self):
  28. return self.client
  29. def connect(self, client, userdata, flags, rc):
  30. pass
  31. def connectAndSub(self, client, userdata, flags, rc):
  32. #print("Connected with result code "+str(rc))
  33. #client.subscribe("sensor/#")
  34. sub = Demeter.config['mqtt']['sub'].split(',')
  35. for value in sub:
  36. client.subscribe(value + "/#")
  37. """
  38. gevent.joinall([
  39. gevent.spawn(self.subscribe, client, 'sensor/#'),
  40. gevent.spawn(self.subscribe, client, 'pic/#'),
  41. gevent.spawn(self.subscribe, client, 'msg/#'),
  42. ])
  43. """
  44. @staticmethod
  45. def subscribe(client, key):
  46. client.subscribe(key)
  47. def handle(self, key, value):
  48. Demeter.record(key, value)
  49. class Pub(object):
  50. def __init__(self):
  51. Connect(self)
  52. def __del__(self):
  53. pass
  54. def push(self, key, msg, qos=0, retain=False, callback=False, param=False, feedback=False):
  55. result = self.connect.getClient().publish(key,payload=msg,qos=qos,retain=retain)
  56. self.callback = callback
  57. self.param = param
  58. if feedback == True and 'key' in self.param:
  59. self.connect.client.on_message = self.feedback
  60. self.connect.client.subscribe(self.param['key'])
  61. self.connect.client.loop_forever()
  62. elif qos in (1,2):
  63. self.connect.client.on_publish = self.publish
  64. self.connect.client.loop_forever()
  65. #else:
  66. #self.connect.client.disconnect()
  67. return result
  68. def publish(self, client, userdata, mid, msg='ok'):
  69. self.callback(self.param, client, userdata, mid, msg)
  70. self.connect.client.disconnect()
  71. def feedback(self, client, userdata, msg):
  72. if msg.topic == self.param['key']:
  73. self.publish(client, userdata, 0, msg.payload)
  74. class Sub(object):
  75. def __init__(self):
  76. Connect(self)
  77. def __del__(self):
  78. pass
  79. def message(self, client, userdata, msg):
  80. #print(msg.topic+" "+str(msg.payload))
  81. #return
  82. self.connect.handle(msg.topic, str(msg.payload))