一切皆是流 
                                                    
                        
                    
                    
  
                    
                    一直沉溺于两个客户端之间的信息交流,不能自拔
因此写过两个版本的网络程序
但都不太满意,不满意的有两点
- 逻辑太复杂
- 不方便迭代开发
用起来心智负担很高
最近一段时间在整理 reactphp-framework相关的,想搞一些php异步框架的工具包,已经有好多个包了。
整理过程中,发现mysql-pool连接池的思路对实现两个客户端的交流很有启发。
连接池隐藏了底层的实现细节,只用关心几个方法就可以了,对mysql来说
实现两个方法
- query
- queryStream
就能满足在之上构建想要的东西,比如说ORM.
对与服务端和客户端交流来说,服务端和客户端都实现call方法,简单的调用call方法返回读写流就能在之上构建网络程序了,类似于这样。
call
- 服务端调用客户端
// 运行在服务端 伪代码
$pool = new Pool();
// 调用客户端
$stream = $pool->call(function($stream){
    // 这里代码运行在客户端
    $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, $clientId);
$stream->write('hello');
$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});
$stream->on('close', function(){
   echo "stream close\n";
});
- 客户端调用另外一个客户端
$client = new Client('server ip');
$stream = $client->call(function($stream){
    // 运行在另外一个客户端
     $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, $peerClientId);
$stream->write('hello');
$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});
$stream->on('close', function(){
   echo "stream close\n";
});
注意上面两个例子
- 服务端可以调用客户端
- 客户端可以调用客户端(通过服务端中转)
而客户端不可以调用服务端,服务端只是作为流量的中转,由于闭包里的代码可以自定义,这样某种程度上保护了服务端
上方的两个例子,是第三版的核心。在此基础上,能实现各种各样的网络程序。
基于此,可以抽象出下方的3种流
Stream
第一种 PortToPort
即端口流量转发,比如将一个客户端8022端口转发到另一个客户端的22端口。(实现ssh到该端口)
<?php
PortToPort::create($client, 'tcp')
// local 8022
->from(null, 8022)
//to another client
->to(
    'client_uuid',
    '127.0.0.1:22'
)->start();然后
ssh -p 8022 root@127.0.0.1就能登录到对端
第二种 StreamToPort
比如已经有了一个流,将这个流指向另一个客户端的某一端口
use React\Stream\ThroughStream;
$stream = new ThroughStream;
StreamToPort::create($client)
// form one uuid stream
->from('client_uuid', $stream)
->to('client_uuid', '127.0.0.1:8080')
->start();
$stream->write('hello world');
第三种 StreamToStream
已经有了两个流,将这两个流量互相转发,可以对一些流量进行桥接。
$stream1 = new ThroughStream;
$stream2 = new ThroughStream;
StreamToStream::create()->from($stream1)->toStream($stream2);
这三种流其中StreamToStream最为底层,StreamToPort是StreamToStream的上层,而PortToPort是StreamToPort的上层。而能构建出着这三种流,离不开上方的两个基础方法。
基于此,所有能抽象成流的数据,都可以将其转发到某处。
安全性
- 客户端到服务端的流量可以使用tls加密
- 客户端被调用安全性,使用 github.com/laravel/serializable-cl... 的这个包,除非完全信任对端,否则请设置
Client::$secretKey = 'xxxxx';安装
github.com/reactphp-framework/brid...
gitee.com/reactphp-framework/bridg...
Install
composer require reactphp-framework/bridge dev-master -vvvUsage
server
<?php
require __DIR__ . '/vendor/autoload.php';
use Reactphp\Framework\Bridge\Server;
use Reactphp\Framework\Bridge\Pool;
use Reactphp\Framework\Bridge\Verify\VerifyUuid;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use Reactphp\Framework\Bridge\TcpBridge;
Server::$debug = true;
$server = new Server(new VerifyUuid([
    '8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1',//value 是自定义的标识符,可以是空
    'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb' => '10.10.10.2',
    '41c5ee60-0628-4b11-9439-a10ba19cbcdd' => '10.10.10.3'
]), new TcpDecodeEncode);
$pool = new Pool($server, [
    'max_connections' => 20,
    'connection_timeout' => 2,
    'keep_alive' => 5,
    'wait_timeout' => 3
]);
$tcp = new TcpBridge('0.0.0.0:8010', $server);
client
<?php
require __DIR__ . '/vendor/autoload.php';
use Reactphp\Framework\Bridge\Client;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use React\EventLoop\Loop;
use function React\Async\async;
Client::$debug = true;
$uuid = $argv[1] ?? 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb';
echo "uuid: $uuid\n";
$uri = 'tcp://192.168.1.9:8010';
$client = new Client($uri, $uuid, new TcpDecodeEncode);
$client->start();server call client
// 调用客户端
$stream = $pool->call(function($stream){
    // 这里代码运行在客户端
    $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, [
    'uuid' => 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb',
]);
$stream->write('hello');
$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});
$stream->on('close', function(){
   echo "stream close\n";
});
client call client
$stream = $client->call(function($stream){
    // 运行在另外一个客户端
     $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, [
    'uuid' => '8d24e2ba-c6f8-4bb6-8838-cacd37f64165',
    // ‘something’ => '10.8.0.1'
]);
$stream->write('hello');
$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});
$stream->on('close', function(){
   echo "stream close\n";
});有趣的例子
在上方的服务端配置里有这样一段
'8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1'为什么有ip,php难不成可以转发ip流量(转发的是osi第三层,当然第二层也可以转发)吗,并实现ip互通吗,答案是可以的,不过仅限于linux。
- require  extensionpecl-tuntap- if build fail try remove TSRMLS_CC in tuntap.c
 
