一切皆是流

一直沉溺于两个客户端之间的信息交流,不能自拔

因此写过两个版本的网络程序

但都不太满意,不满意的有两点

  • 逻辑太复杂
  • 不方便迭代开发

用起来心智负担很高

最近一段时间在整理 reactphp-framework 相关的,想搞一些 php 异步框架的工具包,已经有好多个包了。

整理过程中,发现 mysql-pool 连接池的思路对实现两个客户端的交流很有启发。

连接池隐藏了底层的实现细节,只用关心几个方法就可以了,对 mysql 来说
实现两个方法

  • query
  • queryStream

就能满足在之上构建想要的东西,比如说 ORM.

对与服务端和客户端交流来说,服务端和客户端都实现 call 方法,简单的调用 call 方法返回读写流就能在之上构建网络程序了,类似于这样。

call#

  • 服务端调用客户端
// 运行在服务端 伪代码
$pool = new Pool();
// 调用客户端
$stream = $pool->call(function($stream){
    // 这里代码运行在客户端
    $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, $clientId);

$stream->write('hello');

$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});

$stream->on('close', function(){
   echo "stream close\n";
});
  • 客户端调用另外一个客户端
$client = new Client('server ip');

$stream = $client->call(function($stream){
    // 运行在另外一个客户端
     $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, $peerClientId);
$stream->write('hello');

$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});

$stream->on('close', function(){
   echo "stream close\n";
});

注意上面两个例子

  • 服务端可以调用客户端
  • 客户端可以调用客户端(通过服务端中转)

而客户端不可以调用服务端,服务端只是作为流量的中转,由于闭包里的代码可以自定义,这样某种程度上保护了服务端

上方的两个例子,是第三版的核心。在此基础上,能实现各种各样的网络程序。

基于此,可以抽象出下方的 3 种流

Stream#

第一种 PortToPort#

即端口流量转发,比如将一个客户端 8022 端口转发到另一个客户端的 22 端口。(实现 ssh 到该端口)

<?php

PortToPort::create($client, 'tcp')
// local 8022
->from(null, 8022)
//to another client
->to(
    'client_uuid',
    '127.0.0.1:22'
)->start();

然后

ssh -p 8022 root@127.0.0.1

就能登录到对端

第二种 StreamToPort#

比如已经有了一个流,将这个流指向另一个客户端的某一端口

use React\Stream\ThroughStream;

$stream = new ThroughStream;

StreamToPort::create($client)
// form one uuid stream
->from('client_uuid', $stream)
->to('client_uuid', '127.0.0.1:8080')
->start();

$stream->write('hello world');

第三种 StreamToStream#

已经有了两个流,将这两个流量互相转发,可以对一些流量进行桥接。

$stream1 = new ThroughStream;
$stream2 = new ThroughStream;

StreamToStream::create()->from($stream1)->toStream($stream2);

这三种流其中 StreamToStream 最为底层,StreamToPort 是 StreamToStream 的上层,而 PortToPort 是 StreamToPort 的上层。而能构建出着这三种流,离不开上方的两个基础方法。

基于此,所有能抽象成流的数据,都可以将其转发到某处。

安全性#

  • 客户端到服务端的流量可以使用 tls 加密
  • 客户端被调用安全性,使用 github.com/laravel/serializable-cl... 的这个包,除非完全信任对端,否则请设置
Client::$secretKey = 'xxxxx';

安装#

github.com/reactphp-framework/brid...

gitee.com/reactphp-framework/bridg...

Install#

composer require reactphp-framework/bridge dev-master -vvv

Usage#

server#

<?php

require __DIR__ . '/vendor/autoload.php';

use Reactphp\Framework\Bridge\Server;
use Reactphp\Framework\Bridge\Pool;
use Reactphp\Framework\Bridge\Verify\VerifyUuid;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use Reactphp\Framework\Bridge\TcpBridge;

Server::$debug = true;

$server = new Server(new VerifyUuid([
    '8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1',//value 是自定义的标识符,可以是空
    'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb' => '10.10.10.2',
    '41c5ee60-0628-4b11-9439-a10ba19cbcdd' => '10.10.10.3'
]), new TcpDecodeEncode);



$pool = new Pool($server, [
    'max_connections' => 20,
    'connection_timeout' => 2,
    'keep_alive' => 5,
    'wait_timeout' => 3
]);

$tcp = new TcpBridge('0.0.0.0:8010', $server);

client#

<?php

require __DIR__ . '/vendor/autoload.php';

use Reactphp\Framework\Bridge\Client;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use React\EventLoop\Loop;
use function React\Async\async;

Client::$debug = true;
$uuid = $argv[1] ?? 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb';
echo "uuid: $uuid\n";

