MinIO 存储桶通知
作用
存储桶(Bucket)如果发生变化,比如上传和删除对象,可以使用存储桶时间通知机制进行监控,并通过以下方式发布出去,minio 支持以下消息通知方式
Supported Notification Targets |
---|
AMQP |
MQTT |
Elasticsearch |
NSQ |
Redis |
PostgreSQL |
MySQL |
Apache Kafka |
Webhooks |
整合 RabbitMQ
这里以 RabbitMQ 为例
# 使用命令查看初始的通知策略
$ mc admin config get minio | grep notigy
notify_webhook publish bucket notifications to webhook endpoints
notify_amqp publish bucket notifications to AMQP endpoints
notify_kafka publish bucket notifications to Kafka endpoints
notify_mqtt publish bucket notifications to MQTT endpoints
notify_nats publish bucket notifications to NATS endpoints
notify_nsq publish bucket notifications to NSQ endpoints
notify_mysql publish bucket notifications to MySQL databases
notify_postgres publish bucket notifications to Postgres databases
notify_elasticsearch publish bucket notifications to Elasticsearch endpoints
notify_redis publish bucket notifications to Redis datastores
1、将事件发布到 RabbitMQ 上
key | 参数类型 | 说明 |
---|---|---|
notigy_amqp | string | 创建的 RabbitMQ 实例名称,名称不重复即可 |
ARGS | 参数类型 | 说明 |
---|---|---|
url | url | AMQP 服务端点 示例:amqp://myuser:mypassword@localhost:5672 |
exchange | string | 交换机名称 |
exchange_type | string | 交换机类型 |
routing_key | string | 路由 key |
mandatory | on|off | 设置为“关闭”时忽略未传递的消息,默认为“开启” |
durable | on|off | 当设置为“on”时,跨代理重新启动持久队列,默认为“off” |
no_wait | on|off | 设置为“on”时非阻塞消息传递,默认为“off” |
internal | on|off | 设置为“on”以使发布者不直接使用交换,而仅在绑定到其他交换时使用 |
auto_deleted | on|off | 设置为“on”时自动删除队列,当没有消费者时 |
delivery_mode | number | 为非持久性设置为“1”或为持久性队列设置为“2” |
queue_dir | path | 未送达消息的暂存目录,例如 ‘/home/event’ |
queue_limit | number | 未送达消息的最大限制,默认为“100000” |
comment | sentence | (可选)设置添加评论 |
MinIO 支持持久事件存储。 持久存储将在 AMQP 代理离线时备份事件,并在代理重新上线时重放它。 可以通过设置 queue_dir 字段中的目录路径和 queue_limit 字段中 queue_dir 中事件的最大限制来配置事件存储。 例如,queue_dir 可以是 /home/events,queue_limit 可以是 1000。默认情况下,queue_limit 设置为 100000。
要更新配置,请使用 mc admin config get notify_amqp 命令获取 notify_amqp 的当前配置。
$ mc admin config get myminio/ notify_amqp
notify_amqp:1 delivery_mode="0" exchange_type="" no_wait="off" queue_dir="" queue_limit="0" url="" auto_deleted="off" durable="off" exchange="" internal="off" mandatory="off" routing_key=""
使用 mc admin config set 命令更新部署的配置。重新启动 MinIO 服务器以使更改生效。 如果没有错误,服务器将在启动时打印类似 SQS ARNs: arn:minio:sqs::1:amqp 的行。
# RabbitMQ 的示例配置如下所示:
# 注:这里官方文档很坑,官方文档给的示例是错的
$ mc admin config set minio/ notify_amqp:1 exchange="bucket_events" exchange_type="direct" mandatory="off" no_wait="on" url="amqp://myuser:mypassword@localhost:5672" auto_deleted="off" delivery_mode="0" durable="on" internal="off" routing_key="logs"
2、使用 MinIO 客户端启用存储桶通知
启用存储桶事件通知以在 myminio 服务器上上传或删除图像存储桶时触发 JPEG 图像。 这里的 ARN 值为 `arn:minio:sqs::1:amqp`。
要了解有关 ARN 的更多信息,请遵循 AWS ARN 文档。
$ mc mb myminio/images
$ mc event add myminio/images arn:minio:sqs::1:amqp --suffix .jpg
$ mc event list myminio/images
arn:minio:sqs::1:amqp s3:ObjectCreated:*,s3:ObjectRemoved:* Filter: suffix=”.jpg”
在RabbitMQ 上测试
官方文档是用 python 测试的,这里使用 Java 进行测试
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ 配置
// 这里以 direct 模式为例
@Configuration
public class DirectRabbitMqConfiguration {
@Bean
public DirectExchange directBucketExchange() {
return new DirectExchange("bucket_events",true,false);
}
/**
* 日志
*/
@Bean
public Queue directBucketQueue() {
return new Queue("bucket_logs",true);
}
/**
* 队列和交换机绑定
*/
@Bean
public Binding directBucketBinding() {
return BindingBuilder.bind(directBucketQueue()).to(directBucketExchange()).with("logs");
}
}
服务层
@Service
public class bucketConsumer {
@RabbitListener(queues = {"bucket_logs"})
public void bucketLogs(String logs) {
System.out.println("bucket_logs 收到消息:-> " + logs);
}
}
测试
将 JPEG 图像上传到图像存储桶中。
$ mc cp ~/Downloads/images/2.jpg minio/images
上传完成后,您应该会通过 RabbitMQ 收到以下事件通知。
{
"EventName": "s3:ObjectCreated:Put",
"Key": "images/2.jpg",
"Records": [
{
"eventVersion": "2.0",
"eventSource": "minio:s3",
"awsRegion": "",
"eventTime": "2021-06-15T01:55:32.135Z",
"eventName": "s3:ObjectCreated:Put",
"userIdentity": {
"principalId": "admin"
},
"requestParameters": {
"principalId": "admin",
"region": "",
"sourceIPAddress": "192.168.122.160"
},
"responseElements": {
"content-length": "0",
"x-amz-request-id": "16889EAD20502152",
"x-minio-deployment-id": "1d98c84f-cd4e-4af5-b09a-a44da43423b4",
"x-minio-origin-endpoint": "http://192.168.122.161:9000"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "Config",
"bucket": {
"name": "images",
"ownerIdentity": {
"principalId": "admin"
},
"arn": "arn:aws:s3:::images"
},
"object": {
"key": "2.jpg",
"size": 16480,
"eTag": "c2af47506b59fe431bec6db25c42eb63",
"contentType": "image/jpeg",
"userMetadata": {
"content-type": "image/jpeg",
"x-minio-internal-inline-data": "true"
},
"sequencer": "16889EAD23DB3576"
}
},
"source": {
"host": "192.168.122.160",
"port": "",
"userAgent": "MinIO (linux; amd64) minio-go/v7.0.11 mc/RELEASE.2021-05-26T19-19-26Z"
}
}
]
}
本作品采用《CC 协议》,转载必须注明作者和本文链接