mqtt 客户端
mqtt client class
<?php
namespace App\Servers;
/**
* Class MqttClient
* @package App\Servers
*/
class MqttClient
{
private $socket; /* holds the socket */
private $msgid = 1; /* counter for message id */
public $keepalive = 10; /* default keepalive timmer */
public $timesinceping; /* host unix time, used to detect disconects */
public $topics = []; /* used to store currently subscribed topics */
public $debug = false; /* should output debug messages */
public $address; /* broker address */
public $port; /* broker port */
public $client_id; /* client id sent to brocker */
public $will; /* stores the will of the client */
private $username; /* stores username */
private $password; /* stores password */
public function __construct($address, $port, $client_id = null)
{
if (!$client_id) {
$client_id = uniqid(microtime(true));
}
$this->address = $address;
$this->port = $port;
$this->client_id = $client_id;
}
/**
* connects to the broker
*
* @param bool $clean should the client send a clean session flag
* @param null $will
* @param null $username
* @param null $password
* @return bool
*/
public function connect($clean = true, $will = NULL, $username = NULL, $password = NULL)
{
if ($will) $this->will = $will;
if ($username) $this->username = $username;
if ($password) $this->password = $password;
$address = gethostbyname($this->address);
$this->socket = fsockopen($address, $this->port, $errno, $errstr, 60);
if (!$this->socket) {
error_log("fsockopen() $errno, $errstr \n");
return false;
}
stream_set_timeout($this->socket, 5);
stream_set_blocking($this->socket, 0);
$i = 0;
$buffer = "";
$buffer .= chr(0x00);
$i++;
$buffer .= chr(0x06);
$i++;
$buffer .= chr(0x4d);
$i++;
$buffer .= chr(0x51);
$i++;
$buffer .= chr(0x49);
$i++;
$buffer .= chr(0x73);
$i++;
$buffer .= chr(0x64);
$i++;
$buffer .= chr(0x70);
$i++;
$buffer .= chr(0x03);
$i++;
//No Will
$var = 0;
if ($clean) $var += 2;
//Add will info to header
if ($this->will != NULL) {
$var += 4; // Set will flag
$var += ($this->will['qos'] << 3); //Set will qos
if ($this->will['retain']) $var += 32; //Set will retain
}
if ($this->username != NULL) $var += 128; //Add username to header
if ($this->password != NULL) $var += 64; //Add password to header
$buffer .= chr($var);
$i++;
//Keep alive
$buffer .= chr($this->keepalive >> 8);
$i++;
$buffer .= chr($this->keepalive & 0xff);
$i++;
$buffer .= $this->strwritestring($this->client_id, $i);
//Adding will to payload
if ($this->will != NULL) {
$buffer .= $this->strwritestring($this->will['topic'], $i);
$buffer .= $this->strwritestring($this->will['content'], $i);
}
if ($this->username) $buffer .= $this->strwritestring($this->username, $i);
if ($this->password) $buffer .= $this->strwritestring($this->password, $i);
$head = "";
$head .= chr(0x10);
$head .= chr($i);
fwrite($this->socket, $head, 2);
fwrite($this->socket, $buffer);
$string = $this->read(4);
if (ord($string[0]) >> 4 == 2 && $string[3] == chr(0)) {
if ($this->debug) echo "Connected to Broker\n";
} else {
error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n",
ord($string[0]), ord($string[3])));
return false;
}
$this->timesinceping = time();
return true;
}
/**
* reads in so many bytes
*
* @param int $int
* @param bool $nb
* @return bool|string
*/
public function read($int = 8192, $nb = false)
{
// print_r(socket_get_status($this->socket));
$string = "";
$togo = $int;
if ($nb) {
return fread($this->socket, $togo);
}
while (!feof($this->socket) && $togo > 0) {
$fread = fread($this->socket, $togo);
$string .= $fread;
$togo = $int - strlen($string);
}
return $string;
}
/**
* subscribes to topics
*
* @param array $topics
* @param int $qos
*/
public function subscribe(array $topics, $qos = 0)
{
$i = 0;
$buffer = "";
$id = $this->msgid;
$buffer .= chr($id >> 8);
$i++;
$buffer .= chr($id % 256);
$i++;
foreach ($topics as $key => $topic) {
$buffer .= $this->strwritestring($key, $i);
$buffer .= chr($topic["qos"]);
$i++;
$this->topics[$key] = $topic;
}
$cmd = 0x80;
//$qos
$cmd += ($qos << 1);
$head = chr($cmd);
$head .= chr($i);
fwrite($this->socket, $head, 2);
fwrite($this->socket, $buffer, $i);
$string = $this->read(2);
$bytes = ord(substr($string, 1, 1));
$string = $this->read($bytes);
}
/**
* sends a keep alive ping
*/
public function ping()
{
$head = chr(0xc0);
$head .= chr(0x00);
fwrite($this->socket, $head, 2);
if ($this->debug) echo "ping sent\n";
}
/**
* sends a proper disconect cmd
*/
public function disconnect()
{
$head = chr(0xe0);
$head .= chr(0x00);
fwrite($this->socket, $head, 2);
}
/**
* sends a proper disconect, then closes the socket
*/
public function close()
{
$this->disconnect();
fclose($this->socket);
}
/**
* publishes $content on a $topic
*
* @param $topic
* @param $content
* @param int $qos
* @param int $retain
*/
public function publish($topic, $content, $qos = 0, $retain = 0)
{
$i = 0;
$buffer = "";
$buffer .= $this->strwritestring($topic, $i);
//$buffer .= $this->strwritestring($content,$i);
if ($qos) {
$id = $this->msgid++;
$buffer .= chr($id >> 8);
$i++;
$buffer .= chr($id % 256);
$i++;
}
$buffer .= $content;
$i += strlen($content);
$cmd = 0x30;
if ($qos) $cmd += $qos << 1;
if ($retain) $cmd += 1;
$head = chr($cmd);
$head .= $this->setmsglength($i);
fwrite($this->socket, $head, strlen($head));
fwrite($this->socket, $buffer, $i);
}
/**
* processes a recieved topic
*
* @param $msg
*/
public function message($msg)
{
$tlen = (ord($msg[0]) << 8) + ord($msg[1]);
$topic = substr($msg, 2, $tlen);
$msg = substr($msg, ($tlen + 2));
$found = 0;
foreach ($this->topics as $key => $top) {
if (preg_match("/^" . str_replace("#", ".*",
str_replace("+", "[^\/]*",
str_replace("/", "\/",
str_replace("$", '\$',
$key)))) . "$/", $topic)) {
if (is_string($top['function']) && function_exists($top['function'])) {
call_user_func($top['function'], $topic, $msg);
$found = 1;
} elseif (is_object($top['function'])) {
$top['function']($topic, $msg);
}
}
}
if ($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";
}
/**
* the processing loop for an "allways on" client
*
* @param bool $loop set true when you are doing other stuff in the loop good for watching something else at the same time
* @return int
*/
public function proc($loop = true)
{
for (; ;) {
$sockets = array($this->socket);
$w = $e = NULL;
$cmd = 0;
//$byte = fgetc($this->socket);
if (feof($this->socket)) {
if ($this->debug) echo "eof receive going to reconnect for good measure\n";
fclose($this->socket);
$this->connect(false);
if (count($this->topics))
$this->subscribe($this->topics);
}
$byte = $this->read(1, true);
if (!strlen($byte)) {
if ($loop) {
usleep(100000);
}
} else {
$cmd = (int)(ord($byte) / 16);
if ($this->debug) echo "Recevid: $cmd\n";
$multiplier = 1;
$value = 0;
do {
$digit = ord($this->read(1));
$value += ($digit & 127) * $multiplier;
$multiplier *= 128;
} while (($digit & 128) != 0);
if ($this->debug) echo "Fetching: $value\n";
if ($value)
$string = $this->read($value, "fetch");
if ($cmd) {
switch ($cmd) {
case 3:
$this->message($string);
break;
}
$this->timesinceping = time();
}
}
if ($this->timesinceping < (time() - $this->keepalive)) {
if ($this->debug) echo "not found something so ping\n";
$this->ping();
}
if ($this->timesinceping < (time() - ($this->keepalive * 2))) {
if ($this->debug) echo "not seen a package in a while, disconnecting\n";
fclose($this->socket);
$this->connect(false);
if (count($this->topics))
$this->subscribe($this->topics);
}
}
return 1;
}
/**
* get message length
*
* @param $msg
* @param $i
* @return float|int
*/
public function getmsglength(&$msg, &$i)
{
$multiplier = 1;
$value = 0;
do {
$digit = ord($msg[$i]);
$value += ($digit & 127) * $multiplier;
$multiplier *= 128;
$i++;
} while (($digit & 128) != 0);
return $value;
}
/**
* set message length
*
* @param $len
* @return string
*/
public function setmsglength($len)
{
$string = "";
do {
$digit = $len % 128;
$len = $len >> 7;
// if there are more digits to encode, set the top bit of this digit
if ($len > 0)
$digit = ($digit | 0x80);
$string .= chr($digit);
} while ($len > 0);
return $string;
}
/**
* writes a string to a buffer
*
* @param $str
* @param $i
* @return string
*/
public function strwritestring($str, &$i)
{
$len = strlen($str);
$msb = $len >> 8;
$lsb = $len % 256;
$ret = chr($msb);
$ret .= chr($lsb);
$ret .= $str;
$i += ($len + 2);
return $ret;
}
/**
* print string
*
* @param $string
*/
public function printstr($string)
{
$strlen = strlen($string);
for ($j = 0; $j < $strlen; $j++) {
$num = ord($string[$j]);
if ($num > 31)
$chr = $string[$j]; else $chr = " ";
printf("%4d: %08b : 0x%02x : %s \n", $j, $num, $num, $chr);
}
}
}
- 发布示例
$server mqtt服务器地址,下同
$port 端口,下同<?php $mqtt = new MqttClient($server, $port); $mqtt->connect(); $mqtt->publish('topic', 'hello world!');
- 订阅示例
$mqtt = new MqttClient($server, $port); $mqtt->connect(); //#通配符订阅所有频道,function可用函数名回调 $topics['#'] = [ "qos" => 0, "function" => function ($topic, $message) { var_dump($topic, $message); } }]; $mqtt->subscribe($topics); $mqtt->proc();
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: