PHP Socket 通讯 TCP

写 TCP Server 的目其实是想通过动手实践来了解更多 TCP 和 Socket 的知识。跟大家分享一下这次个项目心得,同时也是第一次在 phphub 发帖。

GitHub地址:SimpleTcpServer

TCP & Socket

TCP ( Transmission Control Protocol ),大家肯定不陌生。我们现在开发的 PHP 应用的通讯基本构建于 TCP 之上。顾名思义,简单来说 TCP 就是传输协议。TCP 三次握手保证了通讯的可靠性。顺便考一下你们,经典热门面试题目,TCP为什么要三次握手而不是两次?TCP 的细节就不详细介绍了,相信大家都挺了解的,不了解的朋友可以到 wiki 上查阅。

Socket,是封装好的通信协议的接口,提供网络通讯的能力,更加方便使用协议栈。

很多人会问,Socket 和 TCP 的区别是什么? 简单来说,Socket封装好了一套 API,比如,create、listen、connect、accept、send、read 和 write 等等,更方便使用 TCP。

I/O

为什么要讲一下 I/O 呢?网络 I/O 延迟给应用带来极大的负面影响。Socket的操作是 I/O 的集合。比如,accept 操作无限期地等待正在连接的到来。

I/O 模型有几种类型:

  • 阻塞 I/O(bloking I/O)
  • 非阻塞 I/O(non-bloking I/O)
  • 多路复用 I/O(multiplexing I/O)
  • 信号驱动式 I/O(signal-driven I/O)
  • 异步 I/O (asynchronous I/O)

这里主要简单讲阻塞、非阻塞和多路复用,以后有机会跟大家细讲。

I/O 的过程设计两个对象,一个是IO调用者(进程 process 或者线程 thread),另一个是系统内核 kernel。以 read 操作为例子,read 的过程经历了两个步骤:

  1. 等待数据就绪
  2. 将数据从内核拷贝到进程中

阻塞 I/O,对于网络 I/O,用户进程通过网络传输等待数据的到达。等待过程中,进程被阻塞。等到数据从网络接收完毕,内核开始复制数据到内存。直到内核返回结果,用户进程才解除阻塞。

非阻塞 I/O, 不断询问系统数据是否准备好。例如 socket accept 操作,调用 accept,立刻返回结果。通过返回的结果来判断数据是否准备好,如果还没准备好,继续再问系统,直到数据准备好。进程并没有阻塞,但是一直占用CPU,所以这个不断询问的操作做了很多无用功,浪费资源。

多路复用 I/O, 通过某种机制让系统通知进程其所等待的数据已经准备好。多路复用解决了非阻塞浪费CPU资源的问题。多路复用有三个著名的库 selectpollepoll。简答讲一下 select,因为等下实现的代码用到 selectselect 是系统级别的函数。PHP 里 socket_select 就是调用系统的 select()select 不断轮询文件的描述符的读写就绪状态,如果发现就绪,就通知进程处理数据。另外,select 不支持超过 1024 个的描述符。所以超过 1024 个连接,select 会处理不来。

Code

首先,本项目会用到 PHP 内置的 Socket 函数,跟大家简单介绍一下这些函数。

  1. resource socket_create ( int $domain , int $type , int $protocol ) 创建 Socket
  2. bool socket_bind ( resource $socket , string $address [, int $port = 0 ] ) 绑定地址
  3. bool socket_listen ( resource $socket [, int $backlog = 0 ] ) 监听Socket的连接
  4. resource socket_accept ( resource $socket ) 接受一个连接
  5. string socket_read ( resource $socket , int $length [, int $type = PHP_BINARY_READ ] ) 读操作
  6. int socket_write ( resource $socket , string $buffer [, int $length = 0 ] ) 写操作
  7. void socket_close ( resource $socket ) 关闭Socket,释放资源
  8. bool socket_set_nonblock ( resource $socket ) 设置非阻塞。一个很关键的方法,如果没设置非阻塞,socket的操作就会被阻塞,例如 receive, send, connect, accept 等等。默认情况下,所有操作都是阻塞的。后面会详细介绍。

Connection.php 一个Connection实例代表一个连接。

<?php

namespace Hbliang\SimpleTcpServer;

class Connection implements ConnectionInterface
{
    const MAX_READ = 2048;
    /**
     * @var ServerInterface
     */
    protected $server;

    protected $resource;

