modbus.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # -*- coding: utf-8 -*-
  2. """
  3. demeter
  4. name:mqtt.py
  5. author:rabin
  6. """
  7. from demeter.core import *
  8. import paho.mqtt.client as mqtt
  9. #from gevent import monkey; monkey.patch_all()
  10. #import gevent
  11. class Connect(object):
  12. def __init__(self, act):
  13. act.connect = self
  14. self.client = mqtt.Client()
  15. state = hasattr(act, 'message')
  16. if state:
  17. self.client.on_connect = self.connectAndSub
  18. self.client.on_message = act.message
  19. else:
  20. self.client.on_connect = self.connect
  21. self.client.connect(Demeter.config['mqtt']['host'], Demeter.config['mqtt']['port'], int(Demeter.config['mqtt']['timeout']))
  22. if state:
  23. self.client.loop_forever()
  24. def __del__(self):
  25. pass
  26. def getClient(self):
  27. return self.client
  28. def connect(self, client, userdata, flags, rc):
  29. pass
  30. def connectAndSub(self, client, userdata, flags, rc):
  31. #print("Connected with result code "+str(rc))
  32. #client.subscribe("sensor/#")
  33. sub = Demeter.config['mqtt']['sub'].split(',')
  34. for value in sub:
  35. client.subscribe(value + "/#")
  36. """
  37. gevent.joinall([
  38. gevent.spawn(self.subscribe, client, 'sensor/#'),
  39. gevent.spawn(self.subscribe, client, 'pic/#'),
  40. gevent.spawn(self.subscribe, client, 'msg/#'),
  41. ])
  42. """
  43. @staticmethod
  44. def subscribe(client, key):
  45. client.subscribe(key)
  46. def handle(self, key, value):
  47. Demeter.record(key, value)
  48. class Pub(object):
  49. def __init__(self):
  50. Connect(self)
  51. def __del__(self):
  52. pass
  53. def push(self, key, msg, qos=0, retain=False, callback=False, param=False, feedback=False):
  54. result = self.connect.getClient().publish(key,payload=msg,qos=qos,retain=retain)
  55. self.callback = callback
  56. self.param = param
  57. if feedback == True and 'key' in self.param:
  58. self.connect.client.on_message = self.feedback
  59. self.connect.client.subscribe(self.param['key'])
  60. self.connect.client.loop_forever()
  61. elif qos in (1,2):
  62. self.connect.client.on_publish = self.publish
  63. self.connect.client.loop_forever()
  64. #else:
  65. #self.connect.client.disconnect()
  66. return result
  67. def publish(self, client, userdata, mid, msg='ok'):
  68. self.callback(self.param, client, userdata, mid, msg)
  69. self.connect.client.disconnect()
  70. def feedback(self, client, userdata, msg):
  71. if msg.topic == self.param['key']:
  72. self.publish(client, userdata, 0, msg.payload)
  73. class Sub(object):
  74. def __init__(self):
  75. Connect(self)
  76. def __del__(self):
  77. pass
  78. def message(self, client, userdata, msg):
  79. #print(msg.topic+" "+str(msg.payload))
  80. #return
  81. self.connect.handle(msg.topic, str(msg.payload))