一切皆是流

一直沉溺于两个客户端之间的信息交流,不能自拔

因此写过两个版本的网络程序

但都不太满意,不满意的有两点

  • 逻辑太复杂
  • 不方便迭代开发

用起来心智负担很高

最近一段时间在整理 reactphp-framework相关的,想搞一些php异步框架的工具包,已经有好多个包了。

整理过程中,发现mysql-pool连接池的思路对实现两个客户端的交流很有启发。

连接池隐藏了底层的实现细节,只用关心几个方法就可以了,对mysql来说
实现两个方法

  • query
  • queryStream

就能满足在之上构建想要的东西,比如说ORM.

对与服务端和客户端交流来说,服务端和客户端都实现call方法,简单的调用call方法返回读写流就能在之上构建网络程序了,类似于这样。

call

  • 服务端调用客户端
// 运行在服务端 伪代码
$pool = new Pool();
// 调用客户端
$stream = $pool->call(function($stream){
    // 这里代码运行在客户端
    $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, $clientId);

$stream->write('hello');

$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});

$stream->on('close', function(){
   echo "stream close\n";
});
  • 客户端调用另外一个客户端
$client = new Client('server ip');

$stream = $client->call(function($stream){
    // 运行在另外一个客户端
     $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, $peerClientId);
$stream->write('hello');

$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});

$stream->on('close', function(){
   echo "stream close\n";
});

注意上面两个例子

  • 服务端可以调用客户端
  • 客户端可以调用客户端(通过服务端中转)

而客户端不可以调用服务端,服务端只是作为流量的中转,由于闭包里的代码可以自定义,这样某种程度上保护了服务端

上方的两个例子,是第三版的核心。在此基础上,能实现各种各样的网络程序。

基于此,可以抽象出下方的3种流

Stream

第一种 PortToPort

即端口流量转发,比如将一个客户端8022端口转发到另一个客户端的22端口。(实现ssh到该端口)

<?php

PortToPort::create($client, 'tcp')
// local 8022
->from(null, 8022)
//to another client
->to(
    'client_uuid',
    '127.0.0.1:22'
)->start();

然后

ssh -p 8022 root@127.0.0.1

就能登录到对端

第二种 StreamToPort

比如已经有了一个流,将这个流指向另一个客户端的某一端口

use React\Stream\ThroughStream;

$stream = new ThroughStream;

StreamToPort::create($client)
// form one uuid stream
->from('client_uuid', $stream)
->to('client_uuid', '127.0.0.1:8080')
->start();

$stream->write('hello world');

第三种 StreamToStream

已经有了两个流,将这两个流量互相转发,可以对一些流量进行桥接。

$stream1 = new ThroughStream;
$stream2 = new ThroughStream;

StreamToStream::create()->from($stream1)->toStream($stream2);

这三种流其中StreamToStream最为底层,StreamToPort是StreamToStream的上层,而PortToPort是StreamToPort的上层。而能构建出着这三种流,离不开上方的两个基础方法。

基于此,所有能抽象成流的数据,都可以将其转发到某处。

安全性

Client::$secretKey = 'xxxxx';

安装

github.com/reactphp-framework/brid...

gitee.com/reactphp-framework/bridg...

Install

composer require reactphp-framework/bridge dev-master -vvv

Usage

server

<?php

require __DIR__ . '/vendor/autoload.php';

use Reactphp\Framework\Bridge\Server;
use Reactphp\Framework\Bridge\Pool;
use Reactphp\Framework\Bridge\Verify\VerifyUuid;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use Reactphp\Framework\Bridge\TcpBridge;

Server::$debug = true;

$server = new Server(new VerifyUuid([
    '8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1',//value 是自定义的标识符,可以是空
    'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb' => '10.10.10.2',
    '41c5ee60-0628-4b11-9439-a10ba19cbcdd' => '10.10.10.3'
]), new TcpDecodeEncode);



$pool = new Pool($server, [
    'max_connections' => 20,
    'connection_timeout' => 2,
    'keep_alive' => 5,
    'wait_timeout' => 3
]);

$tcp = new TcpBridge('0.0.0.0:8010', $server);

client

<?php

require __DIR__ . '/vendor/autoload.php';

use Reactphp\Framework\Bridge\Client;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use React\EventLoop\Loop;
use function React\Async\async;

Client::$debug = true;
$uuid = $argv[1] ?? 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb';
echo "uuid: $uuid\n";

$uri = 'tcp://192.168.1.9:8010';
$client = new Client($uri, $uuid, new TcpDecodeEncode);
$client->start();

server call client

// 调用客户端
$stream = $pool->call(function($stream){
    // 这里代码运行在客户端
    $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, [
    'uuid' => 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb',
]);

$stream->write('hello');

$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});

$stream->on('close', function(){
   echo "stream close\n";
});

client call client

$stream = $client->call(function($stream){
    // 运行在另外一个客户端
     $stream->on('data',function($data) use ($stream) {
       echo $data."\n"; // 收到hello
       $stream->end('world');
    });
    return $stream;
}, [
    'uuid' => '8d24e2ba-c6f8-4bb6-8838-cacd37f64165',
    // ‘something’ => '10.8.0.1'
]);
$stream->write('hello');

$stream->on('data', function($data){
   echo $data."\n"; // 收到world
});

$stream->on('close', function(){
   echo "stream close\n";
});

有趣的例子

在上方的服务端配置里有这样一段

'8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1'

为什么有ip,php难不成可以转发ip流量(转发的是osi第三层,当然第二层也可以转发)吗,并实现ip互通吗,答案是可以的,不过仅限于linux。

  • require extensionpecl-tuntap
    • if build fail try remove TSRMLS_CC in tuntap.c

下面是个最小demo

server.php

<?php

require __DIR__ . '/vendor/autoload.php';

use Reactphp\Framework\Bridge\Server;
use Reactphp\Framework\Bridge\Pool;
use Reactphp\Framework\Bridge\Verify\VerifyUuid;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use Reactphp\Framework\Bridge\TcpBridge;
use React\EventLoop\Loop;

Server::$debug = true;

$server = new Server(new VerifyUuid([
    '8d24e2ba-c6f8-4bb6-8838-cacd37f64165' => '10.10.10.1',
    'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb' => '10.10.10.2',
    '41c5ee60-0628-4b11-9439-a10ba19cbcdd' => '10.10.10.3'
]), new TcpDecodeEncode);



$pool = new Pool($server, [
    'max_connections' => 20,
    'connection_timeout' => 2,
    'keep_alive' => 5,
    'wait_timeout' => 3
]);

$tcp = new TcpBridge('0.0.0.0:8010', $server);

client.php 注意修改里面的ip

<?php

require __DIR__ . '/vendor/autoload.php';

use Reactphp\Framework\Bridge\Client;
use Reactphp\Framework\Bridge\DecodeEncode\TcpDecodeEncode;
use React\EventLoop\Loop;
use function React\Async\async;

Client::$debug = true;
$uuid = $argv[1] ?? 'c4b34f0d-44fa-4ef5-9d28-ccef218d74fb';
echo "uuid: $uuid\n";

$uri = 'tcp://192.168.1.9:8010';
$client = new Client($uri, $uuid, new TcpDecodeEncode);
$client->start();



function run_command($Command)
{
    echo '+ ', $Command, "\n";

    $rc = 0;

    passthru($Command, $rc);

    if ($rc != 0)
        echo '+ Command returned ', $rc, "\n";

    return ($rc == 0);
}