    public function __construct(ServerInterface $server, $resource)
    {
        $this->server = $server;
        $this->resource = $resource;
    }

    public function getRemoteAddress()
    {
        socket_getpeername($this->resource, $ip, $port);
        return $ip . ':' . $port;
    }

    public function getLocalAddress()
    {
        socket_getsockname($this->resource, $ip, $port);
        return $ip . ':' . $port;
    }

    public function close()
    {
        socket_close($this->resource);
        $this->server->removeConnection($this);
    }

    public function read()
    {
        return socket_read($this->resource, self::MAX_READ, PHP_BINARY_READ);
    }

    public function write($data)
    {
        return socket_write($this->resource, (string) $data);
    }

    public function getResource()
    {
        return $this->resource;
    }
}

BlockServer.php 一个阻塞的Server

<?php

namespace Hbliang\SimpleTcpServer;

use Evenement\EventEmitter;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;

class BlockServer extends EventEmitter implements LoggerAwareInterface, ServerInterface
{
    use LoggerAwareTrait;

    // 主socket
    protected $master;

    /**
     * @var \SplObjectStorage
     */
    protected $connections;

    protected $running = false;

    public function __construct($domain = 'localhost', $port = 8000)
    {
        // 创建一个socket
        $socket = socket_create(AF_INET, SOCK_STREAM, getprotobyname('tcp'));
        if ($socket === false) {
            $this->throwLastError();
        }
        // 绑定地址
        if (socket_bind($socket, $domain, $port) === false) {
            $this->throwLastError();
        }

        $this->master = $socket;
        $this->connections = new \SplObjectStorage();
        $this->logger = new \Psr\Log\NullLogger();
    }

    public function close()
    {
        if (!$this->running) {
            return;
        }

        socket_close($this->master);
    }

    public function run()
    {
        // 监听请求的连接 
        if (socket_listen($this->master) === false) {
            $this->throwLastError();
        }

        $this->running = true;

        $this->logger->info('start');

        while ($this->running) {
            $this->logger->info('waiting connection...');
            // 成功接收新的连接,返回一个新的子socket
            $socket = socket_accept($this->master);
            if ($socket === false) {
                $this->emit('error', [$this->lastError()]);
                continue;
            }
            // 处理新的连接
            $this->handleNewConnection($socket);
        };
    }

    protected function handleNewConnection($socket)
    {
        $connection = new Connection($this, $socket);

        $this->logger->info('new client from ' . $connection->getRemoteAddress());

        $this->connections->attach($connection);

        // 触发connection事件,监听connection事件并做出反应
        $this->emit('connection', [$connection]);

        do {
            if (false === ($data = $connection->read())) {
                $this->emit('error', [$this->lastError()]);
                $connection->close();
                break;
            }

            // 忽略空的消息
            if (!$data = trim($data)) {
                continue;
            }

            if ($data === 'quit') {
                $connection->close();
                $this->logger->info('client quit');
                break;
            }

            // 触发data事件,只要客户端发来合法信息
            $this->emit('data', [$connection, $data]);

        } while (true);
    }

    public function removeConnection(ConnectionInterface $connection)
    {

        $this->connections->detach($connection);
    }

    public function lastError()
    {
        return new \Exception(socket_strerror(socket_last_error()));
    }

    protected function throwLastError()
    {
        throw $this->lastError();
    }
}

SelectServer.php 一个基于select的server

<?php

namespace Hbliang\SimpleTcpServer;

use Evenement\EventEmitter;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;

class SelectServer extends EventEmitter implements LoggerAwareInterface, ServerInterface
{
    use LoggerAwareTrait;

    const SELECT_TIMEOUT = 0;

    protected $master;

    protected $resources = [];

    protected $running = false;

    protected $booted = false;

    public function __construct($domain = 'localhost', $port = 8000)
    {
        $socket = socket_create(AF_INET, SOCK_STREAM, getprotobyname('tcp'));
        if ($socket === false) {
            $this->throwLastError();
        }
        if (socket_bind($socket, $domain, $port) === false) {
            $this->throwLastError();
        }

        // 设置非阻塞
        socket_set_nonblock($socket);

        $this->master = $socket;
        $this->connections = new \SplObjectStorage();
        $this->logger = new \Psr\Log\NullLogger();
    }

    public function close()
    {
        if (!$this->running) {
            return;
        }

        socket_close($this->master);
    }

