Step FunctionsのSDKサービス統合を使って多重実行を許容しないステートマシンを構築する

AWS

step functionsのSDKサービス統合を使って、多重実行を許容しないステートマシンを構築しましたので、忘備録がてらまとめようと思います。

業務でステートマシンを使ってバッチ処理を実行しているのですが、同じ時間帯に意図せず同じバッチが多重実行される可能性が出てきました。

そこで、バッチ(ステートマシン)の多重実行を防止するため、ステートマシンの実行数を取得し、それが2以上の時(つまり、多重実行の時)はステートマシンの起動を失敗させるようにしたいと考えました。

調べてみると、Step FunctionsのSDKサービス統合というものを使用すると、ステートマシンの実行数を取得できるようです。

そこで、SDKサービス統合を使ってステートマシンの実行数を取得し、多重実行を許容しないステートマシンを構築することにしました。

Step FunctionsのSDKサービス統合では、Step FunctionsのワークフローからAWSの各サービスのAPIアクションを呼び出すことができます。

ほぼすべてのAWSサービスのAPIアクションを呼び出すことができますが、一部呼び出せないものもあります。サポートされているSDKサービス統合はこちらから確認できます。

SDKサービス統合を使用するには、ワークフローのTaskステートのResourceフィールドにおいて、以下のように使用したいAPIアクションを指定します。

Type: Task
Resource: arn:aws:states:::aws-sdk:serviceName:apiAction.[serviceIntegrationPattern]

例えば、APIアクションとしてstepfunctionsのlist-executionsを使用する場合、以下のようになります(参考)。

Type: Task
Resource: arn:aws:states:::aws-sdk:sfn:listExecutions

また、APIアクションによってはオプションを渡したい時があります。オプションはTaskステートのParametersフィールドで指定することができます(参考)。例えば、list-executionsでオプション--state-machine-arnするには以下のようにします。

Type: Task
Resource: "arn:aws:states:::aws-sdk:sfn:listExecutions"
Parameters:
    StateMachineArn: "(state machine arnを記載)"

それでは、早速、SDKサービス統合を使用し、多重実行を許容しないステートマシンを構築します。今回は以下のようなシンプルなステートマシンを構築します。

各ステートを説明すると以下の通りです。

  1. Count running executions: このステートマシンの実行数を取得します。ここでSDKサービス統合を使用します。使用するAPIアクションはlist-executionsです。
  2. Choice: ステートマシンの実行数が2以上かどうか(多重実行かどうか)によって次の状態を切り替えます。
  3. Fail State: ステートマシンの実行数が2以上のとき(多重実行のとき)はこの状態が選択され、ステートマシンが失敗します。
  4. Wait: ステートマシンの実行数が2未満のとき(多重実行でないとき)はこの状態が選択され、所定時間(今回は5分間)待機します。

以下、上記のステートマシンのコードを説明します。

ディレクトリ構成

terraformを使って構築します。ディレクトリ構成は以下の通りです。

.
└── terraform
    └── workflow
        ├── iam_state_machine.tf
        ├── sfn_state_machine.tf
        ├── templates
        │   └── state_machine
        │       └── check_duplicate_execution.asl.yaml
        └── variables.tf

ステートマシンの定義

  • check_duplicate_execution.asl.yaml
StartAt: Count running executions
States:
    Count running executions:
        Next: Choice
        # SDKサービス統合を使用するにはTypeフィールドでTaskを指定する
        Type: Task
        #SDK統合サービス統合でAPIアクションlistExecutionsを使用
         Resource: "arn:aws:states:::aws-sdk:sfn:listExecutions"         
        # 実行数を取得したいので、組み込み関数States.ArrayLengthを使用。詳細は後述。
        # 実行数はcountに格納される。
        ResultSelector:
            count.$: "States.ArrayLength($.Executions)"
        ResultPath: $.result
        Parameters:
            # StateMachineArn: 実行情報を取得するステートマシンのARN。ここではこのステートマシンのARNを指定。
            StateMachineArn: "arn:aws:states:ap-northeast-1:${aws_account_id}:stateMachine:check-duplicate-execution"
            # StatusFilter: ステータスで絞り込み。ここではRUNNING(現在実行中)を指定。
            StatusFilter: "RUNNING"
    Choice:
        Type: Choice
        Choices:
            # 実行数が2以上の場合、Fail Stateに移行するようにする。
            - Variable: $.result.count
              NumericGreaterThanEquals: 2
              Next: Fail State
        Default: Wait
    Fail State:
        Type: Fail
        Cause: "state machines duplicate execution"
    Wait:
        End: true
        Type: Wait
        Seconds: 300

