ReactPHP - 使用异步 HTTP 并行下载多个文件

ReactPHP - 使用异步 HTTP 并行下载多个文件

ReactPHP - Parallel download of files using asynchronous HTTP

  • Title: ReactPHP - 使用异步 HTTP 并行下载多个文件
  • Tag: ReactPHPAsynchronous
  • Author: Tacks
  • Create-Date: 2023-09-07
  • Update-Date: 2023-09-07

大纲

Ref

0、引入问题

通常在 PHP 中,当我们需要批量执行 HTTP 请求,如批量下载一批视频,我们可以一个一个下载,但是这样需要等待每个请求完成才会开始下一个新的请求,处理的文件越多,延迟就越大,在下载所有文件之前我们不能做其他的事情。

那 PHP 可不可以异步方式,来批量下载?

当然可以 ReactPHP 可以帮助你做到 !

1、 ReactPHP 一些基础组件

1.0 事前准备

composer require react/http

1.1 ♻️ React\EventLoop\Loop 事件驱动

React\EventLoop\Loop 事件循环是 ReactPHP 的核心组件,用于处理异步事件和非阻塞的 I/O 操作。

  • EventLoop 会自动帮你选择合适的事件驱动,如代码下可以查看当前选择的事件驱动器,更多可查看 loop-implementations

示例程序:Loop事件对象获取

$loop = React\EventLoop\Loop::get();
var_dump(get_class($loop));
// string(32) "React\EventLoop\StreamSelectLoop"
  • run() 运行事件循环,通常来说就是把要执行的任务都加到同一个 loop 实例中,最后调用 run() 开始执行,应用程序将进入事件驱动的模式,等待事件的触发和处理,直到任务结束。

示例程序:执行Loop事件循环

// 没有注册任何事件的loop, 运行事件循环的代码将会是空闲状态,没有具体的操作或任务执行
$loop = React\EventLoop\Loop::get();
$loop->run();

1.2 🆎 React\Promise\Promise Promise 异步

ReactPHP Promise 是参考了 JavaScript Promises/A+ 规范的实现方式,相关的规范和方法 API 基本一致。Promises/A+ 是一种在 JavaScript 社区中广泛接受和使用的 Promise 规范,它定义了 Promise 的行为和接口。

简单来说,在 JS 中,Promise 是异步编程的一种解决方案,ES6 将其写进了语言标准,统一了用法,原生提供了 Promise 对象。 Promise 类似一个容器,里面保存着某个未来才会结束的事件(通常是一个异步操作)的结果。

Promise 中文 意味 “承诺” ~, 表达对未来的承诺,只有两种结果,结果A和结果B。

Promise 一些个特点

1) Promise 对象有三种状态,pending (进行中)、 fulfilled (已成功) 、 rejected (已失败)
2) 一旦状态改变,就不会再变,而变化只有两种情况, pending=>fulfilled , pending=>rejected ;
3) 状态一旦确定,任何时候都会得到这个结果,你可以多次调用 then() 来获取完成的状态;

一些个方法

示例程序:Promise基本使用

// 创建 Deferred 延迟对象
$deferred = new React\Promise\Deferred();

// 获取 Promise 未来对象
$promise = $deferred->promise();

// 注册回调函数处理 Promise 的完成状态
// $promise->then(callable $onFulfilled = null, callable $onRejected = null);
// ①
$promise->then(
    // 已完成
    function ($result) {
        echo "Promise onFulfilled | result: " . $result . PHP_EOL;
    },
    // 已拒绝
    function ($reason) {
        echo "Promise onRejected  | reason: " . $reason . PHP_EOL;
    }
);

// ② 如上所述,状态一旦确定,多次调用,获得状态结果也都一致
$promise->then(
    // 已完成
    function ($result) {
        echo "Promise onFulfilled | result: " . $result . PHP_EOL;
    },
    // 已拒绝
    function ($reason) {
        echo "Promise onRejected  | reason: " . $reason . PHP_EOL;
    }
);

// 搞一个事件循环
$loop = React\EventLoop\Loop::get();

// 模拟异步操作,3秒后将Promise状态设置为 onFulfilled
$loop->addTimer(3, function () use ($deferred) {
    echo 'addTimer:onFulfilled' . PHP_EOL;
    $deferred->resolve("操作成功");
});

// 模拟异步操作,3秒后将Promise状态设置为 onRejected
$loop->addTimer(3, function () use ($deferred) {
    echo 'addTimer:onRejected' . PHP_EOL;
    $deferred->reject(new \Exception("退出异常"));
});

echo "Loop Run..." . PHP_EOL;

// 输出:
// Loop Run...
// Promise onFulfilled | result: 操作成功
// Promise onFulfilled | result: 操作成功

1.3 🌐 React\Http\Browser HTTP 请求客户端

首先,需要了解到 React\Http\Browser ,其实就是一个简单的 HttpClient ,看一下如何发送一个 HTTP 请求。

  • get() , 具体可看 get

示例程序:HTTP获取文件Header头信息

// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();

// 创建 Browser 用来发起 HTTP 请求
$client = new React\Http\Browser($loop);

$url = 'https://reactphp.org/apple-touch-icon.png';

$client->get($url)->then(function (Psr\Http\Message\ResponseInterface $response) {
    $size   = $response->getHeaders()['Content-Length'][0];
    $type   = $response->getHeaders()['Content-Type'][0];

    echo "=================================". PHP_EOL;
    echo 'Headers received' . PHP_EOL;
    echo sprintf("Content-Length: %s", $size) . PHP_EOL;
    echo sprintf("Content-Type: %s", $type) . PHP_EOL;
    echo "=================================". PHP_EOL. PHP_EOL;

}, function (Exception $e) {
    echo sprintf("Error: %s", $e->getMessage()) .PHP_EOL;
});

// 运行事件循环
$loop->run();

1.4 📚 React\Filesystem\Filesystem Filesystem 文件系统

ReactPHP 异步生态系统中有一个允许与文件系统异步工作的组件:reactphp/filesystem 。该组件为文件系统中最常用的操作提供基于承诺 Promise 的接口。

  • 安装一下
composer require react/filesystem:v0.1.2 -W

示例程序:Filesystem异步操作文件

// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();

// 创建文件系统
$filesystem = React\Filesystem\Filesystem::create($loop);

// 获取文件对象 返回 React\Filesystem\Node\File
$file = $filesystem->file(__FILE__);

// 以 Promise 方式读取文件内容,异步执行
$file->getContents()->then(function ($contents) {
    echo 'Reading completed size:' . strlen($contents) . PHP_EOL;
});
// 以 Promise 方式返回 包含有关文件信息的关联数组
$file->stat()->then(function ($stat) {
    print_r($stat);
});

// 添加一个定时器
$loop->addPeriodicTimer(1, function () {
    echo 'Timer' . PHP_EOL;
});

// 创建目录对象
$dir = $filesystem->dir(__DIR__);
// 以 Promise 方式读取文件列表
$dir->ls()->then(function (SplObjectStorage $nodes) {
    foreach ($nodes as $node) {
        echo $node . PHP_EOL;
    }
});

// 运行事件循环
$loop->run();

1.5 🌊 Stream

流是表现出可流式传输行为的资源对象,它可以以线性方式读取或写入。 ReactPHP 在其整个生态系统中使用“流”的概念,为处理任意数据内容和大小的流提供一致的高级抽象。比如 ReactPHP 中的 Stream 流允许您以小块的形式高效地处理大量数据(例如多千兆字节的文件下载),而无需将所有内容一次存储在内存中。

每一种流都是继承 Evenement\EventEmitter, 它实现了一些基础的方法。根据流的类型,又分为 ReadableWritableDuplex ,对应 ReactPHP 也提供了三种接口 ReadableStreamInterfaceWritableStreamInterfaceDuplexStreamInterface

示例程序:ReadableResourceStream 流式读取日志文件

// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();

// 创建可读流 (假设 log.txt 日志文件很大,不能用 file_get_content 一次读取,可以采用流式分批读取)
$stream = new \React\Stream\ReadableResourceStream(fopen('log.txt', 'r'), $loop);

// 判断是否可读
var_dump($stream->isReadable());

// 监听接收事件
$stream->on('data', function($data){
    echo 'Read data:', strlen($data) . PHP_EOL;
});

// 监听结束事件
$stream->on('end', function(){
    echo "Read finished" . PHP_EOL;
});

// 运行事件循环
$loop->run();

/*
// 输出
bool(true)
Read data:65536
Read data:65536
Read data:65536
Read data:65536
Read data:65536
Read data:65536
Read data:26001
Read finished
 */

示例程序:Stream 的暂停和启动,每次打印一个字节

// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();

// 创建可读流 (第三个参数可以设置 每次读取 1字节 )
$stream = new \React\Stream\ReadableResourceStream(fopen('log.txt', 'r'), $loop, 1);

// 监听接收事件
$stream->on('data', function($data) use ($loop, $stream){
    echo 'Read data:', $data . PHP_EOL;
    // 暂停
    $stream->pause();
    // 1s 后启动
    $loop->addTimer(1, function() use ($stream) {
        $stream->resume();
    });
});

