一切皆是流
一直沉溺于两个客户端之间的信息交流,不能自拔
因此写过两个版本的网络程序
但都不太满意,不满意的有两点
- 逻辑太复杂
- 不方便迭代开发
用起来心智负担很高
最近一段时间在整理 reactphp-framework 相关的,想搞一些 php 异步框架的工具包,已经有好多个包了。
整理过程中,发现 mysql-pool 连接池的思路对实现两个客户端的交流很有启发。
连接池隐藏了底层的实现细节,只用关心几个方法就可以了,对 mysql 来说
实现两个方法
- query
- queryStream
就能满足在之上构建想要的东西,比如说 ORM.
对与服务端和客户端交流来说,服务端和客户端都实现 call 方法,简单的调用 call 方法返回读写流就能在之上构建网络程序了,类似于这样。
call#
- 服务端调用客户端
// 运行在服务端 伪代码
$pool = new Pool();
// 调用客户端
$stream = $pool->call(function($stream){
// 这里代码运行在客户端
$stream->on('data',function($data) use ($stream) {
echo $data."\n"; // 收到hello
$stream->end('world');
});
return $stream;
}, $clientId);
$stream->write('hello');
$stream->on('data', function($data){
echo $data."\n"; // 收到world
});
$stream->on('close', function(){
echo "stream close\n";
});
- 客户端调用另外一个客户端
$client = new Client('server ip');
$stream = $client->call(function($stream){
// 运行在另外一个客户端
$stream->on('data',function($data) use ($stream) {
echo $data."\n"; // 收到hello
$stream->end('world');
});
return $stream;
}, $peerClientId);
$stream->write('hello');
$stream->on('data', function($data){
echo $data."\n"; // 收到world
});
$stream->on('close', function(){
echo "stream close\n";
});
注意上面两个例子
- 服务端可以调用客户端
- 客户端可以调用客户端(通过服务端中转)
而客户端不可以调用服务端,服务端只是作为流量的中转,由于闭包里的代码可以自定义,这样某种程度上保护了服务端
上方的两个例子,是第三版的核心。在此基础上,能实现各种各样的网络程序。
基于此,可以抽象出下方的 3 种流
Stream#
第一种 PortToPort#
即端口流量转发,比如将一个客户端 8022 端口转发到另一个客户端的 22 端口。(实现 ssh 到该端口)
<?php
PortToPort::create($client, 'tcp')
// local 8022
->from(null, 8022)
//to another client
->to(
'client_uuid',
'127.0.0.1:22'
)->start();
然后
ssh -p 8022 root@127.0.0.1
就能登录到对端
第二种 StreamToPort#
比如已经有了一个流,将这个流指向另一个客户端的某一端口
use React\Stream\ThroughStream;
$stream = new ThroughStream;
StreamToPort::create($client)
// form one uuid stream
->from('client_uuid', $stream)
->to('client_uuid', '127.0.0.1:8080')
->start();
$stream->write('hello world');
第三种 StreamToStream#
已经有了两个流,将这两个流量互相转发,可以对一些流量进行桥接。
$stream1 = new ThroughStream;
$stream2 = new ThroughStream;
StreamToStream::create()->from($stream1)->toStream($stream2);
这三种流其中 StreamToStream 最为底层,StreamToPort 是 StreamToStream 的上层,而 PortToPort 是 StreamToPort 的上层。而能构建出着这三种流,离不开上方的两个基础方法。
基于此,所有能抽象成流的数据,都可以将其转发到某处。
安全性#
- 客户端到服务端的流量可以使用 tls 加密
- 客户端被调用安全性,使用 github.com/laravel/serializable-cl... 的这个包,除非完全信任对端,否则请设置
Client::$secretKey = 'xxxxx';
安装#
github.com/reactphp-framework/brid...
gitee.com/reactphp-framework/bridg...
Install#
composer require reactphp-framework/bridge dev-master -vvv
Usage#
server#
<?php
require __DIR__ . '/vendor/autoload.php';
use Reactphp\Framework\Bridge\Server;
use Reactphp\Framework\Bridge\Pool;
use Reactphp\Framework\Bridge\Verify\VerifyUuid;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use Reactphp\Framework\Bridge\TcpBridge;
Server::$debug = true;
$server = new Server(new VerifyUuid([
'8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1',//value 是自定义的标识符,可以是空
'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb' => '10.10.10.2',
'41c5ee60-0628-4b11-9439-a10ba19cbcdd' => '10.10.10.3'
]), new TcpDecodeEncode);
$pool = new Pool($server, [
'max_connections' => 20,
'connection_timeout' => 2,
'keep_alive' => 5,
'wait_timeout' => 3
]);
$tcp = new TcpBridge('0.0.0.0:8010', $server);
client#
<?php
require __DIR__ . '/vendor/autoload.php';
use Reactphp\Framework\Bridge\Client;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use React\EventLoop\Loop;
use function React\Async\async;
Client::$debug = true;
$uuid = $argv[1] ?? 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb';
echo "uuid: $uuid\n";
$uri = 'tcp://192.168.1.9:8010';
$client = new Client($uri, $uuid, new TcpDecodeEncode);
$client->start();
server call client#
// 调用客户端
$stream = $pool->call(function($stream){
// 这里代码运行在客户端
$stream->on('data',function($data) use ($stream) {
echo $data."\n"; // 收到hello
$stream->end('world');
});
return $stream;
}, [
'uuid' => 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb',
]);
$stream->write('hello');
$stream->on('data', function($data){
echo $data."\n"; // 收到world
});
$stream->on('close', function(){
echo "stream close\n";
});
client call client#
$stream = $client->call(function($stream){
// 运行在另外一个客户端
$stream->on('data',function($data) use ($stream) {
echo $data."\n"; // 收到hello
$stream->end('world');
});
return $stream;
}, [
'uuid' => '8d24e2ba-c6f8-4bb6-8838-cacd37f64165',
// ‘something’ => '10.8.0.1'
]);
$stream->write('hello');
$stream->on('data', function($data){
echo $data."\n"; // 收到world
});
$stream->on('close', function(){
echo "stream close\n";
});
有趣的例子#
在上方的服务端配置里有这样一段
'8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1'
为什么有 ip,php 难不成可以转发 ip 流量 (转发的是 osi 第三层,当然第二层也可以转发) 吗,并实现 ip 互通吗,答案是可以的,不过仅限于 linux。
- require extensionpecl-tuntap
- if build fail try remove TSRMLS_CC in tuntap.c
下面是个最小 demo
server.php
<?php
require __DIR__ . '/vendor/autoload.php';
use Reactphp\Framework\Bridge\Server;
use Reactphp\Framework\Bridge\Pool;
use Reactphp\Framework\Bridge\Verify\VerifyUuid;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use Reactphp\Framework\Bridge\TcpBridge;
use React\EventLoop\Loop;
Server::$debug = true;
$server = new Server(new VerifyUuid([
'8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1',
'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb' => '10.10.10.2',
'41c5ee60-0628-4b11-9439-a10ba19cbcdd' => '10.10.10.3'
]), new TcpDecodeEncode);
$pool = new Pool($server, [
'max_connections' => 20,
'connection_timeout' => 2,
'keep_alive' => 5,
'wait_timeout' => 3
]);
$tcp = new TcpBridge('0.0.0.0:8010', $server);
client.php 注意修改里面的 ip
<?php
require __DIR__ . '/vendor/autoload.php';
use Reactphp\Framework\Bridge\Client;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use React\EventLoop\Loop;
use function React\Async\async;
Client::$debug = true;
$uuid = $argv[1] ?? 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb';
echo "uuid: $uuid\n";
$uri = 'tcp://192.168.1.9:8010';
$client = new Client($uri, $uuid, new TcpDecodeEncode);
$client->start();
function run_command($Command)
{
echo '+ ', $Command, "\n";
$rc = 0;
passthru($Command, $rc);
if ($rc != 0)
echo '+ Command returned ', $rc, "\n";
return ($rc == 0);
}
$client->on('controllerConnected', function ($data) use ($client) {
$ip = $data['something'];
$br = ((php_sapi_name() == 'cli') ? '' : '<br />');
global $TUN;
if (is_resource($TUN)) {
return;
}
// Try to create a new TAP-Device
if (!is_resource($TUN = tuntap_new('', TUNTAP_DEVICE_TUN)))
die('Failed to create TAP-Device' . "\n");
$Interface = tuntap_name($TUN);
echo 'Created ', $Interface, "\n";
run_command('ip link set ' . $Interface . ' up');
run_command("ip addr add $ip/24 dev " . $Interface);
run_command("iptables -t nat -D POSTROUTING -p all -d $ip/24 -j SNAT --to-source $ip");
run_command("iptables -t nat -A POSTROUTING -p all -d $ip/24 -j SNAT --to-source $ip");
// Read Frames from the device
echo 'Waiting for frames...', $br, "\n";
$ipTostreams = [];
Loop::addReadStream($TUN, async(function ($TUN) use ($client, &$ipTostreams) {
// Try to read next frame from device
$Data = $buffer = fread($TUN, 8192);
$Data = substr($Data, 4);
if (($Length = strlen($Data)) < 20) {
trigger_error('IPv4-Frame too short');
return false;
}
// Parse default header
$Byte = ord($Data[0]);
$ipVersion = (($Byte >> 4) & 0xF);
$ipHeaderLength = ($Byte & 0xF);
if ($ipVersion != 4) {
trigger_error('IP-Frame is version ' . $ipVersion . ', NOT IPv4');
return false;
} elseif (($ipHeaderLength < 5) || ($ipHeaderLength * 4 > $Length)) {
trigger_error('IPv4-Frame too short for header');
return false;
}
$ipSourceAddress = (ord($Data[12]) << 24) | (ord($Data[13]) << 16) | (ord($Data[14]) << 8) | ord($Data[15]);
$ipSourceAddress = long2ip($ipSourceAddress);
echo "ipSourceAddress: $ipSourceAddress\n";
$ipTargetAddress = (ord($Data[16]) << 24) | (ord($Data[17]) << 16) | (ord($Data[18]) << 8) | ord($Data[19]);
$ipTargetAddress = long2ip($ipTargetAddress);
echo "ipTargetAddress: $ipTargetAddress\n";
if ($client->getStatus() !== 1) {
echo "client not ready\n";
if (isset($ipTostreams[$ipTargetAddress])) {
echo "close stream\n";
$ipTostreams[$ipTargetAddress]->close();
unset($ipTostreams[$ipTargetAddress]);
}
return;
}
if (isset($ipTostreams[$ipTargetAddress])) {
if ($ipTostreams[$ipTargetAddress] === '') {
echo "stream is connecting\n";
} else {
echo "write to stream\n";
$ipTostreams[$ipTargetAddress]->write($buffer);
}
} else {
echo "create stream\n";
$ipTostreams[$ipTargetAddress] = '';
$stream = $client->call(function ($stream, $info) {
global $TUN;
if (!isset($TUN) || !is_resource($TUN)) {
Loop::futureTick(function () use ($stream) {
$stream->emit('error', [new \Exception('TUN not found')]);
});
return $stream;
}
$stream->on('data', function ($data) use ($TUN) {
fwrite($TUN, $data);
});
return $stream;
}, [
'something' => $ipTargetAddress
]);
$stream->write($buffer);
$stream->on('data', function ($data) use ($TUN) {
fwrite($TUN, $data);
});
$stream->on('error', function ($e) {
echo "file: " . $e->getFile() . "\n";
echo "line: " . $e->getLine() . "\n";
echo $e->getMessage() . "\n";
});
$stream->on('close', function () use (&$ipTostreams, $ipTargetAddress) {
echo "tun stream close\n";
unset($ipTostreams[$ipTargetAddress]);
});
$ipTostreams[$ipTargetAddress] = $stream;
}
}));
});
启动服务端
php server.php
在两个客户端上分别启动
php client.php c4b34f0d-44fa-4ef5-9d28-ccef218d74fb
php client.php 41c5ee60-0628-4b11-9439-a10ba19cbcdd
验证
in 10.10.10.3
ping 10.10.10.2
你的 linux ip 互通后,假如你使用的是 windows 或 mac,和 linux 在同一网段,可以修改路由策略访问 ip 10.10.10.3
比如在 mac 上
route -n add -net 10.10.10.3 -netmask 255.255.255.0 '你的linuxip'
然后 ping 下 ip 试试,是不是通了?
其它例子#
在文件夹下 examples
以上
License#
MIT
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: