rabin 7 years ago
parent
commit
2fb80222ed
3 changed files with 38 additions and 10 deletions
  1. 15 3
      core.py
  2. 20 7
      mqtt.py
  3. 3 0
      web.py

+ 15 - 3
core.py

@@ -243,7 +243,7 @@ class Demeter(object):
 			raise Finish()
 		else:
 			print string
-			os._exit(0)
+			#os._exit(0)
 
 class File(object):
 
@@ -295,11 +295,23 @@ class File(object):
 
 class Shell(object):
 	@staticmethod
-	def popen(command, sub=False, bg=False):
+	def popen(command, sub=False, bg=False, timeout=0):
 		string = command
 		if bg == True:
 			command = command + ' 1>/dev/null 2>&1 &'
-		if sub == False:
+
+		if timeout > 0:
+			proc = subprocess.Popen(command,bufsize=0,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
+			poll_seconds = .250
+			deadline = time.time() + timeout
+			while time.time() < deadline and proc.poll() == None:
+				time.sleep(poll_seconds)
+			if proc.poll() == None:
+				return 'timeout'
+
+			stdout,stderr = proc.communicate()
+			return stdout
+		elif sub == False:
 			process = os.popen(command)
 			output = process.read()
 			process.close()

+ 20 - 7
mqtt.py

@@ -15,10 +15,12 @@ class Connect(object):
 	def __init__(self, act):
 		act.connect = self
 		self.client = mqtt.Client()
-		self.client.on_connect = self.connect
 		state = hasattr(act, 'message')
 		if state:
+			self.client.on_connect = self.connectAndSub
 			self.client.on_message = act.message
+		else:
+			self.client.on_connect = self.connect
 		self.client.connect(Demeter.config['mqtt']['host'], Demeter.config['mqtt']['port'], int(Demeter.config['mqtt']['timeout']))
 		if state:
 			self.client.loop_forever()
@@ -30,6 +32,9 @@ class Connect(object):
 		return self.client
 
 	def connect(self, client, userdata, flags, rc):
+		pass
+
+	def connectAndSub(self, client, userdata, flags, rc):
 		#print("Connected with result code "+str(rc))
 		#client.subscribe("sensor/#")
 		sub = Demeter.config['mqtt']['sub'].split(',')
@@ -59,22 +64,30 @@ class Pub(object):
 	def __del__(self):
 		pass
 
-	def push(self, key, msg, qos=0, retain=False, callback=False, param=False):
+	def push(self, key, msg, qos=0, retain=False, callback=False, param=False, feedback=False):
 		result = self.connect.getClient().publish(key,payload=msg,qos=qos,retain=retain)
 
-		if qos in (1,2):
-			self.callback = callback
-			self.param = param
+		self.callback = callback
+		self.param = param
+		if feedback == True and 'key' in self.param:
+			self.connect.client.on_message = self.feedback
+			self.connect.client.subscribe(self.param['key'])
+			self.connect.client.loop_forever()
+		elif qos in (1,2):
 			self.connect.client.on_publish = self.publish
 			self.connect.client.loop_forever()
 		else:
 			self.connect.client.disconnect()
 		return result
 
-	def publish(self, client, userdata, mid):
-		self.callback(self.param, client, userdata, mid)
+	def publish(self, client, userdata, mid, msg='ok'):
+		self.callback(self.param, client, userdata, mid, msg)
 		self.connect.client.disconnect()
 
+	def feedback(self, client, userdata, msg):
+		if msg.topic == self.param['key']:
+			self.publish(client, userdata, 0, msg.payload)
+
 class Sub(object):
 
 	def __init__(self):

+ 3 - 0
web.py

@@ -9,8 +9,11 @@ import os
 import json
 from demeter.core import *
 import tornado.web
+import tornado.web  
 import tornado.ioloop
 import tornado.httpserver
+import tornado.httpclient 
+import tornado.concurrent
 
 class Base(tornado.web.RequestHandler):
 	def initialize(self):