Apache Beam による ETL のマネージドサービス Dataflow のサンプルコード Word Count を読んで実行する

gcpetl

GCP の Dataflow はデータ処理のパイプラインを構築する OSS Apache Beam による、ストリーミングおよびバッチ ETL のマネージドサービス。PubSub から BigQuery にデータを流したり、機械学習を行ったりすることができる。料金はジョブに実行したリソースやシャッフルしたデータ量に対してかかる。

Google が提供するテンプレートを利用することで基本的なデータの移行はノーコードで行うことができる。

サンプルの Word Count を実行してみる。

結果は Cloud Storage に出力された。

accountability: 1
deserves: 1
tiring: 1
hardworking: 3
...

コードを見ていく。

まず @Template でメタデータを設定している。

@Template(
    name = "Word_Count",
    category = TemplateCategory.GET_STARTED,
    displayName = "Word Count",
    description =
        "Batch pipeline. Reads text from Cloud Storage, tokenizes text lines into individual words,"
            + " and performs frequency count on each of the words.",
    optionsClass = WordCountOptions.class,
    contactInformation = "https://cloud.google.com/support")

public interface WordCountOptions extends PipelineOptions {
  @TemplateParameter.GcsReadFile(
      order = 1,
      description = "Input file(s) in Cloud Storage",
      helpText =
          "The input file pattern Dataflow reads from. Use the example file "
              + "(gs://dataflow-samples/shakespeare/kinglear.txt) or enter the path to your own "
              + "using the same format: gs://your-bucket/your-file.txt")
  ValueProvider<String> getInputFile();

  void setInputFile(ValueProvider<String> value);

  @TemplateParameter.GcsWriteFolder(
      order = 2,
      description = "Output Cloud Storage file prefix",
      helpText = "Path and filename prefix for writing output files. Ex: gs://your-bucket/counts")
  ValueProvider<String> getOutput();

  void setOutput(ValueProvider<String> value);
}

main ではこの Option を受け取り Beam で パイプラインを構築している。

public static void main(String[] args) {
  WordCountOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.write().to(options.getOutput()));

  p.run();
}

CountWords は PCollection を変換する PTransform を継承するクラスで、word ごとにカウントした結果を返している。

public class WordCount {
public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

ParDo によって word に分割する処理を記述した DoFn が並列に実行される。

static class ExtractWordsFn extends DoFn<String, String> {
  private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Check if the line is empty.
    if (c.element().trim().isEmpty()) {
      emptyLines.inc();
      return;
    }

    // Split the line into words.
    String[] words = c.element().split("[^a-zA-Z']+");

    // Output each word encountered into the output PCollection.
    for (String word : words) {
      if (!word.isEmpty()) {
        c.output(word);
      }
    }
  }
}

これをカスタムテンプレートとして動かすにはテンプレートをビルドする必要がある。