// 监听结束事件
$stream->on('end', function(){
    echo "Read finished" . PHP_EOL;
});

// 运行事件循环
$loop->run();

示例程序:Stream pipe 管道,读取内容然后写入标准输出

// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();

// 创建可读流 
$input = new \React\Stream\ReadableResourceStream(fopen(__FILE__, 'r'), $loop);
// 创建可写流
$output = new \React\Stream\WritableResourceStream(fopen('php://stdout', 'w'), $loop);

// ① 创建管道
$input->pipe($output);

// ② 监听读事件
// $input->on('data', function($data) use ($output){
//     // 写入标准输出
//     $output->write($data);
// });

// 运行事件循环
$loop->run();

示例程序:Stream ThroughStream 处理流结合管道 pipe,读取内容然后处理过滤再写入标准输出

// 创建 Loop 事件循环
$loop = React\EventLoop\Loop::get();

// 创建可读流 
$input = new \React\Stream\ReadableResourceStream(fopen(__FILE__, 'r'), $loop);
// 创建可写流
$output = new \React\Stream\WritableResourceStream(fopen('php://stdout', 'w'), $loop);

// 处理器 (比如内容全部大写再输出)
$through = new \React\Stream\ThroughStream('strtoupper');

$input->pipe($through)->pipe($output);

// 运行事件循环
$loop->run();

2、 ReactPHP 实现下载文件

2.1 利用 Loop + Http + Filesystem 一口气下载文件

我们希望下载一个远程文件,文本、图片、视频都可以,利用 Http 下载,然后 Filesystem 进行保存内容,结合 Loop 采用异步事件驱动方式。

  • 方法1
    • 主要利用 WritableStreamInterface 可写文件流,对 Httpresponse 内容直接读取然后 write() 写入流,最后 end() 关闭流
  • 方法2
    • 主要利用 Filesystem 提供的 putContents() 快捷操作来代替上面 then() 操作,本质时一样的

示例程序:文件下载

// 事件循环
$loop = React\EventLoop\Loop::get();

// 创建 Browser 用来发起 HTTP 请求
$client = new React\Http\Browser($loop);

// 下载文件
$url = 'https://reactphp.org/apple-touch-icon.png';
$savename = basename($url);

// 创建 Loop 事件循环
$filesystem = \React\Filesystem\Filesystem::create($loop);
// 文件1:c如果文件不存在则创建一个文件并w以可写模式打开该文件
$file  = $filesystem->file($savename)->open('cw');
// 文件2:
$file2 = $filesystem->file('copy-' . $savename);

// 发起 HTTP 请求
$client->request('GET', $url)->then(function ($response) use ($file, $file2) {
    // 当 Promise 处于 onFulfilled, 打开文件流,写入文件内容
    $file->then(function(React\Stream\WritableStreamInterface $stream) use ($response){
        $stream->write($response->getBody());
        // 记得关闭流
        $stream->end();
        echo "Download 1 success" . PHP_EOL;
    });

    // 采用文件辅助方法,类似 file_put_contents() 函数,无需手动关闭文件流
    $file2->putContents($response->getBody())->then(function () {
        echo "Download 2 success" . PHP_EOL;
    });
});

echo 'Downloading' . PHP_EOL;

$loop->run();
  • 源码查看:React\Filesystem\Node\File putContents() 文件写入
public function putContents($contents)
{
    return $this->open('cw')->then(function (WritableStreamInterface $stream) use ($contents) {
        $stream->write($contents);
        return $this->close();
    });
}

上面的操作是一次性将下载内容读取出来,如果文件很大,内容不能一次性加载到内存中,如何利用流的方式,分块进行读取写入,并且记录计算下载进度呢?

2.2 为程序添加一个下载进度

由于下载内容比较大,希望可以增加下载进度比,于是我们采用流式下载的方式 具体利用 HTTP 的 requestStreaming() 方式作为可读流,然后利用管道 pipe ,传入到可写流中。 [可读文件流] => pipe => [可写文件流]

