laravel队列的消费能力

不知道在项目中使用php artisan queue:work的有多少, 如果在队列任务里面有http请求的任务, 数量一多, 就处理不过来, 处理的很慢, 不知道有没有遇到过相同情况的? 以及怎么解决了

w_W_v_V
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
讨论数量: 18
sanders

接上贴(说内容过长了 :sweat_smile:

因为 horizon 会根据队列的繁忙程度自动扩缩子进程,而子进程会导致容器的负载和内存开销提升或降低,这时我们再去通过 k8s 的横向扩缩对象进行容器的扩缩,对应的片段如下:

deployment-horizon.yaml 文件用来根据一个 horizon.csv 文件的内容来配置当前队列的名称及其对应的申请资源和限制资源。

{{- $currentScope := . }}
{{- range $offset,$_ := .Files.Lines "horizon.csv" }}
{{- if or (contains "supervisor" . ) (empty .) (contains "#" .)}}{{ else }}
{{- $row := dict "queueName" "default"}}
{{- range $column,$value := regexSplit "," (. | toString) -1 }}
{{- if eq $column 0}}{{ $_ := set $row "podName" $value }}{{ end }}
{{- if eq $column 1}}{{ $_ := set $row "queueName" $value }}{{ end }}
{{- if eq $column 4}}{{ $_ := set $row "requestMemory" $value }}{{ end }}
{{- if eq $column 5}}{{ $_ := set $row "requestCpu" $value }}{{ end }}
{{- if eq $column 6}}{{ $_ := set $row "limitMemory" $value }}{{ end }}
{{- if eq $column 7}}{{ $_ := set $row "limitCpu" $value }}{{ end }}
{{- if eq $column 10}}{{ $_ := set $row "maxProcesses" $value }}{{ end }}
{{- end }}
{{- with $currentScope }}
{{ if gt $offset 1 }}---{{ end }}
apiVersion: apps/v1
kind: Deployment
metadata:
# ...
spec:
  {{- if not .Values.autoscaling.enabled }}
  replicas: {{ .Values.replicaCount }}
  {{- end }}
  selector:
# ...
  template: # POD 模板
    metadata:
# ...
      labels:
# ...
    spec:
# ...
      containers:
        - name: {{ .Chart.Name }}-horizon
# ...
          resources: # 资源
            limits:
              cpu: {{ $row.limitCpu | default "2000m" }}
              memory: {{ $row.limitMemory | default "512Mi" }}
            requests:
              cpu: {{ $row.requestCpu | default "10m" }}
              memory: {{ $row.requestMemory | default "128Mi"}}
          volumeMounts:
# ...
          env:
# ...
      volumes: # POD 共享卷声明
# ...
      nodeSelector: # 节点选择器
# ...
      {{- end }}
{{- end }}
{{- end }}
{{- end }}

HPA对象同样根据 horizon.csv 的内容来控制横向扩缩的 pod 数量。

{{- if .Values.autoscaling.enabled }}
{{- $currentScope := . }}
{{- range $offset,$_ := .Files.Lines "horizon.csv" }}
{{- if or (contains "supervisor" . ) (empty .) (contains "#" .)}}{{ else }}
{{- $row := dict }}
{{- range $column,$value := regexSplit "," (. | toString) -1 }}
{{- if eq $column 0}}{{ $_ := set $row "podName" $value }}{{ end }}
{{- if eq $column 1}}{{ $_ := set $row "queueName" $value }}{{ end }}
{{- if eq $column 2}}{{ $_ := set $row "minReplicas" $value }}{{ end }}
{{- if eq $column 3}}{{ $_ := set $row "maxReplicas" $value }}{{ end }}
{{- if eq $column 8}}{{ $_ := set $row "targetMemoryPercent" $value }}{{ end }}
{{- if eq $column 9}}{{ $_ := set $row "targetCpuPercent" $value }}{{ end }}
{{- end }}
{{- with $currentScope }}
{{ if gt $offset 1 }}---{{ end }}
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
# ...
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "akitado.fullname" . }}-horizon-{{ $row.podName }}
  minReplicas: {{ $row.minReplicas | default 1 }}
  maxReplicas: {{ $row.maxReplicas | default 1 }}
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: {{ $row.targetCpuPercent | default 90 }}
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: {{ $row.targetMemoryPercent | default 90 }}
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300 # 稳定窗口期 执行缩容后观察一段时间不做动作
      policies: # 缩容策略 多策略存在时采用销毁副本数量最多的策略
        - periodSeconds: 60 # 策略周期
          type: Percent # 目标:按比例
          value: 10 # 最多销毁副本所占百分比
    scaleUp:
      stabilizationWindowSeconds:horizon 120 # 稳定窗口期 执行扩容后观察一段时间不做动作
      policies:
        - periodSeconds: 60 # 策略周期
          type: Percent # 目标:按比例
          value: 10 # 最多扩容副本所占百分比
{{- end }}
{{- end }}
{{- end }}
{{- end }}

