Glue DataBrewでデータを可視化して分析するProjectと機械学習の前処理を行うJobをCDKで作成する

awsmachinelearningetl

Glue DataBrewは、データを可視化してパラメータ間の相関を見たり、カテゴリー変数のエンコードや、欠損値や外れ値を置換する処理をコードなしで実行できるマネージドサービス。KaggleのHouse Prices Competitonの学習データで試してみる。全体のコードはGitHubにある。

KaggleのHouse Prices CompetitionのKernelからデータの探り方を学ぶ - sambaiz-net

料金は30分のセッションごとに$1と、Jobのノード時間あたり$0.48かかる。 ノードには通常のGlueのJobのDPUと同じく4vCPUと16GBのメモリが含まれ、時間あたりのコストの差は$0.04とそれほど大きくない。 使い分けとしては、非エンジニアが使う場合はGUIでデータを加工できるDataBrewを、独自の変換やカスタムコネクターを要する処理は通常のJobで行うと良いとのこと

GlueのカスタムコネクタでBigQueryに接続する - sambaiz-net

Datasetの作成

DatasetのソースとしてData CatalogのほかにRedshiftやRDS、S3に直接接続することもできる。

CDKでGlue Data CatalogのDatabase,Table,Partition,Crawlerを作成する - sambaiz-net

ソース

今回はS3にファイルを上げてそれを参照する。

createDataBucket() {
  const bucket = new s3.Bucket(this, 'DataBucket', {
    bucketName: `databrew-sample-${this.account}-${this.region}`,
    removalPolicy: cdk.RemovalPolicy.DESTROY
  })
  const deployData = new s3deploy.BucketDeployment(this, 'DeploySource', {
    sources: [s3deploy.Source.asset('./data')],
    destinationBucket: bucket,
    destinationKeyPrefix: "src/"
  })
  return {bucket, deployData}
}

createDataset(bucket: s3.IBucket) {
  return new databrew.CfnDataset(this, 'Dataset', {
    name: "databrew-sample-train-dataset",
    input: {
      s3InputDefinition: {
        bucket: bucket.bucketName,
        key: "src/<[^/]+>.csv"
      }
    },
    format: "CSV",
  })
}

Projectの作成

Datasetを可視化するProjectを作成する。RecipeのARNが必須なので一旦空のstepで作って渡している。 Policyはコンソール上で新規作成したときにできるものを参考にした。

Projectの作成

createRecipe() {
  return new databrew.CfnRecipe(this, 'Recipe', {
    name: "databrew-sample-recipe",
    steps: []
  })
}

createProject(bucket: s3.IBucket, deployData: s3deploy.BucketDeployment, datasetName: string, recipeName: string) {
  const role = new iam.Role(this, 'ProjectRole', {
    assumedBy: new iam.ServicePrincipal("databrew.amazonaws.com"),
    inlinePolicies: {
      "project": iam.PolicyDocument.fromJson({
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
              bucket.bucketArn,
              `${bucket.bucketArn}/*`
            ]
          },
          {
            "Effect": "Allow",
            "Action": [
              /*
              "glue:GetDatabases", 
              "glue:GetPartitions", 
              "glue:GetTable", 
              "glue:GetTables", 
              "glue:GetConnection",
              "lakeformation:GetDataAccess",
              */
              "ec2:DescribeVpcEndpoints",
              "ec2:DescribeRouteTables",
              "ec2:DescribeNetworkInterfaces",
              "ec2:DescribeSecurityGroups",
              "ec2:DescribeSubnets",
              "ec2:DescribeVpcAttribute",
              "ec2:CreateNetworkInterface"
            ],
            "Resource": [
              "*"
            ]
          },
          {
              "Effect": "Allow",
              "Action": "ec2:DeleteNetworkInterface",
              "Condition": {
                  "StringLike": {
                      "aws:ResourceTag/aws-glue-service-resource": "*"
                  }
              },
              "Resource": [
                  "*"
              ]
          },
          {
            "Effect": "Allow",
            "Action": [
              "ec2:CreateTags",
              "ec2:DeleteTags"
            ],
            "Condition": {
              "ForAllValues:StringEquals": {
                "aws:TagKeys": [
                  "aws-glue-service-resource"
                ]
              }
            },
            "Resource": [
              "arn:aws:ec2:*:*:network-interface/*",
              "arn:aws:ec2:*:*:security-group/*"
            ]
          },
          {
            "Effect": "Allow",
            "Action": [
              "logs:CreateLogGroup",
              "logs:CreateLogStream",
              "logs:PutLogEvents"
            ],
            "Resource": [
              "arn:aws:logs:*:*:log-group:/aws-glue-databrew/*"
            ]
          },
          {
            "Effect": "Allow",
            "Action": [
              "logs:CreateLogGroup",
              "logs:CreateLogStream",
              "logs:PutLogEvents"
            ],
            "Resource": [
              "arn:aws:logs:*:*:log-group:/aws-glue-databrew/*"
            ]
          }
        ]
      })
    }
  })
  
  const project = new databrew.CfnProject(this, 'Project', {
    name: "databrew-sample-project",
    datasetName: datasetName,
    sample: {
      type: "FIRST_N",
      size: 500
    },
    recipeName: recipeName,
    roleArn: role.roleArn
  })
  project.node.addDependency(deployData)
  return project
}

