PHP Socket 通讯 TCP
写 TCP Server 的目其实是想通过动手实践来了解更多 TCP 和 Socket 的知识。跟大家分享一下这次个项目心得,同时也是第一次在 phphub 发帖。
GitHub地址:SimpleTcpServer
TCP & Socket
TCP ( Transmission Control Protocol ),大家肯定不陌生。我们现在开发的 PHP 应用的通讯基本构建于 TCP 之上。顾名思义,简单来说 TCP 就是传输协议。TCP 三次握手保证了通讯的可靠性。顺便考一下你们,经典热门面试题目,TCP为什么要三次握手而不是两次?TCP 的细节就不详细介绍了,相信大家都挺了解的,不了解的朋友可以到 wiki 上查阅。
Socket,是封装好的通信协议的接口,提供网络通讯的能力,更加方便使用协议栈。
很多人会问,Socket 和 TCP 的区别是什么? 简单来说,Socket封装好了一套 API,比如,create、listen、connect、accept、send、read 和 write 等等,更方便使用 TCP。
I/O
为什么要讲一下 I/O 呢?网络 I/O 延迟给应用带来极大的负面影响。Socket的操作是 I/O 的集合。比如,accept 操作无限期地等待正在连接的到来。
I/O 模型有几种类型:
- 阻塞 I/O(bloking I/O)
- 非阻塞 I/O(non-bloking I/O)
- 多路复用 I/O(multiplexing I/O)
- 信号驱动式 I/O(signal-driven I/O)
- 异步 I/O (asynchronous I/O)
这里主要简单讲阻塞、非阻塞和多路复用,以后有机会跟大家细讲。
I/O 的过程设计两个对象,一个是IO调用者(进程 process 或者线程 thread),另一个是系统内核 kernel。以 read 操作为例子,read 的过程经历了两个步骤:
- 等待数据就绪
- 将数据从内核拷贝到进程中
阻塞 I/O,对于网络 I/O,用户进程通过网络传输等待数据的到达。等待过程中,进程被阻塞。等到数据从网络接收完毕,内核开始复制数据到内存。直到内核返回结果,用户进程才解除阻塞。
非阻塞 I/O, 不断询问系统数据是否准备好。例如 socket accept 操作,调用 accept,立刻返回结果。通过返回的结果来判断数据是否准备好,如果还没准备好,继续再问系统,直到数据准备好。进程并没有阻塞,但是一直占用CPU,所以这个不断询问的操作做了很多无用功,浪费资源。
多路复用 I/O, 通过某种机制让系统通知进程其所等待的数据已经准备好。多路复用解决了非阻塞浪费CPU资源的问题。多路复用有三个著名的库 select、poll 和 epoll。简答讲一下 select,因为等下实现的代码用到 select。select 是系统级别的函数。PHP 里 socket_select
就是调用系统的 select()
。select
不断轮询文件的描述符的读写就绪状态,如果发现就绪,就通知进程处理数据。另外,select 不支持超过 1024 个的描述符。所以超过 1024 个连接,select 会处理不来。
Code
首先,本项目会用到 PHP 内置的 Socket 函数,跟大家简单介绍一下这些函数。
resource socket_create ( int $domain , int $type , int $protocol )
创建 Socketbool socket_bind ( resource $socket , string $address [, int $port = 0 ] )
绑定地址bool socket_listen ( resource $socket [, int $backlog = 0 ] )
监听Socket的连接resource socket_accept ( resource $socket )
接受一个连接string socket_read ( resource $socket , int $length [, int $type = PHP_BINARY_READ ] )
读操作int socket_write ( resource $socket , string $buffer [, int $length = 0 ] )
写操作void socket_close ( resource $socket )
关闭Socket,释放资源bool socket_set_nonblock ( resource $socket )
设置非阻塞。一个很关键的方法,如果没设置非阻塞,socket的操作就会被阻塞,例如 receive, send, connect, accept 等等。默认情况下,所有操作都是阻塞的。后面会详细介绍。
Connection.php
一个Connection
实例代表一个连接。
<?php
namespace Hbliang\SimpleTcpServer;
class Connection implements ConnectionInterface
{
const MAX_READ = 2048;
/**
* @var ServerInterface
*/
protected $server;
protected $resource;
public function __construct(ServerInterface $server, $resource)
{
$this->server = $server;
$this->resource = $resource;
}
public function getRemoteAddress()
{
socket_getpeername($this->resource, $ip, $port);
return $ip . ':' . $port;
}
public function getLocalAddress()
{
socket_getsockname($this->resource, $ip, $port);
return $ip . ':' . $port;
}
public function close()
{
socket_close($this->resource);
$this->server->removeConnection($this);
}
public function read()
{
return socket_read($this->resource, self::MAX_READ, PHP_BINARY_READ);
}
public function write($data)
{
return socket_write($this->resource, (string) $data);
}
public function getResource()
{
return $this->resource;
}
}
BlockServer.php
一个阻塞的Server
<?php
namespace Hbliang\SimpleTcpServer;
use Evenement\EventEmitter;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
class BlockServer extends EventEmitter implements LoggerAwareInterface, ServerInterface
{
use LoggerAwareTrait;
// 主socket
protected $master;
/**
* @var \SplObjectStorage
*/
protected $connections;
protected $running = false;
public function __construct($domain = 'localhost', $port = 8000)
{
// 创建一个socket
$socket = socket_create(AF_INET, SOCK_STREAM, getprotobyname('tcp'));
if ($socket === false) {
$this->throwLastError();
}
// 绑定地址
if (socket_bind($socket, $domain, $port) === false) {
$this->throwLastError();
}
$this->master = $socket;
$this->connections = new \SplObjectStorage();
$this->logger = new \Psr\Log\NullLogger();
}
public function close()
{
if (!$this->running) {
return;
}
socket_close($this->master);
}
public function run()
{
// 监听请求的连接
if (socket_listen($this->master) === false) {
$this->throwLastError();
}
$this->running = true;
$this->logger->info('start');
while ($this->running) {
$this->logger->info('waiting connection...');
// 成功接收新的连接,返回一个新的子socket
$socket = socket_accept($this->master);
if ($socket === false) {
$this->emit('error', [$this->lastError()]);
continue;
}
// 处理新的连接
$this->handleNewConnection($socket);
};
}
protected function handleNewConnection($socket)
{
$connection = new Connection($this, $socket);
$this->logger->info('new client from ' . $connection->getRemoteAddress());
$this->connections->attach($connection);
// 触发connection事件,监听connection事件并做出反应
$this->emit('connection', [$connection]);
do {
if (false === ($data = $connection->read())) {
$this->emit('error', [$this->lastError()]);
$connection->close();
break;
}
// 忽略空的消息
if (!$data = trim($data)) {
continue;
}
if ($data === 'quit') {
$connection->close();
$this->logger->info('client quit');
break;
}
// 触发data事件,只要客户端发来合法信息
$this->emit('data', [$connection, $data]);
} while (true);
}
public function removeConnection(ConnectionInterface $connection)
{
$this->connections->detach($connection);
}
public function lastError()
{
return new \Exception(socket_strerror(socket_last_error()));
}
protected function throwLastError()
{
throw $this->lastError();
}
}
SelectServer.php
一个基于select的server
<?php
namespace Hbliang\SimpleTcpServer;
use Evenement\EventEmitter;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
class SelectServer extends EventEmitter implements LoggerAwareInterface, ServerInterface
{
use LoggerAwareTrait;
const SELECT_TIMEOUT = 0;
protected $master;
protected $resources = [];
protected $running = false;
protected $booted = false;
public function __construct($domain = 'localhost', $port = 8000)
{
$socket = socket_create(AF_INET, SOCK_STREAM, getprotobyname('tcp'));
if ($socket === false) {
$this->throwLastError();
}
if (socket_bind($socket, $domain, $port) === false) {
$this->throwLastError();
}
// 设置非阻塞
socket_set_nonblock($socket);
$this->master = $socket;
$this->connections = new \SplObjectStorage();
$this->logger = new \Psr\Log\NullLogger();
}
public function close()
{
if (!$this->running) {
return;
}
socket_close($this->master);
}
public function pause()
{
$this->running = false;
}
public function resume()
{
if (!$this->booted) {
$this->booted = true;
if (socket_listen($this->master) === false) {
$this->throwLastError();
}
}
$this->running = true;
if (!in_array($this->master, $this->resources)) {
$this->resources[(int) $this->master] = $this->master;
}
}
public function run()
{
$this->resume();
while ($this->running) {
$reads = $this->resources;
$writes = [];
$except = [];
if (socket_select($reads, $writes, $except, self::SELECT_TIMEOUT) < 1) {
continue;
}
// 新的连接请求连接
if (in_array($this->master, $reads)) {
$newSocket = socket_accept($this->master);
if ($newSocket === false) {
$this->emit('error', [$this->lastError()]);
} else {
$this->handleNewConnection($newSocket);
}
// 不需要处理主socket
unset($reads[array_search($this->master, $reads)]);
}
// 因为PHP是单线程,下面两个foreach循环操作无可避免是阻塞的。
// 如果handleReadAction 和 handleWriteAction 方法需要执行时间较长,会影响到整个server的通信。
// 程序被阻塞在此,就无法及时接收新的连接和处理新到达的数据。
foreach ($reads as $read) {
$this->handleReadAction($read);
}
foreach ($writes as $write) {
$this->handleWriteAction($write);
}
}
}
protected function handleReadAction($resource)
{
$connection = new Connection($this, $resource);
if (false === ($data = $connection->read())) {
$this->emit('error', [$this->lastError()]);
$connection->close();
}
if (!$data = trim($data)) {
return;
}
if ($data === 'quit') {
$connection->close();
$this->logger->info('client quit');
return;
}
$this->emit('data', [$connection, $data]);
}
protected function handleWriteAction($resource)
{
}
protected function handleNewConnection($socket)
{
$connection = new Connection($this, $socket);
$this->logger->info('new client from ' . $connection->getRemoteAddress());
$this->resources[(int) $socket] = $socket;
$this->emit('connection', [$connection]);
}
public function removeConnection(ConnectionInterface $connection)
{
$resource = $connection->getResource();
$resourceId = (int) $resource;
if (isset($this->resources[$resourceId])) {
unset($this->resources[$resourceId]);
}
}
public function lastError()
{
return new \Exception(socket_strerror(socket_last_error()));
}
protected function throwLastError()
{
throw $this->lastError();
}
}
Example
Echo Server
examples/EchoServer.php
<?php
// require autoload file from composer
require __DIR__ . '/../vendor/autoload.php';
class Logger extends \Psr\Log\AbstractLogger
{
public function log($level, $message, array $context = array())
{
echo sprintf("%s: %s %s", $level, $message, !empty($context) ? json_encode($context) : '') . PHP_EOL;
}
}
// listen on address 127.0.0.1 and port 8000
$echoServer = new \Hbliang\SimpleTcpServer\SelectServer('127.0.0.1', 8000);
//$echoServer = new \Hbliang\SimpleTcpServer\BlockServer('127.0.0.1', 8000);
// trigger while receiving data from client
$echoServer->on('data', function (\Hbliang\SimpleTcpServer\Connection $connection, $data) {
// send data to client
$connection->write($data . PHP_EOL);
});
// trigger when new connection comes
$echoServer->on('connection', function (\Hbliang\SimpleTcpServer\Connection $connection) {
$connection->write('welcome' .PHP_EOL);
});
// trigger when occur error
$echoServer->on('error', function (\Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
$echoServer->setLogger(new Logger());
$echoServer->run();
启动 Echo Server php examples/EchoServer.php
利用 telnet 通讯 telnet localhost 8000
结果:
最后,关于 TCP 和 I/O 推荐两本书:
谢谢大家!
本作品采用《CC 协议》,转载必须注明作者和本文链接
点赞啊. 这块比较晦涩, 也是进阶的一个重要方向, 感谢分项.
请问能不能用php 的socket 来玩p2p网络啊?
@张铁林 想玩区块链了?
记录下服务器公网不能作为 tcp 的 ip 地址原因,首先安全组等于是个虚拟的防火墙,需要在系统中开放防火墙,接下来是重点:在服务端中将阿里云的 主私网IP 作为监听ip,例如:172.x.x.x,在客户端中就可以使用阿里云的 公网ip 了,例如:47.x.x.x,然后就可以愉快的玩耍了。