こんにちは。ロジカル・アーツの井川です。
以前に Step Functions を使う機会があったので、整理も兼ねて少しまとめてみたいと思います。今回は三つの Lambda 関数を呼び出しながらループ処理を行う、簡単なステートマシンを作成してみたいと思います。
なお、この記事は大別すると、概念・機能の説明(前半部)とハンズオン(後半部)からなります。とりあえず手を動かしてみたいという方は、以下目次の「Lambda 関数を使ったループ処理」から読み進めるといいかと思います。
Step Functions とは
AWS の複数のサービスをサーバレスのワークフローに整理する、サーバレスオーケストレーションサービスです。 例えば、Step Functions を使って複数の Lambda 関数の呼び出しを一元的に行うことができます。 対象となるサービスには AWS Lambda の他に、AWS Fargate、Amazon SageMaker などがあります。詳しくは以下のリンクを参照下さい。
Step Functions でサポートされる AWS サービス統合 - AWS Step Functions
Step Functions の使い方
まず、Step Functions で構築するワークフローのことをステートマシンと言い、ワークフローの各ステップをステートと言います。 ステートマシンは Amazon ステートマシン言語というもので定義されます。この言語は JSON をベースにしており、基本的には JSON と書き方は大きく変わりません。
ステートマシンは次のような構文で定義されます。
{ "Comment": "サンプルステートマシン", "StartAt": "HelloWorld", "States": { "HelloWorld": { "Type": "Pass", "Result": "Hello World!", "End": true } }, "TimeoutSeconds": 99999999, "Version": 1.0 }
各フィールドについて簡単に説明します:
- Comment(オプション)
- ステートマシンの説明。
- StartAt(必須)
- 下記の States にあるいずれかのステートオブジェクトの名前と一致する文字列。大文字と小文字は区別される。
- States(必須)
- コンマで区切られた一連のステートからなる。ステートについては後述。
- TimeoutSeconds(オプション)
- ステートマシンを実行可能な最大秒数。
- Version(オプション)
- Amazon ステートメント言語のバージョンで、デフォルトは 1.0
ステートマシンの実行は、States
フィールドにある一連のステートに対して行われますが、StartAt
フィールドで参照されているステートから開始します(上記の例で言えば、"HelloWorld"
ステートのこと)。このステートが "Type": "Succeed"
、"Type": "Fail"
、または "End": true
を含めば、ステートマシンは実行を終了します。それ以外の場合は、(例にはありませんが)Next
フィールドで参照されているステートで続行されます。実行は、"Type": "Succeed"
、"Type": "Fail"
、または "End": true
を含むステートに達するまで行われます。そうでなければ、ランタイムエラーが発生します。
次にステートについて見ていきましょう。
ステート(状態)
ステートは受け取った入力に対して特定の処理を行い、その結果を別のステートに渡すことができます。各ステートは、ステートの実行の種類を表す Type
(タイプ)フィールドを必ず持ちます。またステートには、Next
フィールドもしくは End
フィールドも必要です(Type
が Succeed
または Fail
の場合を除く)。
各タイプは上で述べた共通のフィールド以外に、タイプごとに異なるいくつかのフィールドを持ちます。今回は、この記事で使用するタイプとそのフィールドについてだけ説明を加えようと思います。どのようなタイプがあるか、またそれぞれの詳細については以下のリンクから参照できます。
この記事で使用しているステートのタイプは次の五種類になります。
Type: Pass
入力を出力に渡します。
Pass
ステートの持つフィールドには次のものがあります。
- Parameters(オプション)
- 下記の「入出力処理」の項を参照。
Type: Task
Resource
フィールドで指定された AWS リソースに対して作業が実行されます。
例えば、Lambda 関数を呼び出すことができます。
Task
ステートの持つフィールドには次のものがあります。
- Resource(必須)
- ARN(Amazon リソースネーム)。
- Parameters(オプション)
- 下記の「入出力処理」の項を参照。
- ResultPath(オプション)
- 下記の「入出力処理」の項を参照。
Type: Choice
分岐ロジックをステートマシンに追加します。
Choice
ステートの持つフィールドには次のものがあります。
- Choices(必須)
次に移行するステートを決定する一連のルールからなる配列。
この配列の要素は Choice ルールと呼ばれます。Choice ルールは比較(比較する入力変数、比較のタイプ、比較される値)とNext
フィールドを持ちます。各 Choice ルールはChoices
フィールドにリストされた順序で検討され、最初に合致した Choice ルールのNext
フィールドで指定したステートに移行します。- Default(オプション)
Choices
で移行するステートが決定されなかった場合に移行するステート名。
Type: Succeed
ステートマシンの実行を正常に終了します。
Succeed
ステートは終了状態のため、 End
フィールドも Next
フィールドも持ちません。
Type: Fail
ステートマシンの実行を停止し、失敗として記録します。
Fail
ステートの持つフィールドには次のものがあります。
- Cause(オプション)
- エラーの原因の説明。
- Error(オプション)
- エラー処理や、エラー原因の判別に用いるエラー名。
入出力処理
ステートの入出力は、上記の Parameters
フィールドや ResultPath
フィールドなどを使用して、フィルタリング及び制御できます。どういった流れでフィルタ及び制御がされるかは、公式ドキュメントの図が非常に分かり易いので、引用します。
残念ながら、すべてを説明すると内容が膨大になってしまうので、ここでは上で言及した Parameters
と ResultPath
のみを扱い、かつこの記事で使用している方法に限って説明します。詳細は、上図の引用元を参照下さい。
- Parameters
- ステートの入力のフィルタリングまたは静的な値を追加できます。入力の値を使用する場合、その JSON キーの末尾に
.$
が必要です。値は$.field_name
で参照します。静的な値を追加する場合は、単にキーと値のペアを追記します。
例)入力に含まれる message キーの値を使用したい場合
"message.$": "$.message"
- ResultPath
- ステートの入力に
Task
の結果を含めることが出来ます。"ResultPath": "$.field_name"
とすると、入力の JSON にキーがfield_name
、値がTask
の結果のペアが追加されます。
あとは実際にステートマシンを作成しながら、詳しく見ていこうと思います。
Lambda 関数を使ったループ処理
次の単純なアルゴリズムを、Step Functions と Lambda を使って実装してみたいと思います。この記事では、Lambda 関数のランタイムはすべて Python とします。
- 入力は 1 以上の整数 n
- 入力 n が 偶数ときは n / 2 を、奇数のときは 3n + 1 を出力する
- 2 番の出力を新たな入力とし、上記を繰り返す
- もし出力が 1 になれば、このアルゴリズムは停止する
このアルゴリズム自体はわざわざ Step Functions で実装するまでもないですが、 使い方を知るにはシンプルでいいかと思い、選びました。
さらに次のルールも追加しておきましょう。
- ループ回数が 15 に達したときアルゴリズムを停止する
この 15 という数には深い意味はありませんが、Step Functions は月 4000 回の状態遷移が無料で行えます。 そのため、あまり状態遷移をしないように制限を設けています。
さて、このアルゴリズムをステートマシンとして実装します。ステートマシンの名前は、このアルゴリズムにちなんで Collatz と名付けることにします*1。
以下にこの Collatz の各ステートの概要と定義コードを載せてあります。出来上がるワークフローは次の図のようになります。点線で囲まれた枠にある文字列が各ステート(の名前)を表し、矢印がステートの移行先を表しています。
実装ステート一覧
- InitialValue
-
- 概要:
-
一番最初に実行されるステートです。ステートマシンに対する入力が、このステートの入力になります。入力は
{"number": n}
の形とします。Parameters
フィールドを使用してloop_count
フィールドを追加します。値は 0 とします。IsPositive
ステートに移行します。 - Type:
Pass
- Next:
IsPositive
- Amazon ステート言語:
-
"InitialValue": { "Type": "Pass", "Parameters": { "number.$": "$.number", "loop_count": 0 }, "Next": "IsPositive" }
- IsPositive
-
- 概要:
-
次の形の入力を持ちます:
{"number": n, "loop_count": 0}
number
の値 n が正の整数でないならInvalid
ステートに移行し、 そうでなければModulo-2
ステートに移行します。 - Type:
Choice
- Next:
Invalid
またはModulo-2
- Amazon ステート言語:
-
"IsPositive": { "Type": "Choice", "Choices": [ { "Variable": "$.number", "NumericLessThanEquals": 0, "Next": "Invalid" } ], "Default": "Modulo-2" }
- Modulo-2
-
- 概要:
-
次の形の入力を持ちます:
{"number": n, "loop_count": m}
Lambda 関数Mod-2
を呼び出します。 この Lambda 関数はnumber
の値 n が偶数であればTrue
を、奇数であればFalse
を返します。
さらにこの出力を、ResultPath
を用いてis_even
フィールドの値とします。
IsEven
ステートに移行します。 - Type:
Task
- Next:
IsEven
- Amazon ステート言語:
-
"Modulo-2": { "Type": "Task", "Resource": "{Mod-2 の ARN}", "ResultPath": "$.is_even", "Next": "IsEven" }
- IsEven
-
- 概要:
-
次の形の入力を持ちます:
{"number": n, "loop_count": m, "is_even": true (or false)}
is_even
フィールドの値がtrue
であればEvenCase
ステートに移行し、そうでなければOddCase
ステートに移行します。 - Type:
Choice
- Next:
EvenCase
またはOddCase
- Amazon ステート言語:
-
"IsEven": { "Type": "Choice", "Choices": [ { "Variable": "$.is_even", "BooleanEquals": true, "Next": "EvenCase" } ], "Default": "OddCase" }
- EvenCase
-
- 概要:
-
次の形の入力を持ちます:
{"number": n, "loop_count": m, "is_even": true (or false)}
Parameters
フィールドを使用してnumber
とloop_count
フィールドだけ残します。
Lambda 関数DivBy2
を呼び出します。この Lambda 関数は{"number": n / 2, "loop_count": m + 1}
を返します。
IsOne
ステートに移行します。 - Type:
Task
- Next:
IsOne
- Amazon ステート言語:
-
"EvenCase": { "Type": "Task", "Resource": "{DivBy2 の ARN}", "Parameters": { "number.$": "$.number", "loop_count.$": "$.loop_count" }, "Next": "IsOne" }
- OddCase
-
- 概要:
-
次の形の入力を持ちます:
{"number": n, "loop_count": m, "is_even": true (or false)}
Parameters
フィールドを使用してnumber
とloop_count
フィールドだけ残します。
Lambda 関数MulBy3-Add1
を呼び出します。この Lambda 関数は{"number": 3 * n + 1, "loop_count": m + 1}
を返します。
IsOne
ステートに移行します。 - Type:
Task
- Next:
IsOne
- Amazon ステート言語:
-
"OddCase": { "Type": "Task", "Resource": "{MulBy3-Add1 の ARN}", "Parameters": { "number.$": "$.number", "loop_count.$": "$.loop_count" }, "Next": "IsOne" }
- IsOne
-
- 概要:
-
次の形の入力を持ちます:
{"number": n, "loop_count": m}
number
の値 n が 1 であればCompleted
ステートに移行し、loop_count
の値 m が 15 に達したときReached
ステートに移行します。そうでなければ、Modulo-2
ステートに移行します。 - Type:
Choice
- Next:
Completed
またはReached
またはModulo-2
- Amazon ステート言語:
-
"IsOne": { "Type": "Choice", "Choices": [ { "Variable": "$.loop_count", "NumericGreaterThanEquals": 15, "Next": "Reached" }, { "Variable": "$.number", "NumericEquals": 1, "Next": "Completed" } ], "Default": "Modulo-2" }
- Completed
-
- 概要:
-
ステートマシンの実行を正常に終了します。
- Type:
Succeed
- Amazon ステート言語:
-
"Completed": { "Type": "Succeed" }
- Invalid
-
- 概要:
-
ステートマシンの実行を停止し、失敗として記録します。
- Type:
Fail
- Cause:初期値が正の整数ではありませんでした。
- Error:NotPositiveNumberError
- Amazon ステート言語:
-
"Invalid": { "Type": "Fail", "Cause": "初期値が正の整数ではありませんでした。", "Error": "NotPositiveNumberError" }
- Reached
-
- 概要:
-
ステートマシンの実行を停止し、失敗として記録します。
- Type:
Fail
- Cause:ループ回数の上限に達しました。
- Error:ReachingLoopLimitError
- Amazon ステート言語:
-
"Exceeded": { "Type": "Fail", "Cause": "ループ回数の上限に達しました。", "Error": "ReachingLoopLimitError" }
実装 Lambda 関数一覧
- Mod-2
-
- 概要:
-
入力にある
number
の値が偶数であればTrue
を、 奇数であればFalse
を返します。 - 実装コード:
-
def lambda_handler(event, context): return False if event['number'] % 2 else True
- DivBy2
-
- 概要:
-
入力にある
number
の値を 2 で割り、loop_count
に 1 加えて返します。 - 実装コード:
-
def lambda_handler(event, context): event['number'] //= 2 event['loop_count'] += 1 return event
- MulBy3-Add1
-
- 概要:
-
入力にある
number
の値を 3 倍して 1 を加え、loop_count
に 1 加えて返します。 - 実装コード:
-
def lambda_handler(event, context): event['number'] = 3 * event['number'] + 1 event['loop_count'] += 1 return event
上記をもとに、いよいよ作成していきたいと思います。作成はすべてコンソール画面で行います。
Lambda 関数の作成
Step Functions から Lambda 関数を呼び出すときに、その ARN が必要になります。もちろんステートマシンを先に定義しておき、あとから Lambda 関数を作成してその ARN を書く、ということもできますが、今回はあらかじめ Lambda 関数を作成しておきましょう。
作成手順はほとんど同じなので、Mod-2
のみを画像付きで説明します。
Lambda のコンソール画面に行き、「関数の作成」をクリックします。
次の画面に移行したら、「一から作成」を選択し、「関数名」は「Mod-2」、ランタイムは「Python 3.8」を選択し、「関数の作成」をクリックします。
画面が遷移したら、lambda_function.py
のコードを次のコードに丸々置き換えます。
def lambda_handler(event, context): return False if event['number'] % 2 else True
これで完了です。
残りの関数も同様に作成していきます。異なるのは関数名と、コードだけです。
関数名:DivBy2
実装コード:
def lambda_handler(event, context): event['number'] //= 2 event['loop_count'] += 1 return event
関数名:MulBy3-Add1
実装コード:
def lambda_handler(event, context): event['number'] = 3 * event['number'] + 1 event['loop_count'] += 1 return event
ステートマシンの作成
今度は Step Functions のコンソール画面に行き、「ステートマシンの作成」をクリックします。
「コードスニペットで作成」を選択し、タイプは「標準」を選択します。
下にスクロールし、左側の赤枠で囲ったところに下にあるコードを貼り付けます。ARN のところはご自身のものに置き換えて下さい。右側のリロードボタンをクリックすれば、状態遷移の様子が図に反映されます。画像のようになれば、「次へ」をクリックします。
{ "Comment": "Collatz 予想におけるアルゴリズムで計算を行うステートマシン。", "StartAt": "InitialValue", "States": { "InitialValue": { "Type": "Pass", "Parameters": { "number.$": "$.number", "loop_count": 0 }, "Next": "IsPositive" }, "IsPositive": { "Type": "Choice", "Choices": [ { "Variable": "$.number", "NumericLessThanEquals": 0, "Next": "Invalid" } ], "Default": "Modulo-2" }, "Modulo-2": { "Type": "Task", "Resource": "{Mod-2 の ARN}", "ResultPath": "$.is_even", "Next": "IsEven" }, "IsEven": { "Type": "Choice", "Choices": [ { "Variable": "$.is_even", "BooleanEquals": true, "Next": "EvenCase" } ], "Default": "OddCase" }, "EvenCase": { "Type": "Task", "Resource": "{DivBy2 の ARN}", "Parameters": { "number.$": "$.number", "loop_count.$": "$.loop_count" }, "Next": "IsOne" }, "OddCase": { "Type": "Task", "Resource": "{MulBy3-Add1 の ARN}", "Parameters": { "number.$": "$.number", "loop_count.$": "$.loop_count" }, "Next": "IsOne" }, "IsOne": { "Type": "Choice", "Choices": [ { "Variable": "$.loop_count", "NumericGreaterThanEquals": 15, "Next": "Reached" }, { "Variable": "$.number", "NumericEquals": 1, "Next": "Completed" } ], "Default": "Modulo-2" }, "Completed": { "Type": "Succeed" }, "Invalid": { "Type": "Fail", "Cause": "初期値が正の整数ではありませんでした。", "Error": "NotPositiveNumberError" }, "Reached": { "Type": "Fail", "Cause": "ループ回数の上限に達しました。", "Error": "ReachingLoopLimitError" } } }
「名前」には 「Collatz」 と入力します。他のところはそのままにして、 画面下部の「ステートマシンの作成」 をクリックします。
作成できたら遊んでみましょう。
「実行の開始」をクリックすると、二枚目の画像の画面が出てきます。入力を {"number": 23}
のような形にして、「実行の開始」をクリックすればステートマシンが実行されます。
遷移した画面でしばらく待つと結果が確認できます。グラフ(ビジュアルワークフロー)にあるステートの箇所をクリックすると、そのステートの実行結果が確認できます。下の画像は Reached
ステートをクリックしたものです。入力のところを見てみると number
の値が 1、loop_count
の値が 15 になっています。
このことから、 IsOne
ステートで先に書いた Choice ルールが適用されていることが確認できます。
今度は入力を {"number": 48}
として実行してみましょう。
結果は、ループ回数 11 回で無事に終了しました。
よろしければ数値をいろいろ変えて遊んでみて下さい。
まとめ
いかがでしたでしょうか。今回は Lambda 関数のみを扱いましたが、他にもいろいろなサービスを統合して、一連のワークフローを定義することができ、非常に便利だと思います。この記事を通して、できるだけ多くの使い方に触れられるように書いたつもりではありますが、今回扱わなかったこともまだまだあるので、機会があればまた記事にしたいと思います。
もしお役に立てれば幸いです。