dever 6 years ago
commit
6e9cb627f2
4 changed files with 510 additions and 0 deletions
  1. 110 0
      database/server.php
  2. 8 0
      index.php
  3. 350 0
      lib/Core.php
  4. 42 0
      src/Api.php

+ 110 - 0
database/server.php

@@ -0,0 +1,110 @@
+<?php
+return array
+(
+	# 表名
+	'name' => 'server',
+	# 显示给用户看的名称
+	'lang' => '服务器管理',
+	# 后台菜单排序
+	'order' => 1,
+	# 数据结构
+	'struct' => array
+	(
+		'id' 		=> array
+		(
+			'type' 		=> 'int-11',
+			'name' 		=> 'ID',
+			'default' 	=> '',
+			'desc' 		=> '',
+			'match' 	=> 'is_numeric',
+			'search'	=> 'order',
+			//'list'		=> true,
+			'order'		=> 'desc',
+		),
+		
+		'name'		=> array
+		(
+			'type' 		=> 'varchar-32',
+			'name' 		=> '服务器名称',
+			'default' 	=> '',
+			'desc' 		=> '请输入服务器名称',
+			'match' 	=> 'is_string',
+			'update'	=> 'text',
+			'search'	=> 'fulltext',
+			'list'		=> true,
+		),
+
+		'host'		=> array
+		(
+			'type' 		=> 'varchar-100',
+			'name' 		=> '服务器地址',
+			'default' 	=> '',
+			'desc' 		=> '服务器地址',
+			'match' 	=> 'is_string',
+			'update'	=> 'text',
+			'search'	=> 'fulltext',
+			'list'		=> true,
+		),
+
+		'port'		=> array
+		(
+			'type' 		=> 'varchar-32',
+			'name' 		=> '服务器端口号',
+			'default' 	=> '',
+			'desc' 		=> '服务器端口号',
+			'match' 	=> 'is_string',
+			'update'	=> 'text',
+			'search'	=> 'fulltext',
+			'list'		=> true,
+		),
+
+		'username'		=> array
+		(
+			'type' 		=> 'varchar-80',
+			'name' 		=> '用户名',
+			'default' 	=> '',
+			'desc' 		=> '用户名',
+			'match' 	=> 'is_string',
+			'update'	=> 'text',
+			'search'	=> 'fulltext',
+			'list'		=> true,
+		),
+
+		'password'		=> array
+		(
+			'type' 		=> 'varchar-200',
+			'name' 		=> '密码',
+			'default' 	=> '',
+			'desc' 		=> '密码',
+			'match' 	=> 'is_string',
+			'update'	=> 'text',
+			'search'	=> 'fulltext',
+			'list'		=> true,
+		),
+
+		'state'		=> array
+		(
+			'type' 		=> 'tinyint-1',
+			'name' 		=> '状态',
+			'default' 	=> '1',
+			'desc' 		=> '请选择状态',
+			'match' 	=> 'is_numeric',
+		),
+		
+		'cdate'		=> array
+		(
+			'type' 		=> 'int-11',
+			'name' 		=> '录入时间',
+			'match' 	=> array('is_numeric', time()),
+			'desc' 		=> '',
+			# 只有insert时才生效
+			'insert'	=> true,
+			'list'		=> 'date("Y-m-d H:i:s", {cdate})',
+		),
+	),
+
+	'request' => array
+	(
+		
+	),
+);

+ 8 - 0
index.php

@@ -0,0 +1,8 @@
+<?php
+
+define('DEVER_APP_NAME', 'mqtt');
+define('DEVER_APP_LANG', 'mqtt');
+define('DEVER_APP_PATH', dirname(__FILE__) . DIRECTORY_SEPARATOR);
+define('DEVER_MANAGE_ORDER', 98);
+define('DEVER_MANAGE_ICON', 'glyphicon glyphicon-tower');
+include(DEVER_APP_PATH . '../boot.php');

+ 350 - 0
lib/Core.php

