ReactPHP - 使用异步 HTTP 并行下载多个文件
ReactPHP - 使用异步 HTTP 并行下载多个文件
ReactPHP - Parallel download of files using asynchronous HTTP
- Title: ReactPHP - 使用异步 HTTP 并行下载多个文件
- Tag:
ReactPHP
、Asynchronous
- Author: Tacks
- Create-Date: 2023-09-07
- Update-Date: 2023-09-07
大纲
Ref
- ReactPHP
- ReactPHP -
React\Http\Browser
- ReactPHP -
React\Promise\Promise
- ReactPHP -
Stream
- ECMAScript 6 入门 - Promise 对象
0、引入问题
通常在 PHP 中,当我们需要批量执行 HTTP 请求,如批量下载一批视频,我们可以一个一个下载,但是这样需要等待每个请求完成才会开始下一个新的请求,处理的文件越多,延迟就越大,在下载所有文件之前我们不能做其他的事情。
那 PHP 可不可以异步方式,来批量下载?
当然可以 ReactPHP
可以帮助你做到 !
1、 ReactPHP
一些基础组件
1.0 事前准备
composer require react/http
1.1 ♻️ React\EventLoop\Loop
事件驱动
React\EventLoop\Loop
事件循环是ReactPHP
的核心组件,用于处理异步事件和非阻塞的 I/O 操作。
EventLoop
会自动帮你选择合适的事件驱动,如代码下可以查看当前选择的事件驱动器,更多可查看 loop-implementations;
示例程序:Loop事件对象获取
$loop = React\EventLoop\Loop::get();
var_dump(get_class($loop));
// string(32) "React\EventLoop\StreamSelectLoop"
run()
运行事件循环,通常来说就是把要执行的任务都加到同一个loop
实例中,最后调用run()
开始执行,应用程序将进入事件驱动的模式,等待事件的触发和处理,直到任务结束。
示例程序:执行Loop事件循环
// 没有注册任何事件的loop, 运行事件循环的代码将会是空闲状态,没有具体的操作或任务执行
$loop = React\EventLoop\Loop::get();
$loop->run();
1.2 🆎 React\Promise\Promise
Promise 异步
ReactPHP
Promise
是参考了 JavaScriptPromises/A+
规范的实现方式,相关的规范和方法 API 基本一致。Promises/A+ 是一种在 JavaScript 社区中广泛接受和使用的 Promise 规范,它定义了 Promise 的行为和接口。
简单来说,在 JS 中,Promise 是异步编程的一种解决方案,ES6 将其写进了语言标准,统一了用法,原生提供了 Promise 对象。 Promise 类似一个容器,里面保存着某个未来才会结束的事件(通常是一个异步操作)的结果。
Promise 中文 意味 “承诺” ~, 表达对未来的承诺,只有两种结果,结果A和结果B。
Promise 一些个特点
1) Promise 对象有三种状态,pending
(进行中)、 fulfilled
(已成功) 、 rejected
(已失败)
2) 一旦状态改变,就不会再变,而变化只有两种情况, pending=>fulfilled , pending=>rejected ;
3) 状态一旦确定,任何时候都会得到这个结果,你可以多次调用 then() 来获取完成的状态;
一些个方法
- 定时器
addTimer()
- 承诺未来的结果
PromiseInterface::then()
- 参数1 回调函数:完成时的回调处理
- 参数2 回调函数:拒绝时的回调处理
- 承诺
Deferred::resolve($value)
- 拒绝
Deferred::reject($reason)
示例程序:Promise基本使用
// 创建 Deferred 延迟对象
$deferred = new React\Promise\Deferred();
// 获取 Promise 未来对象
$promise = $deferred->promise();
// 注册回调函数处理 Promise 的完成状态
// $promise->then(callable $onFulfilled = null, callable $onRejected = null);
// ①
$promise->then(
// 已完成
function ($result) {
echo "Promise onFulfilled | result: " . $result . PHP_EOL;
},
// 已拒绝
function ($reason) {
echo "Promise onRejected | reason: " . $reason . PHP_EOL;
}
);
// ② 如上所述,状态一旦确定,多次调用,获得状态结果也都一致
$promise->then(
// 已完成
function ($result) {
echo "Promise onFulfilled | result: " . $result . PHP_EOL;
},
// 已拒绝
function ($reason) {
echo "Promise onRejected | reason: " . $reason . PHP_EOL;
}
);
// 搞一个事件循环
$loop = React\EventLoop\Loop::get();
// 模拟异步操作,3秒后将Promise状态设置为 onFulfilled
$loop->addTimer(3, function () use ($deferred) {
echo 'addTimer:onFulfilled' . PHP_EOL;
$deferred->resolve("操作成功");
});
// 模拟异步操作,3秒后将Promise状态设置为 onRejected
$loop->addTimer(3, function () use ($deferred) {
echo 'addTimer:onRejected' . PHP_EOL;
$deferred->reject(new \Exception("退出异常"));
});
echo "Loop Run..." . PHP_EOL;
// 输出:
// Loop Run...
// Promise onFulfilled | result: 操作成功
// Promise onFulfilled | result: 操作成功
1.3 🌐 React\Http\Browser
HTTP 请求客户端
首先,需要了解到 React\Http\Browser
,其实就是一个简单的 HttpClient
,看一下如何发送一个 HTTP 请求。
get()
, 具体可看 get
示例程序:HTTP获取文件Header头信息
// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();
// 创建 Browser 用来发起 HTTP 请求
$client = new React\Http\Browser($loop);
$url = 'https://reactphp.org/apple-touch-icon.png';
$client->get($url)->then(function (Psr\Http\Message\ResponseInterface $response) {
$size = $response->getHeaders()['Content-Length'][0];
$type = $response->getHeaders()['Content-Type'][0];
echo "=================================". PHP_EOL;
echo 'Headers received' . PHP_EOL;
echo sprintf("Content-Length: %s", $size) . PHP_EOL;
echo sprintf("Content-Type: %s", $type) . PHP_EOL;
echo "=================================". PHP_EOL. PHP_EOL;
}, function (Exception $e) {
echo sprintf("Error: %s", $e->getMessage()) .PHP_EOL;
});
// 运行事件循环
$loop->run();
1.4 📚 React\Filesystem\Filesystem
Filesystem 文件系统
ReactPHP
异步生态系统中有一个允许与文件系统异步工作的组件:reactphp/filesystem
。该组件为文件系统中最常用的操作提供基于承诺Promise
的接口。
- 安装一下
composer require react/filesystem:v0.1.2 -W
示例程序:Filesystem异步操作文件
// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();
// 创建文件系统
$filesystem = React\Filesystem\Filesystem::create($loop);
// 获取文件对象 返回 React\Filesystem\Node\File
$file = $filesystem->file(__FILE__);
// 以 Promise 方式读取文件内容,异步执行
$file->getContents()->then(function ($contents) {
echo 'Reading completed size:' . strlen($contents) . PHP_EOL;
});
// 以 Promise 方式返回 包含有关文件信息的关联数组
$file->stat()->then(function ($stat) {
print_r($stat);
});
// 添加一个定时器
$loop->addPeriodicTimer(1, function () {
echo 'Timer' . PHP_EOL;
});
// 创建目录对象
$dir = $filesystem->dir(__DIR__);
// 以 Promise 方式读取文件列表
$dir->ls()->then(function (SplObjectStorage $nodes) {
foreach ($nodes as $node) {
echo $node . PHP_EOL;
}
});
// 运行事件循环
$loop->run();
1.5 🌊 Stream
流
流是表现出可流式传输行为的资源对象,它可以以线性方式读取或写入。
ReactPHP
在其整个生态系统中使用“流”的概念,为处理任意数据内容和大小的流提供一致的高级抽象。比如ReactPHP
中的Stream
流允许您以小块的形式高效地处理大量数据(例如多千兆字节的文件下载),而无需将所有内容一次存储在内存中。
每一种流都是继承 Evenement\EventEmitter
, 它实现了一些基础的方法。根据流的类型,又分为 Readable
、 Writable
、 Duplex
,对应 ReactPHP 也提供了三种接口 ReadableStreamInterface
、 WritableStreamInterface
、 DuplexStreamInterface
。
示例程序:ReadableResourceStream 流式读取日志文件
// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();
// 创建可读流 (假设 log.txt 日志文件很大,不能用 file_get_content 一次读取,可以采用流式分批读取)
$stream = new \React\Stream\ReadableResourceStream(fopen('log.txt', 'r'), $loop);
// 判断是否可读
var_dump($stream->isReadable());
// 监听接收事件
$stream->on('data', function($data){
echo 'Read data:', strlen($data) . PHP_EOL;
});
// 监听结束事件
$stream->on('end', function(){
echo "Read finished" . PHP_EOL;
});
// 运行事件循环
$loop->run();
/*
// 输出
bool(true)
Read data:65536
Read data:65536
Read data:65536
Read data:65536
Read data:65536
Read data:65536
Read data:26001
Read finished
*/
示例程序:Stream 的暂停和启动,每次打印一个字节
// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();
// 创建可读流 (第三个参数可以设置 每次读取 1字节 )
$stream = new \React\Stream\ReadableResourceStream(fopen('log.txt', 'r'), $loop, 1);
// 监听接收事件
$stream->on('data', function($data) use ($loop, $stream){
echo 'Read data:', $data . PHP_EOL;
// 暂停
$stream->pause();
// 1s 后启动
$loop->addTimer(1, function() use ($stream) {
$stream->resume();
});
});
// 监听结束事件
$stream->on('end', function(){
echo "Read finished" . PHP_EOL;
});
// 运行事件循环
$loop->run();
示例程序:Stream pipe 管道,读取内容然后写入标准输出
// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();
// 创建可读流
$input = new \React\Stream\ReadableResourceStream(fopen(__FILE__, 'r'), $loop);
// 创建可写流
$output = new \React\Stream\WritableResourceStream(fopen('php://stdout', 'w'), $loop);
// ① 创建管道
$input->pipe($output);
// ② 监听读事件
// $input->on('data', function($data) use ($output){
// // 写入标准输出
// $output->write($data);
// });
// 运行事件循环
$loop->run();
示例程序:Stream ThroughStream 处理流结合管道 pipe,读取内容然后处理过滤再写入标准输出
// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();
// 创建可读流
$input = new \React\Stream\ReadableResourceStream(fopen(__FILE__, 'r'), $loop);
// 创建可写流
$output = new \React\Stream\WritableResourceStream(fopen('php://stdout', 'w'), $loop);
// 处理器 (比如内容全部大写再输出)
$through = new \React\Stream\ThroughStream('strtoupper');
$input->pipe($through)->pipe($output);
// 运行事件循环
$loop->run();
2、 ReactPHP
实现下载文件
2.1 利用 Loop + Http + Filesystem 一口气下载文件
我们希望下载一个远程文件,文本、图片、视频都可以,利用
Http
下载,然后Filesystem
进行保存内容,结合Loop
采用异步事件驱动方式。
- 方法1
- 主要利用
WritableStreamInterface
可写文件流,对Http
的response
内容直接读取然后write()
写入流,最后end()
关闭流
- 主要利用
- 方法2
- 主要利用
Filesystem
提供的putContents()
快捷操作来代替上面 then() 操作,本质时一样的
- 主要利用
示例程序:文件下载
// 事件循环
$loop = React\EventLoop\Loop::get();
// 创建 Browser 用来发起 HTTP 请求
$client = new React\Http\Browser($loop);
// 下载文件
$url = 'https://reactphp.org/apple-touch-icon.png';
$savename = basename($url);
// 创建 Loop 事件循环
$filesystem = \React\Filesystem\Filesystem::create($loop);
// 文件1:c如果文件不存在则创建一个文件并w以可写模式打开该文件
$file = $filesystem->file($savename)->open('cw');
// 文件2:
$file2 = $filesystem->file('copy-' . $savename);
// 发起 HTTP 请求
$client->request('GET', $url)->then(function ($response) use ($file, $file2) {
// 当 Promise 处于 onFulfilled, 打开文件流,写入文件内容
$file->then(function(React\Stream\WritableStreamInterface $stream) use ($response){
$stream->write($response->getBody());
// 记得关闭流
$stream->end();
echo "Download 1 success" . PHP_EOL;
});
// 采用文件辅助方法,类似 file_put_contents() 函数,无需手动关闭文件流
$file2->putContents($response->getBody())->then(function () {
echo "Download 2 success" . PHP_EOL;
});
});
echo 'Downloading' . PHP_EOL;
$loop->run();
- 源码查看:React\Filesystem\Node\File
putContents()
文件写入
public function putContents($contents)
{
return $this->open('cw')->then(function (WritableStreamInterface $stream) use ($contents) {
$stream->write($contents);
return $this->close();
});
}
上面的操作是一次性将下载内容读取出来,如果文件很大,内容不能一次性加载到内存中,如何利用流的方式,分块进行读取写入,并且记录计算下载进度呢?
2.2 为程序添加一个下载进度
由于下载内容比较大,希望可以增加下载进度比,于是我们采用流式下载的方式 具体利用 HTTP 的
requestStreaming()
方式作为可读流,然后利用管道 pipe ,传入到可写流中。 [可读文件流] => pipe => [可写文件流]
示例程序:显示视频下载进度
React\Promise\Stream\unwrapWritable()
-> 参考文档React\Filesystem\Filesystem
得到文件操作,open()
得到的是由Promise
包装的文件流unwrapWritable()
函数接受一个Promise
对象作为参数,并返回对应的可写流- 方便地从
Promise
包装中提取可写流,并直接操作它
React\Stream\ThroughStream
-> 参考文档ThroughStream
相当于一个中间层管道,可以处理每次接收到的 chunk 数据,可以通过获取字节计算,来回显当前读取的进度
pipe()
- 将响应主体流
$stream
通过进度流$progress
写入文件流$fileStream
- 将响应主体流
\033[NA
光标向上移动 N 行\033[1A
每次输出将光标向上移动 1 行
// 事件循环
$loop = React\EventLoop\Loop::get();
// 下载地址
$url = 'https://v.cic.tsinghua.edu.cn/vod/video/6/e/530451.mp4';
$savename = basename($url);
echo 'Requesting ' . $url . '…' . PHP_EOL;
// HTTP 下载器
$client = new React\Http\Browser($loop);
// 保存的文件流
$filesystem = React\Filesystem\Filesystem::create($loop);
$file = $filesystem->file($savename)->open('cw');
$fileStream = React\Promise\Stream\unwrapWritable($file);
// 请求流式响应
$client->requestStreaming('GET', $url)->then(function ($response) use ($fileStream) {
// 获取总大小
$size = $response->getHeaders()['Content-Length'][0];
$currentSize = 0;
echo 'Headers size:', round($size / 1024 / 1024 , 2), ' MB '. PHP_EOL;
echo "=================================" . PHP_EOL;
echo RingCentral\Psr7\str($response);
echo "=================================". PHP_EOL. PHP_EOL;
// 获取响应的可读流
$stream = $response->getBody();
assert($stream instanceof React\Stream\ReadableStreamInterface);
// 创建一个进度流计算进度
$progress = new React\Stream\ThroughStream();
$progress->on('data', function ($chunk) use ($size, &$currentSize) {
$currentSize += strlen($chunk);
// \033[1A 光标向上移动一行
echo "\033[1A", "Downloading: ", number_format($currentSize / $size * 100), "%" . PHP_EOL;
});
$progress->on('end', function () {
echo 'Downloaded complete...' . PHP_EOL;
});
// 将响应主体流(stream)通过进度流(progress)写入文件流(fileStream)
$stream->pipe($progress)->pipe($fileStream);
// 注册close事件回调函数,该回调函数在流关闭时触发。
$timeStart = microtime(true);
$progress->on('close', function () use (&$currentSize, $timeStart, $fileStream) {
$fileStream->end();
$timeCost = microtime(true) - $timeStart;
echo "\r" . 'Downloaded ' . $currentSize . ' currentSize in ' . round($timeCost, 3) . 's => ' . round($currentSize / $timeCost / 1024 / 1024, 2) . ' MB/s' . PHP_EOL;
});
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
// 启动事件流
$loop->run();
/*
输出结果
Requesting https://v.cic.tsinghua.edu.cn/vod/video/6/e/530451.mp4…
Headers size:73.6 MB
=================================
HTTP/1.1 200 OK
Server: nginx/1.25.2
Date: Thu, 07 Sep 2023 07:04:50 GMT
Content-Type: video/mp4
Content-Length: 77180044
Last-Modified: Wed, 06 Sep 2023 09:33:34 GMT
Connection: keep-alive
ETag: "64f8476e-499ac8c"
Accept-Ranges: bytes
=================================
Downloading: 100%
Downloaded complete...
Downloaded 77180044 currentSize in 89.22s => 0.82 MB/s
*/
2.3 支持多个文件并行下载
封装了一个下载器
Downloader
,可以添加一组文件,然后批量并行下载。
示例程序:并行下载多个文件
class Downloader
{
private $loop;
private $client;
private $filesystem;
private $timeStart; // 下载开始时间
private $fileSizeArr; // 文件大小
public function __construct()
{
$this->timeStart = microtime(true);
// 初始化事件循环、HTTP 客户端和文件系统
$this->loop = React\EventLoop\Loop::get();
$this->client = new React\Http\Browser($this->loop);
$this->filesystem = React\Filesystem\Filesystem::create($this->loop);
}
/**
* 异步下载一批文件
*
* @param array $files
* @return void
*/
public function download($files)
{
foreach (array_reverse($files) as $index => $file) {
$this->setRequest($file['url'], $file['savename'], $index + 1);
}
echo "=============================================", PHP_EOL;
echo sprintf("[%s] Start ...", __CLASS__), PHP_EOL;
echo str_repeat(PHP_EOL, count($files));
}
/**
* 设置异步下载
*
* @param string $url
* @param string $savename
* @param int $position
* @return void
*/
private function setRequest($url, $savename, $position)
{
// 保存的文件流
$file = $this->filesystem->file($savename)->open('cw');
$fileStream = React\Promise\Stream\unwrapWritable($file);
// 请求流式响应
$this->client->requestStreaming('GET', $url)->then(function ($response) use ($fileStream, $savename, $position) {
// 获取总大小
$size = $response->getHeaders()['Content-Length'][0];
// 获取响应的可读流
$stream = $response->getBody();
assert($stream instanceof React\Stream\ReadableStreamInterface);
// 创建一个进度流计算进度
$progress = $this->makeProgressStream($size, $savename, $position);
// 将响应主体流(stream)通过进度流(progress)写入文件流(fileStream)
$stream->pipe($progress)->pipe($fileStream);
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
}
/**
* 下载进度
*
* @param int $size
* @param string $savename
* @param int $position
* @return \React\Stream\ThroughStream
*/
private function makeProgressStream($size, $savename, $position)
{
$currentSize = 0;
$progress = new React\Stream\ThroughStream();
$progress->on('data', function ($chunk) use ($size, &$currentSize, $savename, $position) {
$currentSize += strlen($chunk);
echo "\033[{$position}A", "\t",
"Downloading: $savename ",
number_format($currentSize / $size * 100), "%",
str_repeat("\n", $position);
});
$this->fileSizeArr[$savename] = $size;
return $progress;
}
/**
* 启动事件循环
*
* @return void
*/
public function run()
{
// 启动事件循环
$this->loop->run();
foreach (array_reverse($this->fileSizeArr) as $savename => $size) {
echo "Dowloaded", $savename, " size:", $size,
' (', round($size / 1024 / 1024, 2), ') MB ', PHP_EOL;
}
$timeCost = round(microtime(true) - $this->timeStart, 2);
echo sprintf("[%s] End cost time:%s", __CLASS__, $timeCost), PHP_EOL;
echo "=============================================", PHP_EOL;
}
}
// 使用示例
$downloader = new Downloader();
$downloader->download([
[
'url' => 'https://ecp.emic.edu.cn/files/files_1693929666130_2023-09-06_286.mp4',
'savename' => 'v1.mp4',
],
[
'url' => 'https://v.cic.tsinghua.edu.cn/vod/video/6/e/530451.mp4',
'savename' => 'v2.mp4',
],
[
'url' => 'https://ecp.emic.edu.cn/files/files_1693913133815_2023-09-05_371.mp4',
'savename' => 'v3.mp4',
]
]);
$downloader->run();
/*
=============================================
[Downloader] Start ...
Downloading: v1.mp4 100%
Downloading: v2.mp4 100%
Downloading: v3.mp4 100%
Dowloadedv1.mp4 size:12270860 (11.7) MB
Dowloadedv3.mp4 size:366125594 (349.16) MB
Dowloadedv4.mp4 size:352535515 (336.2) MB
[Downloader] End cost time:150.61
=============================================
*/
执行期间可以尝试看一下当前进程,会看到如下场景,多个进程同时处理。
14711 14710 root R 40560 0% 0 2% /usr/local/bin/php /test/reactphp/vendor/wyrihaximus/react-child-process-messenger/bin/child-process WyI2MzM2ZTkxODUyODU2YjhlNzVkMTY2MmM0ZGYyM2Y2OWM1MWExZGQxMz
14712 14710 root S 40560 0% 1 2% /usr/local/bin/php /test/reactphp/vendor/wyrihaximus/react-child-process-messenger/bin/child-process WyJjNjI4MmQwYWE1Y2U2M2E2MjM5MWI2MzhhOTM1YWVlYmNkMjcwNjc2ZT
14713 14710 root S 40560 0% 2 2% /usr/local/bin/php /test/reactphp/vendor/wyrihaximus/react-child-process-messenger/bin/child-process WyIwNTI1NDMxNjgxYmFiZDljODA2Yzg4YzNlNmQ3ZGFhMjM1NDQyYmU4OT
3、小结
ReactPHP
是基于PHP
的事件驱动编程框架,它的异步机制是通过底层的事件循环。React\EventLoop\Loop
事件循环是ReactPHP
的核心组件,它负责监听和分发事件,并驱动异步操作的执行。
本作品采用《CC 协议》,转载必须注明作者和本文链接
curl_multi