Laravel 连接 Trino
Trino 服务端(测试环境)可以用 Docker 启动:
docker run -it --name trino -p 8080:8080 trinodb/trino
laravel的php测试代码
// Point the client at the local Trino container started above.
$trino = new \App\Libs\YsPhpTrino([
    'host' => 'http://127.0.0.1:8080',
    'basic_auth_user' => 'user',
    'basic_auth_password' => '',
]);

// Run a small sample query and dump whatever the client returns.
$sql = 'select * from jmx.information_schema.tables limit 10';
dd($trino->query($sql));
trino连接的laravel类代码
文件名 ./app/Libs/YsPhpTrino.php
<?php
namespace App\Libs;
/**
 * Minimal Trino client for Laravel.
 *
 * Submits a statement with POST /v1/statement and then follows every
 * `nextUri` returned by the server, accumulating columns and rows, using
 * Guzzle for transport. Every public method reports its outcome through the
 * array built by back(): code 0 = success, 1 = query/protocol error,
 * 2 = transport/HTTP error.
 *
 * Reference implementation:
 * https://github.com/tagomoris/presto-client-node/tree/master
 */
class YsPhpTrino
{
    /** Prefix of the log directory under storage/logs. */
    const LOG_PRE = 'ysphptrino';

    /** Default cap on the number of nextUri requests issued per query. */
    const DEFAULT_MAX_NEXT_URI = 100;

    /**
     * Client options: host, basic_auth_user, basic_auth_password and the
     * optional max_next_uri (poll cap, defaults to DEFAULT_MAX_NEXT_URI).
     */
    public $options;

    /** Column metadata of the query currently being collected. */
    public $columns = [];

    /** Rows collected so far for the current query. */
    public $rows = [];

    /** Number of nextUri requests issued for the current query. */
    public $nextUriNum = 0;

    /** Unix timestamp at which the current query started. */
    public $timeStart = 0;

    /**
     * @param array $options see $options
     */
    public function __construct($options = []) {
        $this->setOptions($options);
    }

    /**
     * Replace the client options.
     *
     * @param array $options see $options
     * @return $this
     */
    public function setOptions($options) {
        $this->options = $options;
        return $this;
    }

    /**
     * Run a statement and collect the complete result set.
     *
     * @param string $sql     statement text, sent as the request body
     * @param string $catalog optional default catalog
     * @param string $schema  optional default schema
     * @param string $source  optional client/source name reported to the server
     * @return array back() structure; on success `data` is
     *               ['columns' => array, 'rows' => array]
     */
    public function query($sql, $catalog = '', $schema = '', $source = '') {
        $this->columns = [];
        $this->rows = [];
        $this->nextUriNum = 0;
        $this->timeStart = time();
        $this->log('', "ysphptrino_query:开始: query:".$sql);

        $header = $this->authHeader();
        // Trino reads X-Trino-* session headers; the X-Presto-* names are kept
        // so servers that still use the legacy naming keep working.
        $session = ['Catalog' => $catalog, 'Schema' => $schema, 'Source' => $source];
        foreach ($session as $name => $value) {
            if (!empty($value)) {
                $header['X-Trino-'.$name] = $value;
                $header['X-Presto-'.$name] = $value;
            }
        }

        $result = $this->request('POST', $this->getHost().'/v1/statement', $header, $sql);
        if ($result['code'] === 0) {
            // The first response is an ordinary result page: it may already
            // carry an error, data, or simply the first nextUri.
            $result = $this->drain($result['data']);
        }
        $this->log('', "ysphptrino_query:开始结束用时: " . (time()-$this->timeStart) . " 秒.query:".$sql);
        return $result;
    }

    /**
     * Fetch the page at $url and keep following nextUri until the query ends.
     *
     * @param string $url absolute nextUri returned by the server
     * @return array back() structure, same shape as query()
     */
    public function nextUri($url) {
        $result = $this->fetchPage($url);
        if ($result['code'] !== 0) return $result;
        return $this->drain($result['data']);
    }

    /**
     * Perform one HTTP request.
     *
     * @param string $method HTTP verb
     * @param string $url    absolute URL
     * @param array  $header request headers (name => value)
     * @param string $body   raw request body
     * @return array back() structure; on success `data` is the raw response body
     */
    public function request($method, $url, $header, $body) {
        $client = new \GuzzleHttp\Client();
        $logPath = '';
        // Credentials must not end up in log files, so the header is masked here.
        $this->log($logPath, ['requestApi-start', $method, $url, $this->maskSecrets($header), $body]);
        try {
            $response = $client->request($method, $url, [
                'body'=>$body,
                'headers'=>$header,
            ]);
        } catch (\GuzzleHttp\Exception\ConnectException $e) {
            // Listed first: in Guzzle 6 this type is a RequestException subclass
            // and would otherwise be swallowed by the next clause.
            $code = '异常代码:'.$e->getCode();
            $msg = '异常消息:'.$e->getMessage();
            $this->log($logPath, $code.' '.$msg);
            return $this->back(2, $url ." 网络异常,请联系管理员");
        } catch (\GuzzleHttp\Exception\RequestException $e) {
            $msg = $e->getMessage();
            $msgOther = [];
            $failed = $e->getResponse();
            if ($failed) {
                $msg = $failed->getStatusCode();
                $msgOther = [$failed->getBody()->getContents()];
            }
            $this->log($logPath, "服务器" . $msg . "异常,请联系管理员", $msgOther);
            return $this->back(2, $url." 服务器" . $msg . "异常,请联系管理员");
        }
        $data = $response->getBody()->getContents();
        $status = (int) $response->getStatusCode();
        $this->log($logPath, $data, [$status]);
        if ($status !== 200) {
            return $this->back(2, $url." 服务器" . $status . "异常,请联系管理员");
        }
        return $this->back(0, '', $data);
    }