示例程序:显示视频下载进度

  • React\Promise\Stream\unwrapWritable() -> 参考文档
    • React\Filesystem\Filesystem 得到文件操作,open() 得到的是由 Promise 包装的文件流
    • unwrapWritable() 函数接受一个 Promise 对象作为参数,并返回对应的可写流
    • 方便地从 Promise 包装中提取可写流,并直接操作它
  • React\Stream\ThroughStream -> 参考文档
    • ThroughStream 相当于一个中间层管道,可以处理每次接收到的 chunk 数据,可以通过获取字节计算,来回显当前读取的进度
  • pipe()
    • 将响应主体流 $stream 通过进度流 $progress 写入文件流 $fileStream
  • \033[NA 光标向上移动 N 行
    • \033[1A 每次输出将光标向上移动 1 行
// 事件循环
$loop = React\EventLoop\Loop::get();

// 下载地址
$url = 'https://v.cic.tsinghua.edu.cn/vod/video/6/e/530451.mp4';
$savename = basename($url);
echo 'Requesting ' . $url . '…' . PHP_EOL;

// HTTP 下载器
$client = new React\Http\Browser($loop);

// 保存的文件流
$filesystem = React\Filesystem\Filesystem::create($loop);
$file       = $filesystem->file($savename)->open('cw');
$fileStream = React\Promise\Stream\unwrapWritable($file);

// 请求流式响应
$client->requestStreaming('GET', $url)->then(function ($response) use ($fileStream) {

    // 获取总大小
    $size = $response->getHeaders()['Content-Length'][0];
    $currentSize = 0;
    echo 'Headers size:', round($size  / 1024 / 1024 , 2), ' MB '. PHP_EOL;
    echo "=================================" . PHP_EOL;
    echo RingCentral\Psr7\str($response);
    echo "=================================". PHP_EOL. PHP_EOL;

    // 获取响应的可读流
    $stream = $response->getBody();
    assert($stream instanceof React\Stream\ReadableStreamInterface);

    // 创建一个进度流计算进度
    $progress = new React\Stream\ThroughStream();
    $progress->on('data', function ($chunk) use ($size, &$currentSize) {
        $currentSize += strlen($chunk);
        // \033[1A 光标向上移动一行 
        echo  "\033[1A", "Downloading: ", number_format($currentSize / $size * 100), "%" . PHP_EOL;
    });
    $progress->on('end', function () {
        echo 'Downloaded complete...' . PHP_EOL;
    });

    // 将响应主体流(stream)通过进度流(progress)写入文件流(fileStream)
    $stream->pipe($progress)->pipe($fileStream);

    // 注册close事件回调函数,该回调函数在流关闭时触发。
    $timeStart = microtime(true);
    $progress->on('close', function () use (&$currentSize, $timeStart, $fileStream) {
        $fileStream->end();
        $timeCost = microtime(true) - $timeStart;
        echo "\r" . 'Downloaded ' . $currentSize . ' currentSize in ' . round($timeCost, 3) . 's => ' . round($currentSize / $timeCost / 1024 / 1024, 2) . ' MB/s' . PHP_EOL;
    });

}, function (Exception $e) {
    echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

// 启动事件流
$loop->run();

/*
输出结果
Requesting https://v.cic.tsinghua.edu.cn/vod/video/6/e/530451.mp4…
Headers size:73.6 MB
=================================
HTTP/1.1 200 OK
Server: nginx/1.25.2
Date: Thu, 07 Sep 2023 07:04:50 GMT
Content-Type: video/mp4
Content-Length: 77180044
Last-Modified: Wed, 06 Sep 2023 09:33:34 GMT
Connection: keep-alive
ETag: "64f8476e-499ac8c"
Accept-Ranges: bytes

=================================
Downloading: 100%
Downloaded complete...
Downloaded 77180044 currentSize in 89.22s => 0.82 MB/s
*/

2.3 支持多个文件并行下载

封装了一个下载器 Downloader ,可以添加一组文件,然后批量并行下载。

示例程序:并行下载多个文件

class Downloader
{
    private $loop;
    private $client;
    private $filesystem;

    private $timeStart;    // 下载开始时间
    private $fileSizeArr;  // 文件大小

    public function __construct()
    {
        $this->timeStart = microtime(true);

        // 初始化事件循环、HTTP 客户端和文件系统
        $this->loop = React\EventLoop\Loop::get();
        $this->client = new React\Http\Browser($this->loop);
        $this->filesystem = React\Filesystem\Filesystem::create($this->loop);
    }

    /**
     * 异步下载一批文件
     *
     * @param array $files
     * @return void
     */
    public function download($files)
    {
        foreach (array_reverse($files) as $index => $file) {
            $this->setRequest($file['url'], $file['savename'], $index + 1);
        }
        echo "=============================================", PHP_EOL;
        echo sprintf("[%s] Start ...", __CLASS__), PHP_EOL;
        echo str_repeat(PHP_EOL, count($files));
    }

    /**
     * 设置异步下载
     *
     * @param string $url
     * @param string $savename
     * @param int $position
     * @return void
     */
    private function setRequest($url, $savename, $position)
    {
        // 保存的文件流
        $file = $this->filesystem->file($savename)->open('cw');
        $fileStream = React\Promise\Stream\unwrapWritable($file);

        // 请求流式响应
        $this->client->requestStreaming('GET', $url)->then(function ($response) use ($fileStream, $savename, $position) {
            // 获取总大小
            $size = $response->getHeaders()['Content-Length'][0];

            // 获取响应的可读流
            $stream = $response->getBody();
            assert($stream instanceof React\Stream\ReadableStreamInterface);

            // 创建一个进度流计算进度
            $progress = $this->makeProgressStream($size, $savename, $position);
            // 将响应主体流(stream)通过进度流(progress)写入文件流(fileStream)
            $stream->pipe($progress)->pipe($fileStream);
        }, function (Exception $e) {
            echo 'Error: ' . $e->getMessage() . PHP_EOL;
        });
    }

    /**
     * 下载进度
     *
     * @param int $size
     * @param string $savename
     * @param int $position
     * @return \React\Stream\ThroughStream
     */
    private function makeProgressStream($size, $savename, $position)
    {
        $currentSize = 0;
        $progress = new React\Stream\ThroughStream();
        $progress->on('data', function ($chunk) use ($size, &$currentSize, $savename, $position) {
            $currentSize += strlen($chunk);
            echo "\033[{$position}A", "\t",
            "Downloading: $savename ",
            number_format($currentSize / $size * 100), "%",
            str_repeat("\n", $position);
        });

        $this->fileSizeArr[$savename] = $size;

        return $progress;
    }

    /**
     * 启动事件循环
     *
     * @return void
     */
    public function run()
    {
        // 启动事件循环
        $this->loop->run();

        foreach (array_reverse($this->fileSizeArr) as $savename => $size) {
            echo "Dowloaded", $savename, " size:", $size,
            ' (', round($size  / 1024 / 1024, 2), ') MB ', PHP_EOL;
        }

        $timeCost = round(microtime(true) - $this->timeStart, 2);
        echo sprintf("[%s] End cost time:%s", __CLASS__, $timeCost), PHP_EOL;

        echo "=============================================", PHP_EOL;
    }
}

// 使用示例
$downloader = new Downloader();
$downloader->download([
    [
        'url'      => 'https://ecp.emic.edu.cn/files/files_1693929666130_2023-09-06_286.mp4',
        'savename' => 'v1.mp4',
    ],
    [
        'url'      => 'https://v.cic.tsinghua.edu.cn/vod/video/6/e/530451.mp4',
        'savename' => 'v2.mp4',
    ],
    [
        'url'      => 'https://ecp.emic.edu.cn/files/files_1693913133815_2023-09-05_371.mp4',
        'savename' => 'v3.mp4',
    ]
]);
$downloader->run();

/*
=============================================
[Downloader] Start ...
        Downloading: v1.mp4 100%
        Downloading: v2.mp4 100%
        Downloading: v3.mp4 100%
Dowloadedv1.mp4 size:12270860 (11.7) MB
Dowloadedv3.mp4 size:366125594 (349.16) MB
Dowloadedv4.mp4 size:352535515 (336.2) MB
[Downloader] End cost time:150.61
=============================================
*/

执行期间可以尝试看一下当前进程,会看到如下场景,多个进程同时处理。

14711 14710 root     R    40560   0%   0   2% /usr/local/bin/php /test/reactphp/vendor/wyrihaximus/react-child-process-messenger/bin/child-process WyI2MzM2ZTkxODUyODU2YjhlNzVkMTY2MmM0ZGYyM2Y2OWM1MWExZGQxMz
14712 14710 root     S    40560   0%   1   2% /usr/local/bin/php /test/reactphp/vendor/wyrihaximus/react-child-process-messenger/bin/child-process WyJjNjI4MmQwYWE1Y2U2M2E2MjM5MWI2MzhhOTM1YWVlYmNkMjcwNjc2ZT
14713 14710 root     S    40560   0%   2   2% /usr/local/bin/php /test/reactphp/vendor/wyrihaximus/react-child-process-messenger/bin/child-process WyIwNTI1NDMxNjgxYmFiZDljODA2Yzg4YzNlNmQ3ZGFhMjM1NDQyYmU4OT

3、小结

ReactPHP 是基于 PHP 的事件驱动编程框架,它的异步机制是通过底层的事件循环。React\EventLoop\Loop 事件循环是 ReactPHP 的核心组件,它负责监听和分发事件,并驱动异步操作的执行。

本作品采用《CC 协议》,转载必须注明作者和本文链接
明天我们吃什么 悲哀藏在现实中 Tacks
讨论数量: 2

curl_multi

7个月前 评论
Tacks (楼主) 7个月前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!