KEDAを使ってServiceBus Queueに合わせてKubernetesジョブをスケールする

KEDAは「Kubernetes-based Event-Driven Autoscaling」の略で、イベントをトリガーとしてKubernetesのデプロイを管理することを目的としたCRD(Custom Resource Definitio)で、イベントを使ってpodを0~nにスケールイン・アウトできる機能です。

KEDA is a Kubernetes-based event-driven autoscaler. KEDA determines how any container in Kubernetes should be scaled based on the number of events that need to be processed.

keda.sh

CNCFではSandbox Projectとして入っています。

www.cncf.io

この記事では、AzureのServiceBusのQueueと連携させてジョブをスケールさせてみます。ちなみに、MSが中心に開発しているようですが、AWSやGCPのPubSubなどにも対応しているようです。

https://keda.sh/docs/scalers/

環境準備

ServiceBus準備

まずはServiceBusを準備するので、変数を設定しておきます。値は環境に合わせて変えてください。

RG=rg-keda
LOCATION=japaneast
SB=sb-keda-space
SBQ=sbq-keda-queue
SBQ_AUTH_MGMT=sbq-keda-queue-auth-mgmt

以下の通りAzure CLI使ってServiceBusを準備します。

# ServiceBusのResourceGroup作成
$ az group create \
 --resource-group ${RG}  \
 --location ${LOCATION}

# ServiceBusのNamespace作成
$ az servicebus namespace create \
 --resource-group ${RG} \
 --name ${SB} \
 --sku basic

# ServiceBus Queue作成
$ az servicebus queue create \
 --resource-group ${RG} \
 --namespace-name ${SB} \
 --name ${SBQ}

# ServiceBus Queueのアクセスポリシーを作成します
# 本当は分けるべきですが、シンプルにするため管理権限で作成します
$ az servicebus queue authorization-rule create \
 --resource-group ${RG}  \
 --namespace-name  ${SB} \
 --queue-name ${SBQ} \
 --name ${SBQ_AUTH_MGMT} \
 --rights Manage Send Listen

KEDAのインストール

KubernetesクラスタにKEDAをインストールします。

まずはHelmのバージョン確認します。

$ helm version
version.BuildInfo{Version:"v3.1.2", GitCommit:"d878d4d45863e42fd5cff6743294a11d28a9abce", GitTreeState:"clean", GoVersion:"go1.13.8"}

後は以下のURLを参考にデプロイします。

https://keda.sh/docs/deploy/

今回はHelmのメジャーバージョンが3なので以下のような手順になります。

$ helm repo add kedacore https://kedacore.github.io/charts
$ helm repo update
$ kubectl create namespace keda
$ helm install keda kedacore/keda --namespace keda

Namespaceのkedaを確認すると以下の二つのPodが起動しているのがわかります。

$ kubectl get po -n keda
NAME                                               READY   STATUS    RESTARTS   AGE
keda-operator-dbfbd6bdb-whsn7                      1/1     Running   0          20s
keda-operator-metrics-apiserver-8678f8c5d9-rbvxm   1/1     Running   0          20s

検証用アプリ準備

検証用に簡単なアプリ作りました。

ServiceBus送信側

送信する側です。今回はローカルのDockerからこちらを実行しようと思います。

SEND_MSG_NUMの環境変数調整することで送信するメッセージ数を調整できます。

import os
from datetime import datetime

from azure.servicebus import QueueClient, Message

conn_str = os.environ.get("KEDA_SERVICEBUS_QUEUE_CONNECTIONSTRING")
msg_num = int(os.environ.get("SEND_MSG_NUM", 1))

queue_client = QueueClient.from_connection_string(conn_str)
for i in range(1, msg_num + 1):
    text = f"MSG:{i}:{datetime.now()}"
    queue_client.send(Message(text))
    print(f"Msg sent -> {text}")

ServiceBus受信側

Queueより1メッセージだけ取り出して表示するだけのコードです。

import os

from azure.servicebus import QueueClient, Message

conn_str = os.environ.get("KEDA_SERVICEBUS_QUEUE_CONNECTIONSTRING")

queue_client = QueueClient.from_connection_string(conn_str)
with queue_client.get_receiver() as queue_receiver:
    messages = queue_receiver.fetch_next(max_batch_size=1, timeout=3)
    for message in messages:
        print(f"Received Msg -> {message}")
        message.complete()

Dockerfile準備

送信、受信ともに以下の定義で共通です。

FROM python:3.7.7-slim-buster

WORKDIR /usr/src/app

RUN pip install azure.servicebus

COPY ./app.py ./

CMD ["python","app.py"]

コンテナビルド

以下のように配置します。

app/
├── receiver
│   ├── Dockerfile
│   └── app.py
└── sender
    ├── Dockerfile
    └── app.py

コンテナをそれぞれビルドします。

# 送信側はローカルのDockerから実行するためにローカルでビルド
docker build -t sbq-sender ./app/sender

# 受信側はACR上にビルド
ACR_NAME=myacr
az acr build  --registry ${ACR_NAME} --image sbq-receiver ./app/receiver

Kubernetesにデプロイ

ServiceBusへの接続文字列取得

はずはServiceBusへの接続文字列のテキストを取得します。 SBQ_CONN_STR の変数は後で使うのでそのまま取っておきます。

SBQ_CONN_STR=$(az servicebus queue authorization-rule keys list \
  --resource-group ${RG}  \
  --namespace-name ${SB} \
  --queue-name ${SBQ} \
  --name ${SBQ_AUTH_MGMT} \
  --query 'primaryConnectionString' --output tsv)


# SBQ_CONN_STRはこんな文字列です⇒Endpoint=sb://sb-keda-space.servicebus.windows.net/;SharedAccessKeyName=xxxxx

Base64にエンコードして出力された値をコピーしておきます。

WSLならclip.exe使うと便利です。

$ echo ${SBQ_CONN_STR} | base64 -w0 | clip.exe

Kubernetes Manifest

sbq-connectionstring に先程取得した ${SBQ_CONN_STR} をBase64にエンコードした値を設定します。ACRの値も自身の環境のもに変えてください。

apiVersion: v1
kind: Secret
metadata:
  name: sbq-secrets
data:
  sbq-connectionstring: RW5kcG9pbnQ9c2I6Ly9zYi1rZ...    # ☆先程コピーしたBase64の接続文字列    
---
apiVersion: keda.k8s.io/v1alpha1
kind: TriggerAuthentication
metadata:
  name: azure-servicebus-auth
spec:
  secretTargetRef:
  - parameter: connection
    name: sbq-secrets
    key: sbq-connectionstring
---
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: sbq-scaled-receive
  namespace: default
spec:
  scaleType: job
  pollingInterval: 10
  cooldownPeriod:  30
  minReplicaCount: 0
  maxReplicaCount: 15
  ttlSecondsAfterFinished: 0
  triggers:
  - type: azure-servicebus
    metadata:
      queueName: sbq-keda-queue
      queueLength: '5'     # ここがPodのスケールのMax値
    authenticationRef:
      name: azure-servicebus-auth
  jobTargetRef:
    parallelism: 1
    completions: 1
    activeDeadlineSeconds: 60
    backoffLimit: 1
    template:
      spec:
        restartPolicy: Never
        containers:
          - name: sbq-receiver
            image: myacr.azurecr.io/sbq-receiver # ☆ACRのURLは値を自身の環境のものを設定します
            env:
            - name: KEDA_SERVICEBUS_QUEUE_CONNECTIONSTRING
              valueFrom:
                secretKeyRef:
                  name: sbq-secrets
                  key: sbq-connectionstring

ScaledObjectのパラメータは以下を直接参照するのが一番詳しいと思います。

https://github.com/kedacore/keda/blob/v1.3.0/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml

デプロイ

Manifestを準備できたら以下のようにデプロイします。

kubectl apply -f scaled-receve-job.yaml 

デプロイできていることを確認してみます。

$ kubectl get ScaledObject 
NAME                 DEPLOYMENT   TRIGGERS           AGE
sbq-scaled-receive                azure-servicebus   82s

# この時点ではジョブは実行されていません
$ kubectl get po
No resources found in default namespace.

Kedaを試してみる

モニタリングコマンド

いくつかコマンドを打ってモニタリングすると動きが良くわかります。

このコマンドでpodの一覧を表示します。 -w もありますが、 watch -dが変更もわかって面白いです。

watch -n3 -d kubectl get po

以下のように、ServiceBusのQueueの状況をモニタリングできます。

$ watch -n2 -d az servicebus queue show --resource-group ${RG} --namespace-name ${SB} --name ${SBQ} --query 'countDetails'
{
  "activeMessageCount": 0,
  "deadLetterMessageCount": 0,
  "scheduledMessageCount": 0,
  "transferDeadLetterMessageCount": 0,
  "transferMessageCount": 0
}

KEDAがServiceBusの状況をLoopで確認して、Jobを実行するのが確認できます。

kubectl logs -n keda -f keda-operator-dbfbd6bdb-{hash}

メッセージ投入

先程ビルドしたイメージからメッセージを投入できます。 SEND_MSG_NUM で送信するメッセージ数を指定できます。

$ docker run \
 -e KEDA_SERVICEBUS_QUEUE_CONNECTIONSTRING=${SBQ_CONN_STR} \
 -e SEND_MSG_NUM=4 \
 sbq-sender

このManifestの設定だとシーケンシャルに1メッセージずつ処理する様子が見て取れます。全部のメッセージ処理が終わると、新規のジョブが作られることは無くなります。

$ kubectl get po
NAME                             READY   STATUS      RESTARTS   AGE
sbq-scaled-receive-5jtzn-tmqvn   1/1     Running     0          3s
sbq-scaled-receive-czqs8-4dc78   0/1     Completed   0          73s
sbq-scaled-receive-fgqrp-x8fmp   0/1     Completed   0          63s
sbq-scaled-receive-ggfq7-rjdl6   0/1     Completed   0          93s
sbq-scaled-receive-hv4zd-mq2ct   0/1     Completed   0          13s
sbq-scaled-receive-ktmdf-4mvpr   0/1     Completed   0          83s

最後に少しメモ

仕組みをちょっとだけ確認しておきました。

LOOP内でServiceBusのQueueの長さを取ってきています。先程のモニタリングコマンドで言うと、 kubectl logs -n keda -f keda-operator-dbfbd6bdb-{hash} で確認できる内容です。

https://github.com/kedacore/keda/blob/v1.3.0/pkg/handler/scale_loop.go#L65-L100

https://github.com/kedacore/keda/blob/v1.3.0/pkg/scalers/azure_servicebus_scaler.go#L154

// Returns the current metrics to be served to the HPA
func (s *azureServiceBusScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
    queuelen, err := s.GetAzureServiceBusLength(ctx)

Queueの長さに合わせてジョブを生成するようです。

https://github.com/kedacore/keda/blob/master/pkg/handler/scale_loop.go#L79-L99

       metrics, _ := scaler.GetMetrics(ctx, "queueLength", nil)

        for _, m := range metrics {
            if m.MetricName == "queueLength" {
                metricValue, _ = m.Value.AsInt64()
                queueLength += metricValue
            }
        }
//省略---
    h.scaleJobs(scaledObject, isScaledObjectActive, queueLength, maxValue)

ここで呼び出される h.scaleJobs(scaledObject, isScaledObjectActive, queueLength, maxValue) のシグネチャは以下の部分になります。ここでわかりにくいのですが、k8sのyamlにあった spec.triggers.metadata.queueLengthqueueLength でなく maxScale にマッピングされます。

https://github.com/kedacore/keda/blob/v1.3.0/pkg/handler/scale_jobs.go#L18

func (h *ScaleHandler) scaleJobs(scaledObject *kedav1alpha1.ScaledObject, isActive bool, scaleTo int64, maxScale int64) {

この queueLengthmaxScale の値を元にスケール値を決めます。

また、今回は接続文字列で認証しましたが、aad-pod-identityにも対応しているようです。

apiVersion: keda.k8s.io/v1alpha1
kind: TriggerAuthentication
metadata:
  name: azure-servicebus-auth
spec:
  podIdentity:
    provider: azure

github.com