中文文档

发布事件到Kafka

MinIO 支持将 桶通知 事件发布到 Kafka 服务端点。

MinIO 依赖于 https://github.com/Shopify/sarama 项目来实现与Kafka的连接, 并共享该项目的Kafka支持功能。 要了解更多细节,请查看 sarama 项目的 兼容性和API稳定性 部分。

将 Kafka 端点添加到 MinIO 部署

以下步骤为MinIO部署添加一个新的Kafka服务端点, 以支持 桶通知

先决条件

Kafka 最低版本和支持的版本

MinIO 依赖于 https://github.com/Shopify/sarama 项目来实现与Kafka的连接, 并共享该项目的Kafka支持功能。 要了解更多细节,请查看 sarama 项目的 兼容性和API稳定性 部分。

MinIO mc 命令行工具

此过程使用 mc 命令行工具执行某些操作。 请参阅 mc快速入门 以获取安装说明。

1) 将 Kafka 端点添加到 MinIO

您可以使用环境变量 设置运行时配置设置 来配置新的Kafka服务端点。

MinIO 支持使用 环境变量 来指定 Kafka 服务端点及其相关配置设置。 当您设置环境变量后,minio server 进程将在下一次启动时 应用这些设置。

以下示例代码设置了与配置 Kafka 服务端点相关的 所有 环境变量。 最小的 必需 环境变量是 MINIO_NOTIFY_KAFKA_ENABLEMINIO_NOTIFY_KAFKA_BROKERS:

   export MINIO_NOTIFY_KAFKA_ENABLE_<IDENTIFIER>="on"
   export MINIO_NOTIFY_KAFKA_BROKERS_<IDENTIFIER>="<ENDPOINT>"
   export MINIO_NOTIFY_KAFKA_TOPIC_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_SASL_USERNAME_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_SASL_PASSWORD_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_SASL_MECHANISM_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_TLS_CLIENT_AUTH_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_SASL_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_TLS_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_TLS_SKIP_VERIFY_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_CLIENT_TLS_CERT_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_CLIENT_TLS_KEY_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_QUEUE_DIR_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_QUEUE_LIMIT_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_VERSION_<IDENTIFIER>="<string>"
   export MINIO_NOTIFY_KAFKA_COMMENT_<IDENTIFIER>="<string>"
  • <IDENTIFIER> 替换为 Kafka 服务端点的唯一 描述性字符串。 对于与新的目标服务端点相关的所有环境变量, 使用相同的 <IDENTIFIER> 值。 以下示例假设标识符为 PRIMARY

    如果指定的 <IDENTIFIER> 与 MinIO 部署上现有的 Kafka 服务端点匹配,新的设置将 覆盖 该端点上任何现有的设置。 Use mc admin config get notify_kafka 用于查看MinIO部署上当前配置的Kafka端点。

  • <ENDPOINT> 替换为以逗号分隔的 Kafka 代理列表。 例如:

    kafka1.example.com:2021,kafka2.example.com:2021

请参阅 用于存储桶通知的 Kafka 服务 以获取每个环境变量的完整文档说明。

MinIO 支持在运行中的 minio server 进程上使用 mc admin config set 命令和 notify_kafka 配置键添加或更新 Kafka 端点。 您必须重新启动 minio server 进程以应用 任何新的或更新的配置设置。

以下示例代码设置了与配置 Kafka 服务端点相关的 所有 设置。 最小 需要 设置是 notify_kafka brokers:

mc admin config set ALIAS/ notify_kafka:IDENTIFIER \
   brokers="<ENDPOINT>" \
   topic="<string>" \
   sasl_username="<string>" \
   sasl_password="<string>" \
   sasl_mechanism="<string>" \
   tls_client_auth="<string>" \
   tls="<string>" \
   tls_skip_verify="<string>" \
   client_tls_cert="<string>" \
   client_tls_key="<string>" \
   version="<string>" \
   queue_dir="<string>" \
   queue_limit="<string>" \
   comment="<string>"
  • IDENTIFIER 替换为 Kafka 服务端点的唯一 描述性字符串。 此过程中的以下示例 假设标识符为 PRIMARY

    如果指定的 IDENTIFIER 与 MinIO 部署上 现有的 Kafka 服务端点匹配,新的设置将 覆盖 该端点上的任何现有设置。 使用 mc admin config get notify_kafka to 查看 MinIO 部署上当前配置的 Kafka 端点。

  • ENDPOINT 替换为 Kafka 代理的逗号分隔列表。 例如:

    kafka1.example.com:2021,kafka2.example.com:2021

请参阅 Kafka Bucket Notification Configuration Settings 以获取每个设置的完整文档说明。

2) 重新启动MinIO部署。

您必须重新启动MinIO部署以应用配置更改。 使用 mc admin service restart 命令来重新启动部署。

mc admin service restart ALIAS

ALIAS 替换为要重新启动的部署的 别名

minio server 进程在启动时将为每个配置的 Kafka 目标打印一行, 类似于以下内容:

SQS ARNs: arn:minio:sqs::primary:kafka

当您将关联的 Kafka 部署配置为目标的桶通知时, 您必须指定 ARN 资源。

Identifying the ARN for your bucket notifications

在创建端点之前,您定义了 <IDENTIFIER> 以便为您的存储桶通知分配目标ARN。 以下是返回部署上配置的ARNs的步骤: 识别之前创建的ARN,请查找您指定的 <IDENTIFIER>

查看 JSON 输出

  1. 复制并运行以下命令,将 ALIAS 替换为部署的 别名

    mc admin info --json ALIAS
    
  2. 在JSON输出中,查找键 info.sqsARN

    您需要的ARN是匹配您指定的 <IDENTIFIER> 的该键的值。

    例如, arn:minio:sqs::primary:kafka.

使用 jq 解析 JSON 中的值

  1. Install jq

  2. 复制并运行以下命令,将 ALIAS 替换为部署的 别名

    mc admin info --json ALIAS | jq  .info.sqsARN
    

    这将返回用于通知的ARN,例如 arn:minio:sqs::primary:kafka

3) 使用 Kafka 端点作为目标配置桶通知

要使用配置好的 Kafka 服务作为目标添加新的桶通知事件, 请使用 mc event add 命令:

mc event add ALIAS/BUCKET arn:minio:sqs::primary:kafka \
  --event EVENTS
  • ALIAS 替换为MinIO部署的 别名

  • BUCKET 替换为您要配置事件的桶的 名称。

  • EVENTS 替换为 MinIO 触发通知的 事件 的逗号分隔列表。

使用 mc event ls 命令查看给定通知目标的 所有配置桶事件:

mc event ls ALIAS/BUCKET arn:minio:sqs::primary:kafka

4) 验证配置的事件

对桶执行一个操作,然后检查 Kafka 服务以 确认通知数据。 所需操作取决于在配置桶通知时指定的 events

例如,如果桶通知配置包括 s3:ObjectCreated:Put 事件, 您可以使用 mc cp 命令在 桶中创建一个新对象并 触发通知。

mc cp ~/data/new-object.txt ALIAS/BUCKET

更新 MinIO 部署中的 Kafka 端点

以下步骤将更新 MinIO 部署中现有的 Kafka 服务端点, 以支持 桶通知

先决条件

Kafka 最低版本和支持的版本

MinIO 依赖于 https://github.com/Shopify/sarama 项目来实现与Kafka的连接, 并共享该项目的Kafka支持功能。 要了解更多细节,请查看 sarama 项目的 兼容性和API稳定性 部分。

MinIO mc 命令行工具

此过程使用 mc 命令行工具执行某些操作。 请参阅 mc快速入门 以获取安装说明。

1) 列出部署中配置的 Kafka 端点

使用 mc admin config get 命令列出当前 在部署中配置的 Kafka 服务端点:

mc admin config get ALIAS/ notify_kafka

ALIAS 替换为 MinIO 部署的 别名

命令的输出类似于以下内容:

notify_kafka:primary tls_skip_verify="off"  queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""
notify_kafka:secondary tls_skip_verify="off"  queue_dir="" queue_limit="0" sasl="off" sasl_password="" sasl_username="" tls_client_auth="0" tls="off" brokers="" topic="" client_tls_cert="" client_tls_key="" version=""

notify_kafka 是一个顶级配置键, 用于配置 Kafka 通知设置brokers 键指定了 给定 notify_kafka 键的 Kafka 服务端点。 notify_kafka:<IDENTIFIER> 后缀描述了该 Kafka 服务端点的唯一标识符。

请注意,在下一步中,您想要更新的 Kafka 服务端点的 标识符。

2) 更新 Kafka 端点

使用 mc admin config set 命令为Kafka服务端点设置 新配置:

mc admin config set ALIAS/ notify_kafka:<IDENTIFIER> \
   brokers="https://kafka1.example.net:9200, https://kafka2.example.net:9200" \
   topic="<string>" \
   sasl_username="<string>" \
   sasl_password="<string>" \
   sasl_mechanism="<string>" \
   tls_client_auth="<string>" \
   tls="<string>" \
   tls_skip_verify="<string>" \
   client_tls_cert="<string>" \
   client_tls_key="<string>" \
   version="<string>" \
   queue_dir="<string>" \
   queue_limit="<string>" \
   comment="<string>"

配置设置 notify_kafka brokers 是Kafka服务端点 所需的 最低 要求。 所有其他配置 设置都是 可选的。 请参阅: Kafka 通知设置 以获取完整的Kafka配置设置列表。

3) 重新启动MinIO部署。

您必须重新启动MinIO部署以应用配置更改。 使用 mc admin service restart 命令来重新启动部署。

mc admin service restart ALIAS

ALIAS 替换为要重新启动的部署的 别名

minio server 进程在启动时为每个配置的Kafka目标打印一行, 如下所示:

SQS ARNs: arn:minio:sqs::primary:kafka

4) 验证更改

对具有事件配置的桶执行一个操作, 并使用更新后的Kafka服务端点检查Kafka服务中的通知数据。 所需操作取决于在配置桶通知时指定的 events

例如,如果桶通知配置包括 s3:ObjectCreated:Put 事件, 您可以使用 mc cp 命令在 桶中创建一个新对象并 触发通知。

mc cp ~/data/new-object.txt ALIAS/BUCKET
Join Slack 商业支持购买咨询