ここでは、ステートCount running executionsでSDKサービス統合を使用し、list-executionsのAPIアクションを使用しています。

ParametersのStateMachineArnではこのステートマシンのARNを指定しており、このステートマシンの実行数を取得します。

注意点としては、ResultSelectorで組み込み関数States.ArrayLengthを使っています。これは以下の理由によります。

list-executionsでは、以下のように”executions”のフィールドの値が実行情報を要素とする配列になります。

% aws stepfunctions list-executions --state-machine-arn "arn:aws:states:ap-northeast-1:{aws_account_id}:stateMachine:check-duplicate-execution" --status-filter "RUNNING"
{
    "executions": [
        {
            "executionArn": "arn:aws:states:ap-northeast-1:{aws_account_id}:execution:check-duplicate-execution:b93f0a79-1467-4c35-84e2-cffd5fa05bdd",
            "stateMachineArn": "arn:aws:states:ap-northeast-1:{aws_account_id}:stateMachine:check-duplicate-execution",
            "name": "b93f0a79-1467-4c35-84e2-cffd5fa05bdd",
            "status": "RUNNING",
            "startDate": "2024-03-24T18:13:55.253000+09:00"
        }
    ]
}

これから、実行数を取得するため、”executions”のフィールドの値(配列)の要素数をStates.ArrayLengthを使って取得しています。

ステートマシンのリソース作成

  • sfn_state_machine.tf
resource "aws_sfn_state_machine" "state_machine_check_duplicate_execution" {
  name     = "check-duplicate-execution"
  role_arn = aws_iam_role.state_machine_check_duplicate_execution_iam_role.arn
  definition = jsonencode(yamldecode(templatefile("${path.module}/templates/state_machine/check_duplicate_execution.asl.yaml", {
    aws_account_id = data.aws_caller_identity.current.account_id
    }
    )
  ))
}

権限管理

  • iam_state_machine.tf
# ロール
data "aws_iam_policy_document" "state_machine_check_duplicate_execution_iam_role_document" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["states.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "state_machine_check_duplicate_execution_iam_role" {
  name               = "state-machine-check-duplicate-execution"
  assume_role_policy = data.aws_iam_policy_document.state_machine_check_duplicate_execution_iam_role_document.json
}

# IAMポリシー
# ステートマシンがSDKサービス統合のAPIアクションlist-executionsを使用することを許可する
data "aws_iam_policy_document" "state_machine_check_duplicate_execution_iam_policy_document" {
  statement {
    effect = "Allow"
    actions = [
      "states:ListExecutions"
    ]
    # list-executionsで実行情報を取得できるステートマシンを指定する。ここではワイルドカードを指定し  任意のステートマシンの実行情報を取得できるようにしている。
    resources = ["*"]
  }
}

# IAMポリシーアタッチ
resource "aws_iam_policy" "state_machine_check_duplicate_execution_iam_policy" {
  name   = "state-machine-check-duplicate-execution"
  policy = data.aws_iam_policy_document.state_machine_check_duplicate_execution_iam_policy_document.json
}

resource "aws_iam_policy_attachment" "state_check_duplicate_execution_role_policy" {
  name       = "state-machine-check-duplicate-policy-attachment"
  roles      = [aws_iam_role.state_machine_check_duplicate_execution_iam_role.name]
  policy_arn = aws_iam_policy.state_machine_check_duplicate_execution_iam_policy.arn
}

上記のように、SDKサービス統合を使用するには権限を付与する必要があります(参考)。

変数

  • variables.tf
data "aws_caller_identity" "current" {}

動作確認

以上で定義したステートマシンを実行します。

まず、1回目の実行では以下のようにChoiceでFail Stateが選ばれることなく、Waitに移り、5分待機します。

ステートCount running executionsの出力で実行数(count)が1になっていることがわかります。これは現在実行中のステートマシンに対応します。

1回目の実行でWaitが行われている状態で、このステートマシンをもう1度実行します。

すると、以下のように、実行数(count)が2になり、ChoiceでFail Stateに移り、ステートマシンが失敗します。 

これにより、多重実行を許容しないステートマシンが構築できました。

以上、本記事では、step functionsのSDKサービス統合を使って、多重実行を許容しないステートマシンを構築しました。

本記事が少しでも参考になれば幸いです。

コメント

タイトルとURLをコピーしました