CDKでStep Functionsによるワークフローを構築する

aws

Step FunctionsはLambdaやGlueのJobなどからなるワークフローを構築できるサービス。今回はCDKで構築する。 全体のコードはGitHubにある。

taskを next() で繋げて StateMachine に渡すとワークフローができる。 Parallel() で複数のtaskを並列に実行でき、そのoutputは配列となる。 Choice() で条件分岐ができて、otherwise() を指定しないとエラーになる。

Objectの中でinputやContext Objectの値を参照する場合、キーの名前を.$で終える必要があり、 fromJsonPathAt() を用いるとキーに.$が追加される

import * as cdk from '@aws-cdk/core';
import * as sfn from '@aws-cdk/aws-stepfunctions';
import * as tasks from '@aws-cdk/aws-stepfunctions-tasks';
import { Function, Code, Runtime } from '@aws-cdk/aws-lambda';
import * as path from 'path'

export class CdkStepfunctionsSampleStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const fetchScoreFunction = new Function(this, 'FetchScoreFunction', {
      runtime: Runtime.GO_1_X,
      code: Code.fromAsset(path.join(__dirname, 'lambda', 'fetchScore')),
      handler: "main"
    })

    const invokeFetchScoreFunction1 = new tasks.LambdaInvoke(this, 'Invoke FetchScoreFunction1', {
      lambdaFunction: fetchScoreFunction,
      payload: sfn.TaskInput.fromObject({
        'now': sfn.TaskInput.fromJsonPathAt("$.time").value,
      }),
      outputPath: '$.Payload',
      retryOnServiceExceptions: false,
    })

    const invokeFetchScoreFunction2 = new tasks.LambdaInvoke(this, 'Invoke FetchScoreFunction2', {
      lambdaFunction: fetchScoreFunction,
      outputPath: '$.Payload',
      retryOnServiceExceptions: false,
    })

    const fetchScores = new sfn.Parallel(this, 'Fetch Scores').branch(
      invokeFetchScoreFunction1,
      invokeFetchScoreFunction2
    )

    const makeSummaryFunction = new Function(this, 'MakeSummaryFunction', {
      runtime: Runtime.GO_1_X,
      code: Code.fromAsset(path.join(__dirname, 'lambda', 'makeSummary')),
      handler: "main"
    })

    const invokeMakeSummaryFunction = new tasks.LambdaInvoke(this, 'Invoke MakeSummaryFunction', {
      lambdaFunction: makeSummaryFunction,
      outputPath: '$.Payload',
      retryOnServiceExceptions: false,
    })

    const definition = fetchScores.next(
      invokeMakeSummaryFunction
    ).next(
      new sfn.Choice(this, 'Check total is over 14')
        .when(sfn.Condition.numberGreaterThan('$.retry', 5),
          new sfn.Fail(this, 'Job Failed', {
            cause: 'AWS Batch Job Failed',
            error: 'DescribeJob returned FAILED',
          })
        )
        .when(sfn.Condition.numberLessThanEquals('$.total', 14), fetchScores)
        .otherwise(new sfn.Succeed(this, 'Job Succeeded'))
    )
  
    new sfn.StateMachine(this, 'StateMachine', {
      definition,
      timeout: cdk.Duration.minutes(5)
    });
  }
}

ただし配列の中で fromJsonPathAt() を使うと Cannot use JsonPath fields in an array エラーになってしまう。 実体はLambda関数である EvaluateExpression を挟んでinputを配列に加工すれば sfn.JsonPath.listAt で参照することができる。

new tasks.EvaluateExpression(this, name, {
  expression: `['--arg_1', $$.Execution.StartTime]`,
  resultPath: '$.emrArgs'
}).next(
    new EmrAddStep(
        stack,
        name,
        {
            name: name,
            jar: jar,
            args: sfn.JsonPath.listAt('$.emrArgs'),
            clusterId: clusterId,
        }
    )
)

実行すると次のようにグラフや実行結果を見ることができるので、どこでどのような入力によって失敗しているかが一目で分かる。

実行結果

参考

amazon web services - Using JsonPath Step Functions variable inside an array using CDK - Stack Overflow