コンソール上からProjectを開きセッションの開始まで1分ほど待つと、次のような画面が表示され、データの内容と共に分布や統計値を見ることができる。

Project画面

Profileでの分析

データをプロファイリングするJobを作成する。

Profile Jobの作成

createJobRole(bucket: s3.IBucket) {
  const role = new iam.Role(this, 'ProfileJobRole', {
    assumedBy: new iam.ServicePrincipal("databrew.amazonaws.com"),
    inlinePolicies: {
      "project": iam.PolicyDocument.fromJson({
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "s3:GetObject",
              "s3:PutObject",
              "s3:ListBucket",
              "s3:DeleteObject"
            ],
            "Resource": [
              bucket.bucketArn,
              `${bucket.bucketArn}/*`
            ]
          },
          {
            "Effect": "Allow",
            "Action": [
              "s3:PutObjectAcl"
            ],
            "Resource": [
              `${bucket.bucketArn}/*`
            ],
            "Condition": {
              "StringEquals": {
                "s3:x-amz-acl": "bucket-owner-full-control"
              }
            }
          }
        ]
      })
    }
  })
}

createProfileJob(datasetName: string, projectName: string, bucket: s3.IBucket, role: iam.IRole) {
  return new databrew.CfnJob(this, 'ProfileJob', {
    name: "databrew-sample-profile-job",
    type: "PROFILE",
    jobSample: {
      mode: "FULL_DATASET"
    },
    datasetName: datasetName,
    outputLocation: {
      bucket: bucket.bucketName,
      key: "profile/"
    },
    roleArn: role.roleArn
  })
}

コンソール上からJobを実行し完了するとパラメータ間の相関や値の分布などが表示された。

Profile

ただ予測対象であるSalePriceとの相関が出ない。 S3に出力された元ファイルを見ても他のパラメータには存在するcorrelationsがないので、UI上の問題ではない。 パラメータを減らしたら出るようになったので、数に制約がありそうだ。

{
  "name": "OverallCond",
  "type": "int",
  "correlations": {
    "Id": 0.012608924775955575,
    "MSSubClass": -0.059315817084104695,
    "LotArea": -0.005636270274099278,
    "OverallQual": -0.09193234263907189,
    "OverallCond": 1.0,
    "YearBuilt": -0.37598319560698945,
    "YearRemodAdd": 0.07374149814528905,
    "BsmtFinSF1": -0.04623085591822496,
    "BsmtFinSF2": 0.040229169899767575,
    "BsmtUnfSF": -0.13684056989352236
  },
  ...
}

Recipeの作成

Projectでカテゴリーの文字列を数値にマッピングしてみる。

文字列と数値へのマッピング

適用するとRecipeに追加されるのでJSON形式でダウンロードしてコピーすればCDKに反映できる。

JSON形式でダウンロード

createRecipe() {
  return new databrew.CfnRecipe(this, 'Recipe', {
    name: "databrew-sample-recipe",
    steps: [{
      "action": {
        "operation": "CATEGORICAL_MAPPING",
        "parameters": {
          "categoryMap": "{\"RL\":\"1\",\"RM\":\"2\",\"FV\":\"3\",\"C (all)\":\"4\",\"RH\":\"5\"}",
          "deleteOtherRows": "false",
          "mapType": "NUMERIC",
          "mappingOption": "TOP_X_VALUES",
          "other": "6",
          "sourceColumn": "MSZoning",
          "targetColumn": "MSZoning_map"
        }
      }
    }]
  })
}

あとはこれを実行するJobを作成し、手動や定期実行などする。

createRecipeJob(projectName: string, bucket: s3.IBucket, role: iam.IRole) {
  return new databrew.CfnJob(this, 'RecipeJob', {
    name: "databrew-sample-recipe-job",
    type: "RECIPE",
    projectName: projectName,
    outputs: [{
      compressionFormat: "GZIP",
      format: "CSV",
      location: {
        bucket: bucket.bucketName,
        key: "dest/"
      }
    }],
    roleArn: role.roleArn
  })
}