@@ -0,0 +1,350 @@
+<?php
+namespace Mqtt\Lib;
+/*
+ 	phpMQTT
+	A simple php class to connect/publish/subscribe to an MQTT broker
+*/
+/*
+	Licence
+	Copyright (c) 2010 Blue Rhinos Consulting | Andrew Milsted
+	andrew@bluerhinos.co.uk | http://www.bluerhinos.co.uk
+	Permission is hereby granted, free of charge, to any person obtaining a copy
+	of this software and associated documentation files (the "Software"), to deal
+	in the Software without restriction, including without limitation the rights
+	to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+	copies of the Software, and to permit persons to whom the Software is
+	furnished to do so, subject to the following conditions:
+	The above copyright notice and this permission notice shall be included in
+	all copies or substantial portions of the Software.
+	THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+	IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+	FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+	AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+	LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+	OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+	THE SOFTWARE.
+	
+*/
+/* phpMQTT */
+class Core {
+	private $socket; 			/* holds the socket	*/
+	private $msgid = 1;			/* counter for message id */
+	public $keepalive = 10;		/* default keepalive timmer */
+	public $timesinceping;		/* host unix time, used to detect disconects */
+	public $topics = array(); 	/* used to store currently subscribed topics */
+	public $debug = false;		/* should output debug messages */
+	public $address;			/* broker address */
+	public $port;				/* broker port */
+	public $clientid;			/* client id sent to brocker */
+	public $will;				/* stores the will of the client */
+	private $username;			/* stores username */
+	private $password;			/* stores password */
+	public $cafile;
+	function __construct($address, $port, $clientid, $cafile = NULL){
+		$this->broker($address, $port, $clientid, $cafile);
+	}
+	/* sets the broker details */
+	function broker($address, $port, $clientid, $cafile = NULL){
+		$this->address = $address;
+		$this->port = $port;
+		$this->clientid = $clientid;
+		$this->cafile = $cafile;
+	}
+	function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL){
+		while($this->connect($clean, $will, $username, $password)==false){
+			sleep(10);
+		}
+		return true;
+	}
+	/* connects to the broker 
+		inputs: $clean: should the client send a clean session flag */
+	function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){
+		
+		if($will) $this->will = $will;
+		if($username) $this->username = $username;
+		if($password) $this->password = $password;
+		if ($this->cafile) {
+			$socketContext = stream_context_create(["ssl" => [
+				"verify_peer_name" => true,
+				"cafile" => $this->cafile
+				]]);
+			$this->socket = stream_socket_client("tls://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext);
+		} else {
+			$this->socket = stream_socket_client("tcp://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT);
+		}
+		if (!$this->socket ) {
+		    if($this->debug) error_log("stream_socket_create() $errno, $errstr \n");
+			return false;
+		}
+		stream_set_timeout($this->socket, 5);
+		stream_set_blocking($this->socket, 0);
+		$i = 0;
+		$buffer = "";
+		$buffer .= chr(0x00); $i++;
+		$buffer .= chr(0x06); $i++;
+		$buffer .= chr(0x4d); $i++;
+		$buffer .= chr(0x51); $i++;
+		$buffer .= chr(0x49); $i++;
+		$buffer .= chr(0x73); $i++;
+		$buffer .= chr(0x64); $i++;
+		$buffer .= chr(0x70); $i++;
+		$buffer .= chr(0x03); $i++;
+		//No Will
+		$var = 0;
+		if($clean) $var+=2;
+		//Add will info to header
+		if($this->will != NULL){
+			$var += 4; // Set will flag
+			$var += ($this->will['qos'] << 3); //Set will qos
+			if($this->will['retain'])	$var += 32; //Set will retain
+		}
+		if($this->username != NULL) $var += 128;	//Add username to header
+		if($this->password != NULL) $var += 64;	//Add password to header
+		$buffer .= chr($var); $i++;
+		//Keep alive
+		$buffer .= chr($this->keepalive >> 8); $i++;
+		$buffer .= chr($this->keepalive & 0xff); $i++;
+		$buffer .= $this->strwritestring($this->clientid,$i);
+		//Adding will to payload
+		if($this->will != NULL){
+			$buffer .= $this->strwritestring($this->will['topic'],$i);  
+			$buffer .= $this->strwritestring($this->will['content'],$i);
+		}
+		if($this->username) $buffer .= $this->strwritestring($this->username,$i);
+		if($this->password) $buffer .= $this->strwritestring($this->password,$i);
+		$head = "  ";
+		$head{0} = chr(0x10);
+		$head{1} = chr($i);
+		fwrite($this->socket, $head, 2);
+		fwrite($this->socket,  $buffer);
+	 	$string = $this->read(4);
+		if(ord($string{0})>>4 == 2 && $string{3} == chr(0)){
+			if($this->debug) echo "Connected to Broker\n"; 
+		}else{	
+			error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n", 
+			                        ord($string{0}),ord($string{3})));
+			return false;
+		}
+		$this->timesinceping = time();
+		return true;
+	}
+	/* read: reads in so many bytes */
+	function read($int = 8192, $nb = false){
+		//	print_r(socket_get_status($this->socket));
+		
+		$string="";
+		$togo = $int;
+		
+		if($nb){
+			return fread($this->socket, $togo);
+		}
+			
+		while (!feof($this->socket) && $togo>0) {
+			$fread = fread($this->socket, $togo);
+			$string .= $fread;
+			$togo = $int - strlen($string);
+		}
+		
+	
+		
+		
+			return $string;
+	}
+	/* subscribe: subscribes to topics */
+	function subscribe($topics, $qos = 0){
+		$i = 0;
+		$buffer = "";
+		$id = $this->msgid;
+		$buffer .= chr($id >> 8);  $i++;
+		$buffer .= chr($id % 256);  $i++;
+		foreach($topics as $key => $topic){
+			$buffer .= $this->strwritestring($key,$i);
+			$buffer .= chr($topic["qos"]);  $i++;
+			$this->topics[$key] = $topic; 
+		}
+		$cmd = 0x80;
+		//$qos
+		$cmd +=	($qos << 1);
+		$head = chr($cmd);
+		$head .= chr($i);
+		
+		fwrite($this->socket, $head, 2);
+		fwrite($this->socket, $buffer, $i);
+		$string = $this->read(2);
+		
+		$bytes = ord(substr($string,1,1));
+		$string = $this->read($bytes);
+	}
+	/* ping: sends a keep alive ping */
+	function ping(){
+			$head = " ";
+			$head = chr(0xc0);		
+			$head .= chr(0x00);
+			fwrite($this->socket, $head, 2);
+			if($this->debug) echo "ping sent\n";
+	}
+	/* disconnect: sends a proper disconect cmd */
+	function disconnect(){
+			$head = " ";
+			$head{0} = chr(0xe0);		
+			$head{1} = chr(0x00);
+			fwrite($this->socket, $head, 2);
+	}
+	/* close: sends a proper disconect, then closes the socket */
+	function close(){
+	 	$this->disconnect();
+		stream_socket_shutdown($this->socket, STREAM_SHUT_WR);	
+	}
+	/* publish: publishes $content on a $topic */
+	function publish($topic, $content, $qos = 0, $retain = 0){
+		$i = 0;
+		$buffer = "";
+		$buffer .= $this->strwritestring($topic,$i);
+		//$buffer .= $this->strwritestring($content,$i);
+		if($qos){
+			$id = $this->msgid++;
+			$buffer .= chr($id >> 8);  $i++;
+		 	$buffer .= chr($id % 256);  $i++;
+		}
+		$buffer .= $content;
+		$i+=strlen($content);
+		$head = " ";
+		$cmd = 0x30;
+		if($qos) $cmd += $qos << 1;
+		if($retain) $cmd += 1;
+		$head{0} = chr($cmd);		
+		$head .= $this->setmsglength($i);
+		fwrite($this->socket, $head, strlen($head));
+		fwrite($this->socket, $buffer, $i);
+	}
+	/* message: processes a recieved topic */
+	function message($msg){
+		 	$tlen = (ord($msg{0})<<8) + ord($msg{1});
+			$topic = substr($msg,2,$tlen);
+			$msg = substr($msg,($tlen+2));
+			$found = 0;
+			foreach($this->topics as $key=>$top){
+				if( preg_match("/^".str_replace("#",".*",
+						str_replace("+","[^\/]*",
+							str_replace("/","\/",
+								str_replace("$",'\$',
+									$key))))."$/",$topic) ){
+					if(is_callable($top['function'])){
+						call_user_func($top['function'],$topic,$msg);
+						$found = 1;
+					}
+				}
+			}
+			if($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";
+	}
+	/* proc: the processing loop for an "allways on" client 
+		set true when you are doing other stuff in the loop good for watching something else at the same time */	
+	function proc( $loop = true){
+		if(1){
+			$sockets = array($this->socket);
+			$w = $e = NULL;
+			$cmd = 0;
+			
+				//$byte = fgetc($this->socket);
+			if(feof($this->socket)){
+				if($this->debug) echo "eof receive going to reconnect for good measure\n";
+				fclose($this->socket);
+				$this->connect_auto(false);
+				if(count($this->topics))
+					$this->subscribe($this->topics);	
+			}
+			
+			$byte = $this->read(1, true);
+			
+			if(!strlen($byte)){
+				if($loop){
+					usleep(100000);
+				}
+			 
+			}else{ 
+			
+				$cmd = (int)(ord($byte)/16);
+				if($this->debug) echo "Recevid: $cmd\n";
+				$multiplier = 1; 
+				$value = 0;
+				do{
+					$digit = ord($this->read(1));
+					$value += ($digit & 127) * $multiplier; 
+					$multiplier *= 128;
+					}while (($digit & 128) != 0);
+				if($this->debug) echo "Fetching: $value\n";
+				
+				if($value)
+					$string = $this->read($value);
+				
+				if($cmd){
+					switch($cmd){
+						case 3:
+							$this->message($string);
+						break;
+					}
+					$this->timesinceping = time();
+				}
+			}
+			if($this->timesinceping < (time() - $this->keepalive )){
+				if($this->debug) echo "not found something so ping\n";
+				$this->ping();	
+			}
+			
+			if($this->timesinceping<(time()-($this->keepalive*2))){
+				if($this->debug) echo "not seen a package in a while, disconnecting\n";
+				fclose($this->socket);
+				$this->connect_auto(false);
+				if(count($this->topics))
+					$this->subscribe($this->topics);
+			}
+		}
+		return 1;
+	}
+	/* getmsglength: */
+	function getmsglength(&$msg, &$i){
+		$multiplier = 1; 
+		$value = 0 ;
+		do{
+		  $digit = ord($msg{$i});
+		  $value += ($digit & 127) * $multiplier; 
+		  $multiplier *= 128;
+		  $i++;
+		}while (($digit & 128) != 0);
+		return $value;
+	}
+	/* setmsglength: */
+	function setmsglength($len){
+		$string = "";
+		do{
+		  $digit = $len % 128;
+		  $len = $len >> 7;
+		  // if there are more digits to encode, set the top bit of this digit
+		  if ( $len > 0 )
+		    $digit = ($digit | 0x80);
+		  $string .= chr($digit);
+		}while ( $len > 0 );
+		return $string;
+	}
+	/* strwritestring: writes a string to a buffer */
+	function strwritestring($str, &$i){
+		$ret = " ";
+		$len = strlen($str);
+		$msb = $len >> 8;
+		$lsb = $len % 256;
+		$ret = chr($msb);
+		$ret .= chr($lsb);
+		$ret .= $str;
+		$i += ($len+2);
+		return $ret;
+	}
+	function printstr($string){
+		$strlen = strlen($string);
+			for($j=0;$j<$strlen;$j++){
+				$num = ord($string{$j});
+				if($num > 31) 
+					$chr = $string{$j}; else $chr = " ";
+				printf("%4d: %08b : 0x%02x : %s \n",$j,$num,$num,$chr);
+			}
+	}
+}

+ 42 - 0
src/Api.php

@@ -0,0 +1,42 @@
+<?php
+namespace Mqtt\Src;
+
+use Dever;
+use Mqtt\Lib\Core;
+
+class Api
+{
+	public function __construct()
+	{
+		$id = Dever::input('id', 1);
+		$server = Dever::db('mqtt/server')->one($id);
+		$host = $server['host'];
+		$port = $server['port'];
+		$username = $server['username'];
+		$password = $server['password'];
+		$client_id = Dever::id();
+		$this->server = new Core($host, $port, $client_id);
+
+		if (!$this->server->connect(true, NULL, $username, $password)) {
+			Dever::log($client_id . ' timeout');
+			Dever::alert('timeout');
+		}
+	}
+
+	public function pub($key, $value, $qos = 0)
+	{
+		$this->server->publish($key, $value, $qos);
+		$this->server->close();
+		return true;
+	}
+
+	public function sub($key, $function, $qos = 0)
+	{
+		$topics[$key] = array('qos' => $qos, 'function' => $function);
+		$this->server->subscribe($topics, 0);
+		while($this->server->proc()){
+
+		}
+		$this->server->close();
+	}
+}