    public function pause()
    {
        $this->running = false;
    }

    public function resume()
    {
        if (!$this->booted) {
            $this->booted = true;

            if (socket_listen($this->master) === false) {
                $this->throwLastError();
            }
        }

        $this->running = true;

        if (!in_array($this->master, $this->resources)) {
            $this->resources[(int) $this->master] = $this->master;
        }
    }

    public function run()
    {
        $this->resume();

        while ($this->running) {
            $reads = $this->resources;
            $writes = [];
            $except = [];

            if (socket_select($reads, $writes, $except, self::SELECT_TIMEOUT) < 1) {
                continue;
            }

            // 新的连接请求连接
            if (in_array($this->master, $reads)) {
                $newSocket = socket_accept($this->master);

                if ($newSocket === false) {
                    $this->emit('error', [$this->lastError()]);
                } else {
                    $this->handleNewConnection($newSocket);
                }

                // 不需要处理主socket
                unset($reads[array_search($this->master, $reads)]);
            }

            // 因为PHP是单线程,下面两个foreach循环操作无可避免是阻塞的。

            // 如果handleReadAction 和 handleWriteAction 方法需要执行时间较长,会影响到整个server的通信。
            // 程序被阻塞在此,就无法及时接收新的连接和处理新到达的数据。

            foreach ($reads as $read) {
                $this->handleReadAction($read);
            }

            foreach ($writes as $write) {
                $this->handleWriteAction($write);
            }
        }
    }

    protected function handleReadAction($resource)
    {
        $connection = new Connection($this, $resource);
        if (false === ($data = $connection->read())) {
            $this->emit('error', [$this->lastError()]);
            $connection->close();
        }

        if (!$data = trim($data)) {
            return;
        }

        if ($data === 'quit') {
            $connection->close();
            $this->logger->info('client quit');
            return;
        }

        $this->emit('data', [$connection, $data]);
    }

    protected function handleWriteAction($resource)
    {

    }

    protected function handleNewConnection($socket)
    {
        $connection = new Connection($this, $socket);

        $this->logger->info('new client from ' . $connection->getRemoteAddress());

        $this->resources[(int) $socket] = $socket;

        $this->emit('connection', [$connection]);
    }

    public function removeConnection(ConnectionInterface $connection)
    {
        $resource = $connection->getResource();
        $resourceId = (int) $resource;

        if (isset($this->resources[$resourceId])) {
            unset($this->resources[$resourceId]);
        }
    }

    public function lastError()
    {
        return new \Exception(socket_strerror(socket_last_error()));
    }

    protected function throwLastError()
    {
        throw $this->lastError();
    }
}

Example

Echo Server

examples/EchoServer.php

<?php

// require autoload file from composer
require __DIR__ . '/../vendor/autoload.php';

class Logger extends \Psr\Log\AbstractLogger
{
    public function log($level, $message, array $context = array())
    {
        echo sprintf("%s: %s %s", $level, $message, !empty($context) ? json_encode($context) : '') . PHP_EOL;
    }
}

// listen on address 127.0.0.1 and port 8000
$echoServer = new \Hbliang\SimpleTcpServer\SelectServer('127.0.0.1', 8000);
//$echoServer = new \Hbliang\SimpleTcpServer\BlockServer('127.0.0.1', 8000);

// trigger while receiving data from client
$echoServer->on('data', function (\Hbliang\SimpleTcpServer\Connection $connection, $data) {
    // send data to client
    $connection->write($data . PHP_EOL);
});

// trigger when new connection comes
$echoServer->on('connection', function (\Hbliang\SimpleTcpServer\Connection $connection) {
    $connection->write('welcome' .PHP_EOL);
});

// trigger when occur error
$echoServer->on('error', function (\Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$echoServer->setLogger(new Logger());

$echoServer->run();

启动 Echo Server php examples/EchoServer.php

利用 telnet 通讯 telnet localhost 8000

结果:

最后,关于 TCP 和 I/O 推荐两本书:

谢谢大家!

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 1年前 自动加精
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
讨论数量: 3

点赞啊. 这块比较晦涩, 也是进阶的一个重要方向, 感谢分项.

1年前 评论

请问能不能用php 的socket 来玩p2p网络啊?

1年前 评论

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
未填写
文章
1
粉丝
10
喜欢
70
收藏
12
排名:405
访问:9465
私信
所有博文