$client->on('controllerConnected', function ($data) use ($client) {

    $ip = $data['something'];
    $br = ((php_sapi_name() == 'cli') ? '' : '<br />');

    global $TUN;

    if (is_resource($TUN)) {
        return;
    }

    // Try to create a new TAP-Device
    if (!is_resource($TUN = tuntap_new('', TUNTAP_DEVICE_TUN)))
        die('Failed to create TAP-Device' . "\n");

    $Interface = tuntap_name($TUN);

    echo 'Created ', $Interface, "\n";

    run_command('ip link set ' . $Interface . ' up');
    run_command("ip addr add $ip/24 dev " . $Interface);
    run_command("iptables -t nat -D POSTROUTING -p all -d $ip/24 -j SNAT --to-source $ip");
    run_command("iptables -t nat -A POSTROUTING -p all -d $ip/24 -j SNAT --to-source $ip");

    // Read Frames from the device
    echo 'Waiting for frames...', $br, "\n";


    $ipTostreams = [];

    Loop::addReadStream($TUN, async(function ($TUN) use ($client, &$ipTostreams) {
        // Try to read next frame from device
        $Data = $buffer =  fread($TUN, 8192);
        $Data = substr($Data, 4);
        if (($Length = strlen($Data)) < 20) {
            trigger_error('IPv4-Frame too short');

            return false;
        }

        // Parse default header
        $Byte = ord($Data[0]);
        $ipVersion = (($Byte >> 4) & 0xF);
        $ipHeaderLength = ($Byte & 0xF);

        if ($ipVersion != 4) {
            trigger_error('IP-Frame is version ' . $ipVersion . ', NOT IPv4');

            return false;
        } elseif (($ipHeaderLength < 5) || ($ipHeaderLength * 4 > $Length)) {
            trigger_error('IPv4-Frame too short for header');

            return false;
        }
        $ipSourceAddress = (ord($Data[12]) << 24) | (ord($Data[13]) << 16) | (ord($Data[14]) << 8) | ord($Data[15]);
        $ipSourceAddress = long2ip($ipSourceAddress);
        echo "ipSourceAddress: $ipSourceAddress\n";
        $ipTargetAddress = (ord($Data[16]) << 24) | (ord($Data[17]) << 16) | (ord($Data[18]) << 8) | ord($Data[19]);
        $ipTargetAddress = long2ip($ipTargetAddress);
        echo "ipTargetAddress: $ipTargetAddress\n";

        if ($client->getStatus() !== 1) {
            echo "client not ready\n";
            if (isset($ipTostreams[$ipTargetAddress])) {
                echo "close stream\n";
                $ipTostreams[$ipTargetAddress]->close();
                unset($ipTostreams[$ipTargetAddress]);
            }
            return;
        }

        if (isset($ipTostreams[$ipTargetAddress])) {
            if ($ipTostreams[$ipTargetAddress] === '') {
                echo "stream is connecting\n";
            } else {
                echo "write to stream\n";
                $ipTostreams[$ipTargetAddress]->write($buffer);
            }
        } else {
            echo "create stream\n";
            $ipTostreams[$ipTargetAddress] = '';
            $stream = $client->call(function ($stream, $info) {
                global $TUN;
                if (!isset($TUN) || !is_resource($TUN)) {
                    Loop::futureTick(function () use ($stream) {
                        $stream->emit('error', [new \Exception('TUN not found')]);
                    });
                    return $stream;
                }
                $stream->on('data', function ($data) use ($TUN) {
                    fwrite($TUN, $data);
                });
                return $stream;
            }, [
                'something' => $ipTargetAddress
            ]);

            $stream->write($buffer);


            $stream->on('data', function ($data) use ($TUN) {
                fwrite($TUN, $data);
            });

            $stream->on('error', function ($e) {
                echo "file: " . $e->getFile() . "\n";
                echo "line: " . $e->getLine() . "\n";
                echo $e->getMessage() . "\n";
            });

            $stream->on('close', function () use (&$ipTostreams, $ipTargetAddress) {
                echo "tun stream close\n";
                unset($ipTostreams[$ipTargetAddress]);
            });
            $ipTostreams[$ipTargetAddress] = $stream;
        }
    }));
});

启动服务端

php server.php

在两个客户端上分别启动

php client.php c4b34f0d-44fa-4ef5-9d28-ccef218d74fb
php client.php 41c5ee60-0628-4b11-9439-a10ba19cbcdd

验证
in 10.10.10.3

ping 10.10.10.2

你的linux ip互通后,假如你使用的是windows或mac,和linux在同一网段,可以修改路由策略访问ip 10.10.10.3

比如在mac上

route -n add -net 10.10.10.3 -netmask 255.255.255.0 '你的linuxip'

然后ping下ip试试,是不是通了?

其它例子

在文件夹下examples

以上

License

MIT

本作品采用《CC 协议》,转载必须注明作者和本文链接
Make everything simple instead of making difficulties as simple as possible
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 3

远程调用另一个服务的方法,看起来有点微服务的感觉。 我对微服务只是略懂,感觉了解微服务的概念和简单的使用是比较简单的,有很多现成的框架和实例可以学习。 但是应用的规模复杂后 debug 和 测试可能会很麻烦,也很少有人分享这方面的知识和经验。 比如 api 会有接口文档出bug可以通过查看接口文档的方式确认是不是参数有误,rpc 出 bug , 单客户端就不太好确定是服务的问题还是参数的问题。 然后比如测试,任意一个服务都有出问题的可能,测试的时候是否需要针对这些特例编写用例进行测试。如果不需要的话,通过测试的功能如何保证上线不会出问题。

5个月前 评论
zds

有现成的rpc协议和封装好的client

5个月前 评论
jcc123

感谢反馈

是否是rpc?

不是rpc,但在之上可以构建出rpc。

关于debug

这个包提供了通过事件监听error。

$stream->on('error', function(\Exception $e){
   echo "error:" $e->getMessage()."\n";
});
  • 一种是timeout。连接对端超时,说明对端不在线或服务不可用。
  • 一种是对端的exception,执行闭包代码的错误信息。 如果还定位不到错误,终端会输出沟通信息。

关于测试

就像写注释一样~

当然,对于这种下层包,测试是必不可少的。 目前在dev-master,也许会一直在dev-master,不建于用于生产环境中。 想体验一把或者用于个人项目尝尝鲜,还是非常推荐的。

5个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!