Read and run WordCount Sample on Dataflow, managed ETL service using Apache Beam

gcpetl

GCP’s Dataflow is a managed service for streaming and batch ETL using Apache Beam, an open-source software for building data processing pipelines. It can stream data from PubSub to BigQuery and perform machine learning etc. Pricing is based on the resources used for job execution and the amount of data shuffled.

Basic data migration can be performed without coding by using templates provided by Google.

Let’s run the sample Word Count.

The results were output to Cloud Storage.

accountability: 1
deserves: 1
tiring: 1
hardworking: 3
...

Let’s take a look at the code.

First, metadata is set using @Template.

@Template(
    name = "Word_Count",
    category = TemplateCategory.GET_STARTED,
    displayName = "Word Count",
    description =
        "Batch pipeline. Reads text from Cloud Storage, tokenizes text lines into individual words,"
            + " and performs frequency count on each of the words.",
    optionsClass = WordCountOptions.class,
    contactInformation = "https://cloud.google.com/support")

public interface WordCountOptions extends PipelineOptions {
  @TemplateParameter.GcsReadFile(
      order = 1,
      description = "Input file(s) in Cloud Storage",
      helpText =
          "The input file pattern Dataflow reads from. Use the example file "
              + "(gs://dataflow-samples/shakespeare/kinglear.txt) or enter the path to your own "
              + "using the same format: gs://your-bucket/your-file.txt")
  ValueProvider<String> getInputFile();

  void setInputFile(ValueProvider<String> value);

  @TemplateParameter.GcsWriteFolder(
      order = 2,
      description = "Output Cloud Storage file prefix",
      helpText = "Path and filename prefix for writing output files. Ex: gs://your-bucket/counts")
  ValueProvider<String> getOutput();

  void setOutput(ValueProvider<String> value);
}

In the main method, this Option is received and a pipeline is constructed using Beam.

public static void main(String[] args) {
  WordCountOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
  Pipeline p = Pipeline.create(options);
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.write().to(options.getOutput()));

  p.run();
}

CountWords is a class that extends PTransform, which transforms PCollection, and returns the result of counting for each word.

public class WordCount {
public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

The DoFn describing the process of splitting into words is executed in parallel by ParDo.

static class ExtractWordsFn extends DoFn<String, String> {
  private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Check if the line is empty.
    if (c.element().trim().isEmpty()) {
      emptyLines.inc();
      return;
    }

    // Split the line into words.
    String[] words = c.element().split("[^a-zA-Z']+");

    // Output each word encountered into the output PCollection.
    for (String word : words) {
      if (!word.isEmpty()) {
        c.output(word);
      }
    }
  }
}

You need to build the template to run this as a custom template.