GCP の Dataflow はデータ処理のパイプラインを構築する OSS Apache Beam による、ストリーミングおよびバッチ ETL のマネージドサービス。PubSub から BigQuery にデータを流したり、機械学習を行ったりすることができる。料金はジョブに実行したリソースやシャッフルしたデータ量に対してかかる。
Google が提供するテンプレートを利用することで基本的なデータの移行はノーコードで行うことができる。
サンプルの Word Count を実行してみる。
結果は Cloud Storage に出力された。
accountability: 1
deserves: 1
tiring: 1
hardworking: 3
...
コードを見ていく。
まず @Template でメタデータを設定している。
@Template(
name = "Word_Count",
category = TemplateCategory.GET_STARTED,
displayName = "Word Count",
description =
"Batch pipeline. Reads text from Cloud Storage, tokenizes text lines into individual words,"
+ " and performs frequency count on each of the words.",
optionsClass = WordCountOptions.class,
contactInformation = "https://cloud.google.com/support")
public interface WordCountOptions extends PipelineOptions {
@TemplateParameter.GcsReadFile(
order = 1,
description = "Input file(s) in Cloud Storage",
helpText =
"The input file pattern Dataflow reads from. Use the example file "
+ "(gs://dataflow-samples/shakespeare/kinglear.txt) or enter the path to your own "
+ "using the same format: gs://your-bucket/your-file.txt")
ValueProvider<String> getInputFile();
void setInputFile(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 2,
description = "Output Cloud Storage file prefix",
helpText = "Path and filename prefix for writing output files. Ex: gs://your-bucket/counts")
ValueProvider<String> getOutput();
void setOutput(ValueProvider<String> value);
}
main ではこの Option を受け取り Beam で パイプラインを構築している。
public static void main(String[] args) {
WordCountOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run();
}
CountWords は PCollection を変換する PTransform を継承するクラスで、word ごとにカウントした結果を返している。
public class WordCount {
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement());
return wordCounts;
}
}
ParDo によって word に分割する処理を記述した DoFn が並列に実行される。
static class ExtractWordsFn extends DoFn<String, String> {
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
@ProcessElement
public void processElement(ProcessContext c) {
// Check if the line is empty.
if (c.element().trim().isEmpty()) {
emptyLines.inc();
return;
}
// Split the line into words.
String[] words = c.element().split("[^a-zA-Z']+");
// Output each word encountered into the output PCollection.
for (String word : words) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
これをカスタムテンプレートとして動かすにはテンプレートをビルドする必要がある。