下面是个最小demo
server.php
<?php
require __DIR__ . '/vendor/autoload.php';
use Reactphp\Framework\Bridge\Server;
use Reactphp\Framework\Bridge\Pool;
use Reactphp\Framework\Bridge\Verify\VerifyUuid;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use Reactphp\Framework\Bridge\TcpBridge;
use React\EventLoop\Loop;
Server::$debug = true;
$server = new Server(new VerifyUuid([
    '8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1',
    'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb' => '10.10.10.2',
    '41c5ee60-0628-4b11-9439-a10ba19cbcdd' => '10.10.10.3'
]), new TcpDecodeEncode);
$pool = new Pool($server, [
    'max_connections' => 20,
    'connection_timeout' => 2,
    'keep_alive' => 5,
    'wait_timeout' => 3
]);
$tcp = new TcpBridge('0.0.0.0:8010', $server);
client.php 注意修改里面的ip
<?php
require __DIR__ . '/vendor/autoload.php';
use Reactphp\Framework\Bridge\Client;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use React\EventLoop\Loop;
use function React\Async\async;
Client::$debug = true;
$uuid = $argv[1] ?? 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb';
echo "uuid: $uuid\n";
$uri = 'tcp://192.168.1.9:8010';
$client = new Client($uri, $uuid, new TcpDecodeEncode);
$client->start();
function run_command($Command)
{
    echo '+ ', $Command, "\n";
    $rc = 0;
    passthru($Command, $rc);
    if ($rc != 0)
        echo '+ Command returned ', $rc, "\n";
    return ($rc == 0);
}
$client->on('controllerConnected', function ($data) use ($client) {
    $ip = $data['something'];
    $br = ((php_sapi_name() == 'cli') ? '' : '<br />');
    global $TUN;
    if (is_resource($TUN)) {
        return;
    }
    // Try to create a new TAP-Device
    if (!is_resource($TUN = tuntap_new('', TUNTAP_DEVICE_TUN)))
        die('Failed to create TAP-Device' . "\n");
    $Interface = tuntap_name($TUN);
    echo 'Created ', $Interface, "\n";
    run_command('ip link set ' . $Interface . ' up');
    run_command("ip addr add $ip/24 dev " . $Interface);
    run_command("iptables -t nat -D POSTROUTING -p all -d $ip/24 -j SNAT --to-source $ip");
    run_command("iptables -t nat -A POSTROUTING -p all -d $ip/24 -j SNAT --to-source $ip");
    // Read Frames from the device
    echo 'Waiting for frames...', $br, "\n";
    $ipTostreams = [];
    Loop::addReadStream($TUN, async(function ($TUN) use ($client, &$ipTostreams) {
        // Try to read next frame from device
        $Data = $buffer =  fread($TUN, 8192);
        $Data = substr($Data, 4);
        if (($Length = strlen($Data)) < 20) {
            trigger_error('IPv4-Frame too short');
            return false;
        }
        // Parse default header
        $Byte = ord($Data[0]);
        $ipVersion = (($Byte >> 4) & 0xF);
        $ipHeaderLength = ($Byte & 0xF);
        if ($ipVersion != 4) {
            trigger_error('IP-Frame is version ' . $ipVersion . ', NOT IPv4');
            return false;
        } elseif (($ipHeaderLength < 5) || ($ipHeaderLength * 4 > $Length)) {
            trigger_error('IPv4-Frame too short for header');
            return false;
        }
        $ipSourceAddress = (ord($Data[12]) << 24) | (ord($Data[13]) << 16) | (ord($Data[14]) << 8) | ord($Data[15]);
        $ipSourceAddress = long2ip($ipSourceAddress);
        echo "ipSourceAddress: $ipSourceAddress\n";
        $ipTargetAddress = (ord($Data[16]) << 24) | (ord($Data[17]) << 16) | (ord($Data[18]) << 8) | ord($Data[19]);
        $ipTargetAddress = long2ip($ipTargetAddress);
        echo "ipTargetAddress: $ipTargetAddress\n";
        if ($client->getStatus() !== 1) {
            echo "client not ready\n";
            if (isset($ipTostreams[$ipTargetAddress])) {
                echo "close stream\n";
                $ipTostreams[$ipTargetAddress]->close();
                unset($ipTostreams[$ipTargetAddress]);
            }
            return;
        }
        if (isset($ipTostreams[$ipTargetAddress])) {
            if ($ipTostreams[$ipTargetAddress] === '') {
                echo "stream is connecting\n";
            } else {
                echo "write to stream\n";
                $ipTostreams[$ipTargetAddress]->write($buffer);
            }
        } else {
            echo "create stream\n";
            $ipTostreams[$ipTargetAddress] = '';
            $stream = $client->call(function ($stream, $info) {
                global $TUN;
                if (!isset($TUN) || !is_resource($TUN)) {
                    Loop::futureTick(function () use ($stream) {
                        $stream->emit('error', [new \Exception('TUN not found')]);
                    });
                    return $stream;
                }
                $stream->on('data', function ($data) use ($TUN) {
                    fwrite($TUN, $data);
                });
                return $stream;
            }, [
                'something' => $ipTargetAddress
            ]);
            $stream->write($buffer);
            $stream->on('data', function ($data) use ($TUN) {
                fwrite($TUN, $data);
            });
            $stream->on('error', function ($e) {
                echo "file: " . $e->getFile() . "\n";
                echo "line: " . $e->getLine() . "\n";
                echo $e->getMessage() . "\n";
            });
            $stream->on('close', function () use (&$ipTostreams, $ipTargetAddress) {
                echo "tun stream close\n";
                unset($ipTostreams[$ipTargetAddress]);
            });
            $ipTostreams[$ipTargetAddress] = $stream;
        }
    }));
});
启动服务端
php server.php在两个客户端上分别启动
php client.php c4b34f0d-44fa-4ef5-9d28-ccef218d74fbphp client.php 41c5ee60-0628-4b11-9439-a10ba19cbcdd验证
in 10.10.10.3
ping 10.10.10.2你的linux ip互通后,假如你使用的是windows或mac,和linux在同一网段,可以修改路由策略访问ip 10.10.10.3
比如在mac上
route -n add -net 10.10.10.3 -netmask 255.255.255.0 '你的linuxip'然后ping下ip试试,是不是通了?
其它例子
在文件夹下examples
以上
License
MIT
本作品采用《CC 协议》,转载必须注明作者和本文链接
 
           jcc123 的个人博客
 jcc123 的个人博客
         
             
             
             
             
             
             
             
             
             
                     
                     
           
           关于 LearnKu
                关于 LearnKu
               
                     
                     
                     粤公网安备 44030502004330号
 粤公网安备 44030502004330号 
 
推荐文章: