KEDAは「Kubernetes-based Event-Driven Autoscaling」の略で、イベントをトリガーとしてKubernetesのデプロイを管理することを目的としたCRD(Custom Resource Definitio)で、イベントを使ってpodを0~nにスケールイン・アウトできる機能です。
KEDA is a Kubernetes-based event-driven autoscaler. KEDA determines how any container in Kubernetes should be scaled based on the number of events that need to be processed.
CNCFではSandbox Projectとして入っています。
この記事では、AzureのServiceBusのQueueと連携させてジョブをスケールさせてみます。ちなみに、MSが中心に開発しているようですが、AWSやGCPのPubSubなどにも対応しているようです。
環境準備
ServiceBus準備
まずはServiceBusを準備するので、変数を設定しておきます。値は環境に合わせて変えてください。
RG=rg-keda LOCATION=japaneast SB=sb-keda-space SBQ=sbq-keda-queue SBQ_AUTH_MGMT=sbq-keda-queue-auth-mgmt
以下の通りAzure CLI使ってServiceBusを準備します。
# ServiceBusのResourceGroup作成 $ az group create \ --resource-group ${RG} \ --location ${LOCATION} # ServiceBusのNamespace作成 $ az servicebus namespace create \ --resource-group ${RG} \ --name ${SB} \ --sku basic # ServiceBus Queue作成 $ az servicebus queue create \ --resource-group ${RG} \ --namespace-name ${SB} \ --name ${SBQ} # ServiceBus Queueのアクセスポリシーを作成します # 本当は分けるべきですが、シンプルにするため管理権限で作成します $ az servicebus queue authorization-rule create \ --resource-group ${RG} \ --namespace-name ${SB} \ --queue-name ${SBQ} \ --name ${SBQ_AUTH_MGMT} \ --rights Manage Send Listen
KEDAのインストール
KubernetesクラスタにKEDAをインストールします。
まずはHelmのバージョン確認します。
$ helm version version.BuildInfo{Version:"v3.1.2", GitCommit:"d878d4d45863e42fd5cff6743294a11d28a9abce", GitTreeState:"clean", GoVersion:"go1.13.8"}
後は以下のURLを参考にデプロイします。
今回はHelmのメジャーバージョンが3なので以下のような手順になります。
$ helm repo add kedacore https://kedacore.github.io/charts
$ helm repo update
$ kubectl create namespace keda
$ helm install keda kedacore/keda --namespace keda
Namespaceのkedaを確認すると以下の二つのPodが起動しているのがわかります。
$ kubectl get po -n keda NAME READY STATUS RESTARTS AGE keda-operator-dbfbd6bdb-whsn7 1/1 Running 0 20s keda-operator-metrics-apiserver-8678f8c5d9-rbvxm 1/1 Running 0 20s
検証用アプリ準備
検証用に簡単なアプリ作りました。
ServiceBus送信側
送信する側です。今回はローカルのDockerからこちらを実行しようと思います。
SEND_MSG_NUMの環境変数調整することで送信するメッセージ数を調整できます。
import os from datetime import datetime from azure.servicebus import QueueClient, Message conn_str = os.environ.get("KEDA_SERVICEBUS_QUEUE_CONNECTIONSTRING") msg_num = int(os.environ.get("SEND_MSG_NUM", 1)) queue_client = QueueClient.from_connection_string(conn_str) for i in range(1, msg_num + 1): text = f"MSG:{i}:{datetime.now()}" queue_client.send(Message(text)) print(f"Msg sent -> {text}")
ServiceBus受信側
Queueより1メッセージだけ取り出して表示するだけのコードです。
import os from azure.servicebus import QueueClient, Message conn_str = os.environ.get("KEDA_SERVICEBUS_QUEUE_CONNECTIONSTRING") queue_client = QueueClient.from_connection_string(conn_str) with queue_client.get_receiver() as queue_receiver: messages = queue_receiver.fetch_next(max_batch_size=1, timeout=3) for message in messages: print(f"Received Msg -> {message}") message.complete()
Dockerfile準備
送信、受信ともに以下の定義で共通です。
FROM python:3.7.7-slim-buster WORKDIR /usr/src/app RUN pip install azure.servicebus COPY ./app.py ./ CMD ["python","app.py"]
コンテナビルド
以下のように配置します。
app/ ├── receiver │ ├── Dockerfile │ └── app.py └── sender ├── Dockerfile └── app.py
コンテナをそれぞれビルドします。
# 送信側はローカルのDockerから実行するためにローカルでビルド docker build -t sbq-sender ./app/sender # 受信側はACR上にビルド ACR_NAME=myacr az acr build --registry ${ACR_NAME} --image sbq-receiver ./app/receiver
Kubernetesにデプロイ
ServiceBusへの接続文字列取得
はずはServiceBusへの接続文字列のテキストを取得します。 SBQ_CONN_STR
の変数は後で使うのでそのまま取っておきます。
SBQ_CONN_STR=$(az servicebus queue authorization-rule keys list \ --resource-group ${RG} \ --namespace-name ${SB} \ --queue-name ${SBQ} \ --name ${SBQ_AUTH_MGMT} \ --query 'primaryConnectionString' --output tsv) # SBQ_CONN_STRはこんな文字列です⇒Endpoint=sb://sb-keda-space.servicebus.windows.net/;SharedAccessKeyName=xxxxx
Base64にエンコードして出力された値をコピーしておきます。
WSLならclip.exe使うと便利です。
$ echo ${SBQ_CONN_STR} | base64 -w0 | clip.exe
Kubernetes Manifest
sbq-connectionstring
に先程取得した ${SBQ_CONN_STR}
をBase64にエンコードした値を設定します。ACRの値も自身の環境のもに変えてください。
apiVersion: v1 kind: Secret metadata: name: sbq-secrets data: sbq-connectionstring: RW5kcG9pbnQ9c2I6Ly9zYi1rZ... # ☆先程コピーしたBase64の接続文字列 --- apiVersion: keda.k8s.io/v1alpha1 kind: TriggerAuthentication metadata: name: azure-servicebus-auth spec: secretTargetRef: - parameter: connection name: sbq-secrets key: sbq-connectionstring --- apiVersion: keda.k8s.io/v1alpha1 kind: ScaledObject metadata: name: sbq-scaled-receive namespace: default spec: scaleType: job pollingInterval: 10 cooldownPeriod: 30 minReplicaCount: 0 maxReplicaCount: 15 ttlSecondsAfterFinished: 0 triggers: - type: azure-servicebus metadata: queueName: sbq-keda-queue queueLength: '5' # ここがPodのスケールのMax値 authenticationRef: name: azure-servicebus-auth jobTargetRef: parallelism: 1 completions: 1 activeDeadlineSeconds: 60 backoffLimit: 1 template: spec: restartPolicy: Never containers: - name: sbq-receiver image: myacr.azurecr.io/sbq-receiver # ☆ACRのURLは値を自身の環境のものを設定します env: - name: KEDA_SERVICEBUS_QUEUE_CONNECTIONSTRING valueFrom: secretKeyRef: name: sbq-secrets key: sbq-connectionstring
ScaledObjectのパラメータは以下を直接参照するのが一番詳しいと思います。
https://github.com/kedacore/keda/blob/v1.3.0/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml
デプロイ
Manifestを準備できたら以下のようにデプロイします。
kubectl apply -f scaled-receve-job.yaml
デプロイできていることを確認してみます。
$ kubectl get ScaledObject NAME DEPLOYMENT TRIGGERS AGE sbq-scaled-receive azure-servicebus 82s # この時点ではジョブは実行されていません $ kubectl get po No resources found in default namespace.
Kedaを試してみる
モニタリングコマンド
いくつかコマンドを打ってモニタリングすると動きが良くわかります。
このコマンドでpodの一覧を表示します。 -w
もありますが、 watch -dが変更もわかって面白いです。
watch -n3 -d kubectl get po
以下のように、ServiceBusのQueueの状況をモニタリングできます。
$ watch -n2 -d az servicebus queue show --resource-group ${RG} --namespace-name ${SB} --name ${SBQ} --query 'countDetails' { "activeMessageCount": 0, "deadLetterMessageCount": 0, "scheduledMessageCount": 0, "transferDeadLetterMessageCount": 0, "transferMessageCount": 0 }
KEDAがServiceBusの状況をLoopで確認して、Jobを実行するのが確認できます。
kubectl logs -n keda -f keda-operator-dbfbd6bdb-{hash}
メッセージ投入
先程ビルドしたイメージからメッセージを投入できます。 SEND_MSG_NUM
で送信するメッセージ数を指定できます。
$ docker run \ -e KEDA_SERVICEBUS_QUEUE_CONNECTIONSTRING=${SBQ_CONN_STR} \ -e SEND_MSG_NUM=4 \ sbq-sender
このManifestの設定だとシーケンシャルに1メッセージずつ処理する様子が見て取れます。全部のメッセージ処理が終わると、新規のジョブが作られることは無くなります。
$ kubectl get po NAME READY STATUS RESTARTS AGE sbq-scaled-receive-5jtzn-tmqvn 1/1 Running 0 3s sbq-scaled-receive-czqs8-4dc78 0/1 Completed 0 73s sbq-scaled-receive-fgqrp-x8fmp 0/1 Completed 0 63s sbq-scaled-receive-ggfq7-rjdl6 0/1 Completed 0 93s sbq-scaled-receive-hv4zd-mq2ct 0/1 Completed 0 13s sbq-scaled-receive-ktmdf-4mvpr 0/1 Completed 0 83s
最後に少しメモ
仕組みをちょっとだけ確認しておきました。
LOOP内でServiceBusのQueueの長さを取ってきています。先程のモニタリングコマンドで言うと、 kubectl logs -n keda -f keda-operator-dbfbd6bdb-{hash}
で確認できる内容です。
https://github.com/kedacore/keda/blob/v1.3.0/pkg/handler/scale_loop.go#L65-L100
https://github.com/kedacore/keda/blob/v1.3.0/pkg/scalers/azure_servicebus_scaler.go#L154
// Returns the current metrics to be served to the HPA func (s *azureServiceBusScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { queuelen, err := s.GetAzureServiceBusLength(ctx)
Queueの長さに合わせてジョブを生成するようです。
https://github.com/kedacore/keda/blob/master/pkg/handler/scale_loop.go#L79-L99
metrics, _ := scaler.GetMetrics(ctx, "queueLength", nil) for _, m := range metrics { if m.MetricName == "queueLength" { metricValue, _ = m.Value.AsInt64() queueLength += metricValue } } //省略--- h.scaleJobs(scaledObject, isScaledObjectActive, queueLength, maxValue)
ここで呼び出される h.scaleJobs(scaledObject, isScaledObjectActive, queueLength, maxValue)
のシグネチャは以下の部分になります。ここでわかりにくいのですが、k8sのyamlにあった spec.triggers.metadata.queueLength
はqueueLength
でなく maxScale
にマッピングされます。
https://github.com/kedacore/keda/blob/v1.3.0/pkg/handler/scale_jobs.go#L18
func (h *ScaleHandler) scaleJobs(scaledObject *kedav1alpha1.ScaledObject, isActive bool, scaleTo int64, maxScale int64) {
この queueLength
と maxScale
の値を元にスケール値を決めます。
また、今回は接続文字列で認証しましたが、aad-pod-identityにも対応しているようです。
apiVersion: keda.k8s.io/v1alpha1 kind: TriggerAuthentication metadata: name: azure-servicebus-auth spec: podIdentity: provider: azure