$uri = 'tcp://192.168.1.9:8010';
$client = new Client($uri, $uuid, new TcpDecodeEncode);
$client->start();

server call client#

// 调用客户端
$stream = $pool->call(function($stream){
    // 这里代码运行在客户端
    $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, [
    'uuid' => 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb',
]);

$stream->write('hello');

$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});

$stream->on('close', function(){
   echo "stream close\n";
});

client call client#

$stream = $client->call(function($stream){
    // 运行在另外一个客户端
     $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, [
    'uuid' => '8d24e2ba-c6f8-4bb6-8838-cacd37f64165',
    // ‘something’ => '10.8.0.1'
]);
$stream->write('hello');

$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});

$stream->on('close', function(){
   echo "stream close\n";
});

有趣的例子#

在上方的服务端配置里有这样一段

'8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1'

为什么有 ip,php 难不成可以转发 ip 流量 (转发的是 osi 第三层,当然第二层也可以转发) 吗,并实现 ip 互通吗,答案是可以的,不过仅限于 linux。

  • require extensionpecl-tuntap
    • if build fail try remove TSRMLS_CC in tuntap.c

下面是个最小 demo

server.php

<?php

require __DIR__ . '/vendor/autoload.php';

use Reactphp\Framework\Bridge\Server;
use Reactphp\Framework\Bridge\Pool;
use Reactphp\Framework\Bridge\Verify\VerifyUuid;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use Reactphp\Framework\Bridge\TcpBridge;
use React\EventLoop\Loop;

Server::$debug = true;

$server = new Server(new VerifyUuid([
    '8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1',
    'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb' => '10.10.10.2',
    '41c5ee60-0628-4b11-9439-a10ba19cbcdd' => '10.10.10.3'
]), new TcpDecodeEncode);



$pool = new Pool($server, [
    'max_connections' => 20,
    'connection_timeout' => 2,
    'keep_alive' => 5,
    'wait_timeout' => 3
]);

$tcp = new TcpBridge('0.0.0.0:8010', $server);

client.php 注意修改里面的 ip

<?php

require __DIR__ . '/vendor/autoload.php';

use Reactphp\Framework\Bridge\Client;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use React\EventLoop\Loop;
use function React\Async\async;

Client::$debug = true;
$uuid = $argv[1] ?? 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb';
echo "uuid: $uuid\n";

$uri = 'tcp://192.168.1.9:8010';
$client = new Client($uri, $uuid, new TcpDecodeEncode);
$client->start();



function run_command($Command)
{
    echo '+ ', $Command, "\n";

    $rc = 0;

    passthru($Command, $rc);

    if ($rc != 0)
        echo '+ Command returned ', $rc, "\n";

    return ($rc == 0);
}

