编写 TCP 私有协议
使用php socket编写私有协议的tcp连接
使用pack 封包数据,unpack解包数据,不同数据类型和socket大小端转换
socket.php
<?php
class SocketFun
{
public $socket ;
public function __construct($socket)
{
$this->socket = $socket;
}
public function read_int8(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,1,MSG_WAITALL) ;
if($res==1){
$p_char = unpack("c",$buff);
return $p_char ; //return unpack("c",$buff);
}else {
echo socket_strerror(socket_last_error());
}
}
public function read_uint8(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,1,MSG_WAITALL) ;
if($res>0){
return unpack("C",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_int16(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,2,MSG_WAITALL) ;
if($res>0){
return unpack("s",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint16(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,2,MSG_WAITALL) ;
if($res>0){
return unpack("S",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint16_l(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,2,MSG_WAITALL) ;
if($res>0){
return unpack("v",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint16_b(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,2,MSG_WAITALL) ;
if($res>0){
return unpack("n",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_int32(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,4,MSG_WAITALL) ;
if($res>0){
$p_char = unpack("l",$buff) ;
return $p_char;
//return unpack("l",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint32(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,4,MSG_WAITALL) ;
if($res>0){
return unpack("L",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint32_l(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,4,MSG_WAITALL) ;
if($res>0){
return unpack("V",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint32_b(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,4,MSG_WAITALL) ;
if($res>0){
return unpack("N",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_int64(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,8,MSG_WAITALL) ;
if($res>0){
return unpack("q",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint64(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,8,MSG_WAITALL) ;
if($res>0){
return unpack("Q",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint64_l(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,8,MSG_WAITALL) ;
if($res>0){
return unpack("P",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_uint64_b(){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,8,MSG_WAITALL) ;
if($res>0){
return unpack("J",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
public function read_string($length,$type){
$buff = "" ;
$b = &$buff ;
$res = socket_recv($this->socket,$b,$length,MSG_WAITALL) ;
if($type){
$buff_g = gzuncompress($buff);
if($res>0){
return unpack("A*",$buff_g);
//return unpack("A*",$buff_g);
}else{
echo socket_strerror(socket_last_error());
}
}else{
if($res>0){
return unpack("A*",$buff);
//return unpack("A*",$buff);
}else{
echo socket_strerror(socket_last_error());
}
}
}
/**
* socket_write
*/
public function write_int8($value){
$buff_char = pack("c",$value);
$res = socket_write($this->socket,$buff_char,1) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint8($value){
$buff_char = pack("C",$value);
$res = socket_write($this->socket,$buff_char,1) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_int16($value){
$buff_char = pack("s",$value);
$res = socket_write($this->socket,$buff_char,2) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint16($value){
$buff_char = pack("S",$value);
$res = socket_write($this->socket,$buff_char,2) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint16_l($value){
$buff_char = pack("v",$value);
$res = socket_write($this->socket,$buff_char,2) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint16_b($value){
$buff_char = pack("n",$value);
$res = socket_write($this->socket,$buff_char,2) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_int32($value){
$buff_char = pack("l",$value);
$res = socket_write($this->socket,$buff_char,4) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint32($value){
$buff_char = pack("L",$value);
$res = socket_write($this->socket,$buff_char,4) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint32_l($value){
$buff_char = pack("V",$value);
$res = socket_write($this->socket,$buff_char,4) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint32_b($value){
$buff_char = pack("N",$value);
$res = socket_write($this->socket,$buff_char,4) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_int64($value){
$buff_char = pack("q",$value);
$res = socket_write($this->socket,$buff_char,8) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint64($value){
$buff_char = pack("Q",$value);
$res = socket_write($this->socket,$buff_char,8) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint64_l($value){
$buff_char = pack("P",$value);
$res = socket_write($this->socket,$buff_char,8) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_uint64_b($value){
$buff_char = pack("J",$value);
$res = socket_write($this->socket,$buff_char,8) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
public function write_string($value,$length){
$res = socket_write($this->socket,$value,strlen($value)) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
$len = $length-strlen($value) ;
if(strlen($value)<$length){
$buff_zero[$length-strlen($value)]=array(0);
$res = socket_write($this->socket,$buff_zero,$len) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
}
public function write_string_l($value,$length){
$res = socket_write($this->socket,$value,$length) ;
if(!($res>0)){
echo socket_strerror(socket_last_error());
}
}
function s_flush(){
flush();
}
/**
* 获取uuid
*/
public function gen_uuid() {
$uuid = array(
'time_low' => 0,
'time_mid' => 0,
'time_hi' => 0,
'clock_seq_hi' => 0,
'clock_seq_low' => 0,
'node' => array()
);
$uuid['time_low'] = mt_rand(0, 0xffff) + (mt_rand(0, 0xffff) << 16);
$uuid['time_mid'] = mt_rand(0, 0xffff);
$uuid['time_hi'] = (4 << 12) | (mt_rand(0, 0x1000));
$uuid['clock_seq_hi'] = (1 << 7) | (mt_rand(0, 128));
$uuid['clock_seq_low'] = mt_rand(0, 255);
for ($i = 0; $i < 6; $i++) {
$uuid['node'][$i] = mt_rand(0, 255);
}
$uuid = sprintf('%08x-%04x-%04x-%02x%02x-%02x%02x%02x%02x%02x%02x',
$uuid['time_low'],
$uuid['time_mid'],
$uuid['time_hi'],
$uuid['clock_seq_hi'],
$uuid['clock_seq_low'],
$uuid['node'][0],
$uuid['node'][1],
$uuid['node'][2],
$uuid['node'][3],
$uuid['node'][4],
$uuid['node'][5]
);
return $uuid;
}
}
采用多线程 发送心跳数据
thread.php
<?php
/**
* Implements threading in PHP
*
* @package <none>
* @version 1.0.0 - stable
* @author Tudor Barbu <miau@motane.lu>
* @copyright MIT
*/
class Thread {
const FUNCTION_NOT_CALLABLE = 10;
const COULD_NOT_FORK = 15;
/**
* possible errors
*
* @var array
*/
private $errors = array(
Thread::FUNCTION_NOT_CALLABLE => 'You must specify a valid function name that can be called from the current scope.',
Thread::COULD_NOT_FORK => 'pcntl_fork() returned a status of -1. No new process was created',
);
/**
* callback for the function that should
* run as a separate thread
*
* @var callback
*/
protected $runnable;
/**
* holds the current process id
*
* @var integer
*/
private $pid;
/**
* hodls exit code after child die
*/
private $exitCode = -1;
/**
* checks if threading is supported by the current
* PHP configuration
*
* @return boolean
*/
public static function available() {
$required_functions = array(
'pcntl_fork',
);
foreach( $required_functions as $function ) {
if ( !function_exists( $function ) ) {
return false;
}
}
return true;
}
/**
* class constructor - you can pass
* the callback function as an argument
*
* @param callback $_runnable
*/
public function __construct( $_runnable = null ) {
if( $_runnable !== null ) {
$this->setRunnable( $_runnable );
}
}
/**
* sets the callback
*
* @param callback $_runnable
* @return callback
*/
public function setRunnable( $_runnable ) {
if( self::runnableOk( $_runnable ) ) {
$this->runnable = $_runnable;
}
else {
throw new Exception( $this->getError( Thread::FUNCTION_NOT_CALLABLE ), Thread::FUNCTION_NOT_CALLABLE );
}
}
/**
* gets the callback
*
* @return callback
*/
public function getRunnable() {
return $this->runnable;
}
/**
* checks if the callback is ok (the function/method
* actually exists and is runnable from the current
* context)
*
* can be called statically
*
* @param callback $_runnable
* @return boolean
*/
public static function runnableOk( $_runnable ) {
return ( function_exists( $_runnable ) && is_callable( $_runnable ) );
}
/**
* returns the process id (pid) of the simulated thread
*
* @return int
*/
public function getPid() {
return $this->pid;
}
/**
* checks if the child thread is alive
*
* @return boolean
*/
public function isAlive() {
$pid = pcntl_waitpid( $this->pid, $status, WNOHANG );
if ($pid === 0) { // child is still alive
return true;
} else {
if (pcntl_wifexited($status) && $this->exitCode == -1) { // normal exit
$this->exitCode = pcntl_wexitstatus($status);
}
return false;
}
}
/**
* return exit code of child (-1 if child is still alive)
*
* @return int
*/
public function getExitCode() {
$this->isAlive();
return $this->exitCode;
}
/**
* starts the thread, all the parameters are
* passed to the callback function
*
* @return void
*/
public function start() {
$pid = @ pcntl_fork();
if( $pid == -1 ) {
throw new Exception( $this->getError( Thread::COULD_NOT_FORK ), Thread::COULD_NOT_FORK );
}
if( $pid ) {
// parent
$this->pid = $pid;
}
else {
// child
pcntl_signal( SIGTERM, array( $this, 'signalHandler' ) );
$arguments = func_get_args();
if ( !empty( $arguments ) ) {
call_user_func_array( $this->runnable, $arguments );
}
else {
call_user_func( $this->runnable );
}
exit( 0 );
}
}
/**
* attempts to stop the thread
* returns true on success and false otherwise
*
* @param integer $_signal - SIGKILL/SIGTERM
* @param boolean $_wait
*/
public function stop( $_signal = SIGKILL, $_wait = false ) {
if( $this->isAlive() ) {
posix_kill( $this->pid, $_signal );
if( $_wait ) {
pcntl_waitpid( $this->pid, $status = 0 );
}
}
}
/**
* alias of stop();
*
* @return boolean
*/
public function kill( $_signal = SIGKILL, $_wait = false ) {
return $this->stop( $_signal, $_wait );
}
/**
* gets the error's message based on
* its id
*
* @param integer $_code
* @return string
*/
public function getError( $_code ) {
if ( isset( $this->errors[$_code] ) ) {
return $this->errors[$_code];
}
else {
return 'No such error code ' . $_code . '! Quit inventing errors!!!';
}
}
/**
* signal handler
*
* @param integer $_signal
*/
protected function signalHandler( $_signal ) {
switch( $_signal ) {
case SIGTERM:
exit( 0 );
break;
}
}
}
// EOF
php socket编写业务
require "thread.php";
require "socket.php";
$GLOBALS['socket']=socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
$GLOBALS['res'] = socket_connect($socket,'ip address',6868); // 创建连接
if(!$GLOBALS['res'] ){
echo "connect failure" ;
die();
}
$socket = new SocketFun($GLOBALS['socket']) ;
// 根据私有协议中的字段按照顺序和类型 write 如:写入一个8bytes的int 0:$socket->write_int8(0)
$socket->write_int8(0) ;
$socket->write_uint32_l($len) ;
$socket->write_int8(7) ;
$socket->write_string_l($uuid,32) ;
$socket->write_int8(1) ;
$socket->write_int8(0) ;
$socket->write_int8(0) ;
$socket->write_int8(0);
$socket->write_string($str,strlen($str)) ;
$GLOBALS['HEART_DATA']=10;
$msg=date("Y-m-d H:i:s");
echo("before").PHP_EOL ;
$socket = new SocketFun($socket) ;
//接收数据 demo
function doRecvData($res, $t) {
do{
echo "SendData".PHP_EOL ;
$ss = new SocketFun($GLOBALS['socket']) ;
$m_split = $ss->read_int8() ;
$m_length = $ss->read_int32() ;
$m_type = $ss->read_int8() ;
$m_requestid = $ss->read_string(32,0) ;
$m_version = $ss->read_int8() ;
$m_cipher = $ss->read_int8() ;
$m_replyCipher = $ss->read_int8() ;
$m_compress = $ss->read_int8();
$len3 = $m_length[1]-42 ;
$m_json="";
if($m_type[1]==8){
if($m_compress[1]==1){ // 返回压缩数据
$m_json = $ss->read_string($len3,1) ;
}else{
$m_json = $ss->read_string($len3,0) ;
}
$s_data = json_decode($m_json[1]) ;
if(isset($s_data->data)){ // isset($s_data->data)
$socket_data = $s_data->data ;
if($socket_data!=null){
$GLOBALS['datas']=$socket_data ;
$num = $s_data->size;
if($num>400){
}else{
$data_value = json_encode($socket_data) ;
echo $data_value.PHP_EOL;
echo $num.PHP_EOL;
$result = request_by_post('http://10.10.83.231/subscribe/',json_encode($data_value));
}
}
}
}
}while(true);
usleep($t);
exit($res);
}
// 发送心跳 demo
function doSendHeartData($res, $t) {
do{
echo "SendHeartData" ;
$ss = new SocketFun($GLOBALS['socket']);
$uuid = $ss->gen_uuid() ;
$ss->write_int8(0) ;
$ss->write_uint32_l(42) ;
$ss->write_int8(3) ;
$ss->write_string_l($uuid,32) ;
$ss->write_int8(1) ;
$ss->write_int8(0) ;
$ss->write_int8(0) ;
$ss->write_int8(0);
echo " END".PHP_EOL ;
sleep($GLOBALS['HEART_DATA']);
}while(true);
usleep($t);
exit($res);
}
$thread_recv = new Thread('doRecvData');
$thread_recv->start(1, 30);
$thread_send_heartData = new Thread('doSendHeartData');
$thread_send_heartData->start(2, 100);
while ($thread_recv->isAlive(1) || $thread_send_heartData->isAlive(2));
echo "receive thread exit " . $thread_recv->getExitCode() . "\n";
echo "send heart data thread exit " . $thread_send_heartData->getExitCode() . "\n";
本作品采用《CC 协议》,转载必须注明作者和本文链接
太过典型的程序思维,以为贴了代码(E.g)别人就懂了~其实教学文章不是这么简单的事情。
还不如看workerman源码