    /**
     * Write an info line to the daily log of this client.
     *
     * @param string $logPath sub-directory appended to the log directory name
     * @param mixed  $msg     message; non-strings are JSON encoded
     * @param array  $context extra context passed to the logger
     */
    public function log($logPath, $msg, $context = []) {
        if (!is_string($msg)) {
            $msg = (string) json_encode(
                $msg,
                JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_PARTIAL_OUTPUT_ON_ERROR
            );
        }
        $logN = \Log::build([
            'driver' => 'daily',
            'path' => storage_path('/logs/' .self::LOG_PRE. $logPath.'/ysphptrino.log'),
        ]);
        $logN->info($msg, $context);
    }

    /**
     * Configured host without surrounding whitespace or trailing slashes.
     *
     * @return string
     */
    public function getHost() {
        return rtrim(trim((string) ($this->options['host'] ?? '')), '/');
    }

    /**
     * Build the uniform result structure.
     *
     * @param int    $code 0 on success, non-zero on failure
     * @param string $msg  human readable message (duplicated under `message`)
     * @param mixed  $data payload
     * @return array
     */
    public function back($code, $msg = '', $data = []) {
        return ['code'=>$code, 'msg'=>$msg, 'message'=>$msg, 'data'=>$data];
    }

    /**
     * Consume result pages, starting with the raw JSON $body, until the server
     * stops returning a nextUri or reports an error.
     */
    private function drain($body) {
        while (true) {
            $page = json_decode($body, true);
            if (!is_array($page)) {
                return $this->back(1, '无法解析的响应:'.json_last_error_msg());
            }

            $state = trim((string) ($page['stats']['state'] ?? ''));
            if (!empty($page['error']) || $state === 'FAILED') {
                $label = $state !== '' ? $state : 'FAILED';
                $detail = trim((string) ($page['error']['message'] ?? ''));
                return $this->back(1, $label.':'.$detail);
            }

            if (!empty($page['columns'])) {
                $this->columns = $page['columns'];
            }
            if (!empty($page['data']) && is_array($page['data'])) {
                $this->rows = array_merge($this->rows, $page['data']);
            }

            // Completion is signalled by the absence of nextUri, not by the
            // state label: a FINISHED page may still point at further data.
            if (empty($page['nextUri'])) {
                return $this->back(0, '', ['columns'=>$this->columns, 'rows'=>$this->rows]);
            }

            $result = $this->fetchPage($page['nextUri']);
            if ($result['code'] !== 0) return $result;
            $body = $result['data'];
        }
    }

    /**
     * Issue one counted, authenticated GET for a result page.
     */
    private function fetchPage($url) {
        $this->nextUriNum++;
        $limit = $this->maxNextUri();
        if ($this->nextUriNum > $limit) {
            return $this->back(1, 'nextUri请求次数超过上限:'.$limit);
        }
        if (empty($url)) return $this->back(1, 'nextUri为空');
        return $this->request('GET', $url, $this->authHeader(), '');
    }

    /**
     * Basic-auth header derived from the options, or [] when none are configured.
     */
    private function authHeader() {
        $user = trim((string) ($this->options['basic_auth_user'] ?? ''));
        $password = trim((string) ($this->options['basic_auth_password'] ?? ''));
        if ($user === '' && $password === '') return [];
        return ['Authorization' => 'Basic '.base64_encode($user.':'.$password)];
    }

    /**
     * Poll cap from the `max_next_uri` option, falling back to the default
     * when the option is missing or not a positive integer.
     */
    private function maxNextUri() {
        $limit = (int) ($this->options['max_next_uri'] ?? self::DEFAULT_MAX_NEXT_URI);
        return $limit > 0 ? $limit : self::DEFAULT_MAX_NEXT_URI;
    }

    /**
     * Copy of $header that is safe to log (Authorization value masked).
     */
    private function maskSecrets($header) {
        if (is_array($header) && isset($header['Authorization'])) {
            $header['Authorization'] = 'Basic ***';
        }
        return $header;
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接