$client->on('controllerConnected', function ($data) use ($client) {

    $ip = $data['something'];
    $br = ((php_sapi_name() == 'cli') ? '' : '<br />');

    global $TUN;

    if (is_resource($TUN)) {
        return;
    }

    // Try to create a new TAP-Device
    if (!is_resource($TUN = tuntap_new('', TUNTAP_DEVICE_TUN)))
        die('Failed to create TAP-Device' . "\n");

    $Interface = tuntap_name($TUN);

    echo 'Created ', $Interface, "\n";

    run_command('ip link set ' . $Interface . ' up');
    run_command("ip addr add $ip/24 dev " . $Interface);
    run_command("iptables -t nat -D POSTROUTING -p all -d $ip/24 -j SNAT --to-source $ip");
    run_command("iptables -t nat -A POSTROUTING -p all -d $ip/24 -j SNAT --to-source $ip");

    // Read Frames from the device
    echo 'Waiting for frames...', $br, "\n";


    $ipTostreams = [];

    Loop::addReadStream($TUN, async(function ($TUN) use ($client, &$ipTostreams) {
        // Try to read next frame from device
        $Data = $buffer =  fread($TUN, 8192);
        $Data = substr($Data, 4);
        if (($Length = strlen($Data)) < 20) {
            trigger_error('IPv4-Frame too short');

            return false;
        }

        // Parse default header
        $Byte = ord($Data[0]);
        $ipVersion = (($Byte >> 4) & 0xF);
        $ipHeaderLength = ($Byte & 0xF);

        if ($ipVersion != 4) {
            trigger_error('IP-Frame is version ' . $ipVersion . ', NOT IPv4');

            return false;
        } elseif (($ipHeaderLength < 5) || ($ipHeaderLength * 4 > $Length)) {
            trigger_error('IPv4-Frame too short for header');

            return false;
        }
        $ipSourceAddress = (ord($Data[12]) << 24) | (ord($Data[13]) << 16) | (ord($Data[14]) << 8) | ord($Data[15]);
        $ipSourceAddress = long2ip($ipSourceAddress);
        echo "ipSourceAddress: $ipSourceAddress\n";
        $ipTargetAddress = (ord($Data[16]) << 24) | (ord($Data[17]) << 16) | (ord($Data[18]) << 8) | ord($Data[19]);
        $ipTargetAddress = long2ip($ipTargetAddress);
        echo "ipTargetAddress: $ipTargetAddress\n";

        if ($client->getStatus() !== 1) {
            echo "client not ready\n";
            if (isset($ipTostreams[$ipTargetAddress])) {
                echo "close stream\n";
                $ipTostreams[$ipTargetAddress]->close();
                unset($ipTostreams[$ipTargetAddress]);
            }
            return;
        }

        if (isset($ipTostreams[$ipTargetAddress])) {
            if ($ipTostreams[$ipTargetAddress] === '') {
                echo "stream is connecting\n";
            } else {
                echo "write to stream\n";
                $ipTostreams[$ipTargetAddress]->write($buffer);
            }
        } else {
            echo "create stream\n";
            $ipTostreams[$ipTargetAddress] = '';
            $stream = $client->call(function ($stream, $info) {
                global $TUN;
                if (!isset($TUN) || !is_resource($TUN)) {
                    Loop::futureTick(function () use ($stream) {
                        $stream->emit('error', [new \Exception('TUN not found')]);
                    });
                    return $stream;
                }
                $stream->on('data', function ($data) use ($TUN) {
                    fwrite($TUN, $data);
                });
                return $stream;
            }, [
                'something' => $ipTargetAddress
            ]);

            $stream->write($buffer);


            $stream->on('data', function ($data) use ($TUN) {
                fwrite($TUN, $data);
            });

            $stream->on('error', function ($e) {
                echo "file: " . $e->getFile() . "\n";
                echo "line: " . $e->getLine() . "\n";
                echo $e->getMessage() . "\n";
            });

            $stream->on('close', function () use (&$ipTostreams, $ipTargetAddress) {
                echo "tun stream close\n";
                unset($ipTostreams[$ipTargetAddress]);
            });
            $ipTostreams[$ipTargetAddress] = $stream;
        }
    }));
});

启动服务端

php server.php

在两个客户端上分别启动

php client.php c4b34f0d-44fa-4ef5-9d28-ccef218d74fb
php client.php 41c5ee60-0628-4b11-9439-a10ba19cbcdd

验证
in 10.10.10.3

ping 10.10.10.2

你的 linux ip 互通后,假如你使用的是 windows 或 mac,和 linux 在同一网段,可以修改路由策略访问 ip 10.10.10.3

比如在 mac 上

route -n add -net 10.10.10.3 -netmask 255.255.255.0 '你的linuxip'

然后 ping 下 ip 试试,是不是通了?

其它例子#

在文件夹下 examples

以上

License#

MIT

本作品采用《CC 协议》,转载必须注明作者和本文链接
Make everything simple instead of making difficulties as simple as possible
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
讨论数量: 3

远程调用另一个服务的方法,看起来有点微服务的感觉。 我对微服务只是略懂,感觉了解微服务的概念和简单的使用是比较简单的,有很多现成的框架和实例可以学习。 但是应用的规模复杂后 debug 和 测试可能会很麻烦,也很少有人分享这方面的知识和经验。 比如 api 会有接口文档出 bug 可以通过查看接口文档的方式确认是不是参数有误,rpc 出 bug , 单客户端就不太好确定是服务的问题还是参数的问题。 然后比如测试,任意一个服务都有出问题的可能,测试的时候是否需要针对这些特例编写用例进行测试。如果不需要的话,通过测试的功能如何保证上线不会出问题。

1年前 评论
zds

有现成的 rpc 协议和封装好的 client

1年前 评论
jcc123

感谢反馈

是否是 rpc?

不是 rpc,但在之上可以构建出 rpc。

关于 debug

这个包提供了通过事件监听 error。

$stream->on('error', function(\Exception $e){
   echo "error:" $e->getMessage()."\n";
});
  • 一种是 timeout。连接对端超时,说明对端不在线或服务不可用。
  • 一种是对端的 exception, 执行闭包代码的错误信息。 如果还定位不到错误,终端会输出沟通信息。

关于测试

就像写注释一样~

当然,对于这种下层包,测试是必不可少的。 目前在 dev-master, 也许会一直在 dev-master,不建于用于生产环境中。 想体验一把或者用于个人项目尝尝鲜,还是非常推荐的。

1年前 评论