一个 horizon.csv 就集中控制了这些队列容器的资源:

file

最终使用 helm 部署到集群问题就解决了大部分。

file

还剩个尾巴就是节点的扩缩,这个我们用阿里云提供的自动扩缩节点池来实现,配置之后通过调度资源不足导致容器调度失败的事件来动态扩缩按量计费的节点,这个需要的费用可能会较多。

1周前 评论
w_W_v_V (楼主) 1周前

多个消费者 进行消费

2周前 评论
w_W_v_V (楼主) 1周前

异步队列不就是用来解决同步操作耗时的问题吗?处理不过来就增加消费进程数,Http使用并发请求处理,量再大架构就要调整,单节点改为多节点消费。

2周前 评论
小学毕业生 1周前
w_W_v_V (楼主) 1周前

这个东西感觉并不好用,如果非要用这个的话,建议加上 --queue=xxx 参数,防止所有的任务都挤在一起。

1周前 评论
w_W_v_V (楼主) 1周前

我给公司开发了一个内部系统,包含类似 github issue 的工单系统,使用 queue worker 来发送通知邮件。

这个系统还有每天抓取一百多个网站的信息,比如 SEO 信息、SSL、域名等信息,也会发送一定数量的包含 HTTP、 WHOIS 等网络请求,用 laravel scheduler 每天晚上执行一次。

1周前 评论
w_W_v_V (楼主) 1周前

我一般是通过horizon来管理队列 可以针对某些任务在有时候堆积到一定程度后会自动扩容子进程去消费任务 消费完后会自动回收 也可以方便监控你的队列是否有失败等等

1周前 评论
w_W_v_V (楼主) 1周前

可以运行多个队列进程的. 队列《Laravel 10 中文文档》

1周前 评论
w_W_v_V (楼主) 1周前
jimmy (作者) 1周前
w_W_v_V (楼主) 1周前
sanders

这道题我熟啊 :smile: 虽然可能不是最优解。

我们先用 horizon 在容器内将子进程进行扩缩,具体反映在启动脚本上,片段如下:

#!/usr/bin/env sh

set -e
# role horizon: message queue supervisor
# role console: terminal console for cluster administrator
role=${CONTAINER_ROLE:-console}

# queue: message queue name
queue=${CONTAINER_QUEUE}

# queueMaxProcesses: horizon max queue processes
# value cover sequence: env > config > default(3)
queue_max_processes=${QUEUE_MAX_PROC}
# .....
elif [[ "$role" = "horizon" ]]; then
    echo "horizon supervisor starting ..."
    if [[ "$queue" = "" && "$queue_max_processes" = "" ]]; then
        exec /usr/local/bin/php /www/wwwroot/artisan horizon --environment=production --verbose
    elif [[ "$queue_max_processes" = "" ]]; then
        exec /usr/local/bin/php /www/wwwroot/artisan horizon --environment=production --queue="$queue" --verbose
    elif [[ "$queue" = "" ]]; then
        exec /usr/local/bin/php /www/wwwroot/artisan horizon --environment=production --maxProcesses="$queue_max_processes" --verbose
    else
        exec /usr/local/bin/php /www/wwwroot/artisan horizon --environment=production --queue="$queue" --maxProcesses="$queue_max_processes" --verbose
    fi
1周前 评论
sanders

接上贴(说内容过长了 :sweat_smile:

因为 horizon 会根据队列的繁忙程度自动扩缩子进程,而子进程会导致容器的负载和内存开销提升或降低,这时我们再去通过 k8s 的横向扩缩对象进行容器的扩缩,对应的片段如下:

deployment-horizon.yaml 文件用来根据一个 horizon.csv 文件的内容来配置当前队列的名称及其对应的申请资源和限制资源。

{{- $currentScope := . }}
{{- range $offset,$_ := .Files.Lines "horizon.csv" }}
{{- if or (contains "supervisor" . ) (empty .) (contains "#" .)}}{{ else }}
{{- $row := dict "queueName" "default"}}
{{- range $column,$value := regexSplit "," (. | toString) -1 }}
{{- if eq $column 0}}{{ $_ := set $row "podName" $value }}{{ end }}
{{- if eq $column 1}}{{ $_ := set $row "queueName" $value }}{{ end }}
{{- if eq $column 4}}{{ $_ := set $row "requestMemory" $value }}{{ end }}
{{- if eq $column 5}}{{ $_ := set $row "requestCpu" $value }}{{ end }}
{{- if eq $column 6}}{{ $_ := set $row "limitMemory" $value }}{{ end }}
{{- if eq $column 7}}{{ $_ := set $row "limitCpu" $value }}{{ end }}
{{- if eq $column 10}}{{ $_ := set $row "maxProcesses" $value }}{{ end }}
{{- end }}
{{- with $currentScope }}
{{ if gt $offset 1 }}---{{ end }}
apiVersion: apps/v1
kind: Deployment
metadata:
# ...
spec:
  {{- if not .Values.autoscaling.enabled }}
  replicas: {{ .Values.replicaCount }}
  {{- end }}
  selector:
# ...
  template: # POD 模板
    metadata:
# ...
      labels:
# ...
    spec:
# ...
      containers:
        - name: {{ .Chart.Name }}-horizon
# ...
          resources: # 资源
            limits:
              cpu: {{ $row.limitCpu | default "2000m" }}
              memory: {{ $row.limitMemory | default "512Mi" }}
            requests:
              cpu: {{ $row.requestCpu | default "10m" }}
              memory: {{ $row.requestMemory | default "128Mi"}}
          volumeMounts:
# ...
          env:
# ...
      volumes: # POD 共享卷声明
# ...
      nodeSelector: # 节点选择器
# ...
      {{- end }}
{{- end }}
{{- end }}
{{- end }}

HPA对象同样根据 horizon.csv 的内容来控制横向扩缩的 pod 数量。

{{- if .Values.autoscaling.enabled }}
{{- $currentScope := . }}
{{- range $offset,$_ := .Files.Lines "horizon.csv" }}
{{- if or (contains "supervisor" . ) (empty .) (contains "#" .)}}{{ else }}
{{- $row := dict }}
{{- range $column,$value := regexSplit "," (. | toString) -1 }}
{{- if eq $column 0}}{{ $_ := set $row "podName" $value }}{{ end }}
{{- if eq $column 1}}{{ $_ := set $row "queueName" $value }}{{ end }}
{{- if eq $column 2}}{{ $_ := set $row "minReplicas" $value }}{{ end }}
{{- if eq $column 3}}{{ $_ := set $row "maxReplicas" $value }}{{ end }}
{{- if eq $column 8}}{{ $_ := set $row "targetMemoryPercent" $value }}{{ end }}
{{- if eq $column 9}}{{ $_ := set $row "targetCpuPercent" $value }}{{ end }}
{{- end }}
{{- with $currentScope }}
{{ if gt $offset 1 }}---{{ end }}
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
# ...
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "akitado.fullname" . }}-horizon-{{ $row.podName }}
  minReplicas: {{ $row.minReplicas | default 1 }}
  maxReplicas: {{ $row.maxReplicas | default 1 }}
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: {{ $row.targetCpuPercent | default 90 }}
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: {{ $row.targetMemoryPercent | default 90 }}
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300 # 稳定窗口期 执行缩容后观察一段时间不做动作
      policies: # 缩容策略 多策略存在时采用销毁副本数量最多的策略
        - periodSeconds: 60 # 策略周期
          type: Percent # 目标:按比例
          value: 10 # 最多销毁副本所占百分比
    scaleUp:
      stabilizationWindowSeconds:horizon 120 # 稳定窗口期 执行扩容后观察一段时间不做动作
      policies:
        - periodSeconds: 60 # 策略周期
          type: Percent # 目标:按比例
          value: 10 # 最多扩容副本所占百分比
{{- end }}
{{- end }}
{{- end }}
{{- end }}

一个 horizon.csv 就集中控制了这些队列容器的资源:

file

最终使用 helm 部署到集群问题就解决了大部分。

file

还剩个尾巴就是节点的扩缩,这个我们用阿里云提供的自动扩缩节点池来实现,配置之后通过调度资源不足导致容器调度失败的事件来动态扩缩按量计费的节点,这个需要的费用可能会较多。

1周前 评论
w_W_v_V (楼主) 1周前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!