Pipeline Examples

ExtractAndSumScore class definition

public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> gameInfo) {

    return gameInfo
        .apply(
            MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
        .apply(Sum.integersPerKey());
  }
}

Basic ETL pipeline (batch, no windowing)

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by command-line flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Read events from a text file and parse them.
  pipeline
      .apply(TextIO.read().from(options.getInput()))
      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
      // Extract and sum username/score pairs from the event data.
      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
      .apply(
          "WriteUserScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), false));

  // Run the batch pipeline.
  pipeline.run().waitUntilFinish();
}

Running a batch pipeline with fixed one-hour windows (game example)

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by command-line flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
  final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));

  // Read 'gaming' events from a text file.
  pipeline
      .apply(TextIO.read().from(options.getInput()))
      // Parse the incoming data.
      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))

      // Filter out data before and after the given times so that it is not included
      // in the calculations. As we collect data in batches (say, by day), the batch for the day
      // that we want to analyze could potentially include some late-arriving data from the
      // previous day.
      // If so, we want to weed it out. Similarly, if we include data from the following day
      // (to scoop up late-arriving events from the day we're analyzing), we need to weed out
      // events that fall after the time period we want to analyze.
      // [START DocInclude_HTSFilters]
      .apply(
          "FilterStartTime",
          Filter.by(
              (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
      .apply(
          "FilterEndTime",
          Filter.by(
              (GameActionInfo gInfo) -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
      // [END DocInclude_HTSFilters]

      // [START DocInclude_HTSAddTsAndWindow]
      // Add an element timestamp based on the event log, and apply fixed windowing.
      .apply(
          "AddEventTimestamps",
          WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
      .apply(
          "FixedWindowsTeam",
          Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
      // [END DocInclude_HTSAddTsAndWindow]

      // Extract and sum teamname/score pairs from the event data.
      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
      .apply(
          "WriteTeamScoreSums", new WriteToText<>(options.getOutput(), configureOutput(), true));

  pipeline.run().waitUntilFinish();
}

Windowing real-time data using processing time

/**
 * Extract user/score pairs from the event stream using processing time, via global windowing. Get
 * periodic updates on all users' running scores.
 */
@VisibleForTesting
static class CalculateUserScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration allowedLateness;

  CalculateUserScores(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> input) {
    return input
        .apply(
            "LeaderboardUserGlobalWindow",
            Window.<GameActionInfo>into(new GlobalWindows())
                // Get periodic results every ten minutes.
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
                .accumulatingFiredPanes()
                .withAllowedLateness(allowedLateness))
        // Extract and sum username/score pairs from the event data.
        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
  }
}

Windowing real-time data using event time

// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration teamWindowDuration;
  private final Duration allowedLateness;

  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
    this.teamWindowDuration = teamWindowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> expand(PCollection<GameActionInfo> infos) {
    return infos
        .apply(
            "LeaderboardTeamFixedWindows",
            Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
                // We will get early (speculative) results as well as cumulative
                // processing of late data.
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(FIVE_MINUTES))
                        .withLateFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(TEN_MINUTES)))
                .withAllowedLateness(allowedLateness)
                .accumulatingFiredPanes())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
  }
}
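
A minimal sketch (sinks and durations are illustrative) of wiring the two real-time transforms above into a streaming pipeline:

PCollection<GameActionInfo> gameEvents = ...;

gameEvents
    .apply(
        "CalculateTeamScores",
        new CalculateTeamScores(Duration.standardMinutes(60), Duration.standardMinutes(120)))
    .apply("WriteTeamScoreSums", ...);

gameEvents
    .apply("CalculateUserScores", new CalculateUserScores(Duration.standardMinutes(120)))
    .apply("WriteUserScoreSums", ...);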

Implementing a streaming merge with stateful ParDo

/**
 * Implements a streaming merge using stateful ParDo. Waits for two elements to arrive with the
 * same key, storing the first in MapState until the second arrives, then merges the two, outputs
 * the merged element, and finally clears the keyed state. This technique avoids double writes to
 * the output database, reducing cost on some target systems (e.g. RDS) and improving performance
 * during backfill operations (where writes to the database tend to bottleneck the pipeline).
 *
 * @param <K> type of key
 * @param <V> type of value
 */
public class StreamingMerge<K, V extends Message>
    extends DoFn<KV<K, V>, KV<K, V>> {

  @StateId("index")
  private final StateSpec<MapState<K, V>> indexSpec;
  private final SerializableBiFunction<V, V, V> mergeFunction;
  private final Maybe<SerializableFunction<V, Boolean>> dataCompletenessFunction;

  /**
   * Merges multiple updates coming from one or more streams into a single entry that is emitted
   * when the data is complete; the user-supplied completeness function defines what "complete"
   * means. State is retained in the map even after a record is emitted downstream, which helps
   * when you want to avoid sending partial records downstream, e.g. for SNS notifications.
   *
   * @param keyCoder specify a coder for your key
   * @param valueCoder specify a coder for the value
   * @param mergeFunction function representing how to merge 2 records together
   * @param dataCompletenessFunction function representing data completeness
   * @param <K> key type
   * @param <V> value type
   * @return an object of StreamingMerge
   */
  public static <K, V extends Message> StreamingMerge<K, V> of(
          Coder<K> keyCoder,
          Coder<V> valueCoder,
          SerializableBiFunction<V, V, V> mergeFunction,
          Maybe<SerializableFunction<V, Boolean>> dataCompletenessFunction) {
    return new StreamingMerge<>(
        keyCoder, valueCoder, mergeFunction, dataCompletenessFunction);
  }

  /**
   * Merges exactly two changes to a key. If the key does not exist in the map, an entry is
   * created and updated when the second change arrives. As soon as the second change arrives,
   * the merged value is emitted downstream and the entry is removed from the map.
   * A default merge function combines the two protos, and a default completeness function
   * simply lets everything pass through.
   * If your pipeline expects more than two updates per key, pass in a custom data completeness
   * function that determines when to emit a result downstream.
   *
   * @param keyCoder specify a coder for your key
   * @param valueCoder specify a coder for the value
   * @param <K> key type
   * @param <V> value type
   * @return an object of StreamingMerge
   */
  @SuppressWarnings("unchecked")
  public static <K, V extends Message> StreamingMerge<K,V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
    return new StreamingMerge<>(
        keyCoder,
        valueCoder,
        (V left, V right) -> (V) left.toBuilder().mergeFrom(right).build(),
        Maybe.empty());
  }

  private StreamingMerge(
      Coder<K> keyCoder, Coder<V> valueCoder,
      SerializableBiFunction<V, V, V> mergeFunction,
      Maybe<SerializableFunction<V, Boolean>> dataCompletenessFunction) {
    this.indexSpec = StateSpecs.map(keyCoder, valueCoder);
    this.mergeFunction = mergeFunction;
    this.dataCompletenessFunction = dataCompletenessFunction;
  }

  /**
   * Called once per element: merges the element with any value already stored for its key.
   * @param context process context
   * @param index keyed map state holding the pending value for each key
   */
  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("index") MapState<K, V> index) {

    final K currentElementKey = context.element().getKey();
    final ReadableState<V> currentState = index.get(currentElementKey);
    final V valueFromState = currentState.read();
    final V currentElementValue = context.element().getValue();

    if (valueFromState != null) {
      final V updateValue = mergeFunction.apply(valueFromState, currentElementValue);
      // We will only emit something downstream when the completeness function specified by the user returns true
      if (dataCompletenessFunction.isPresent()) {
        index.put(currentElementKey, updateValue);
        if (dataCompletenessFunction.get().apply(updateValue)) {
          context.output(KV.of(currentElementKey, updateValue));
        }
      } else {
        index.remove(currentElementKey);
        context.output(KV.of(currentElementKey, updateValue));
      }
    } else {
      index.put(currentElementKey, currentElementValue);
    }
  }
}
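
A minimal usage sketch, assuming the input is already keyed and UserProfile is an illustrative protobuf message type (StringUtf8Coder is from the Beam SDK core; ProtoCoder is from the Beam protobuf extension):

PCollection<KV<String, UserProfile>> updates = ...;

// Stateful ParDo requires a keyed input; state is partitioned per key and window.
PCollection<KV<String, UserProfile>> merged =
    updates.apply(
        "MergeUpdates",
        ParDo.of(StreamingMerge.of(StringUtf8Coder.of(), ProtoCoder.of(UserProfile.class))));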

JSON to Proto/AutoValue

public class JsonToJournalRecords
    extends PTransform<PCollection<String>, PCollection<JournalRecord<JsonNode>>> {

  private static final JsonToJournalRecords INSTANCE = new JsonToJournalRecords();
  private static final ObjectMapper OBJECT_MAPPER = ObjectMapperBuilder.get();
  private static final ObjectReader OBJECT_READER = OBJECT_MAPPER.readerFor(
      new TypeReference<JournalRecord<JsonNode>[]>() {});

  public static JsonToJournalRecords of() {
    return INSTANCE;
  }

  @Override
  public PCollection<JournalRecord<JsonNode>> expand(PCollection<String> input) {
    return input.apply(
        FlatMapElements
            .into(new TypeDescriptor<JournalRecord<JsonNode>>() {})
            .via(JsonToJournalRecords::parse));
  }

  private static ImmutableList<JournalRecord<JsonNode>> parse(final String record) {
    try {
      return ImmutableList.copyOf(OBJECT_READER.<JournalRecord<JsonNode>[]>readValue(record));
    } catch (IOException e) {
      throw new RuntimeException(
          String.format(
              "Unable to parse input record as a JournalRecord<%s> array",
              JsonNode.class.getName()),
          e);
    }
  }
}
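
Applying the transform is a one-liner; a sketch assuming each input line holds a JSON array of records:

PCollection<String> jsonLines = ...;
PCollection<JournalRecord<JsonNode>> records =
    jsonLines.apply("ParseJournalRecords", JsonToJournalRecords.of());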

Use AutoValue and Jackson annotations to create a builder class

@AutoValue
@JsonSerialize(as = JournalRecord.class)
public abstract class JournalRecord<T> implements Serializable {
  @JsonCreator
  public static <T> JournalRecord<T> of(
      @JsonProperty("json") final T message,
      @JsonProperty("clock") final String clock,
      @JsonProperty("updatedAt") final Instant updatedAt,
      @JsonProperty("status") final String status) {
    // work around https://github.com/FasterXML/jackson-databind/issues/921
    // by removing @JsonDeserialize(builder = AutoValue_JournalRecord.Builder.class)
    // from the class and adding this JsonCreator instead...

    return new AutoValue_JournalRecord.Builder<T>()
        .setMessage(message)
        .setClock(clock)
        .setUpdatedAt(updatedAt)
        .setStatus(status)
        .build();
  }

  @AutoValue.Builder
  public abstract static class Builder<T> {
    public abstract Builder<T> setMessage(T message);

    public abstract Builder<T> setClock(String clock);

    public abstract Builder<T> setUpdatedAt(Instant updatedAt);

    public abstract Builder<T> setStatus(String status);

    public abstract JournalRecord<T> build();
  }

  @JsonProperty("json")
  public abstract T getMessage();

  @JsonProperty("clock")
  public abstract String getClock();

  @JsonProperty("updatedAt")
  public abstract Instant getUpdatedAt();

  @JsonProperty("status")
  public abstract String getStatus();
}

Read from Kinesis

// KinesisClientsProvider is a project-specific AWSClientsProvider implementation.
PCollection<KinesisRecord> records =
    pipeline.apply(
        "ReadFromKinesis",
        KinesisIO.read()
            .withPollingInterval(Duration.millis(1000))
            .withStreamName(kinesisStreamName)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
            .withAWSClientsProvider(KinesisClientsProvider.of(runMode))
            .withArrivalTimeWatermarkPolicy());
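
KinesisRecord exposes the raw payload via getDataAsBytes(); a follow-up sketch decoding it to UTF-8 strings:

PCollection<String> payloads =
    records.apply(
        "DecodePayload",
        MapElements.into(TypeDescriptors.strings())
            .via((KinesisRecord r) -> new String(r.getDataAsBytes(), StandardCharsets.UTF_8)));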

Triggers

For an unbounded data set you can use a single global window as the windowing function and rely on a trigger to drive output. This is useful when you want the pipeline to provide periodic updates, for example a running average of all data provided to the present time, updated every N seconds or every N elements.
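
A minimal sketch of that pattern, assuming a 30-second update interval (the interval and accumulation mode are illustrative):

PCollection<String> pc = ...;
pc.apply(
    Window.<String>into(new GlobalWindows())
        // Emit an updated pane 30 seconds after the first element of each pane arrives.
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30))))
        // Accumulate across firings so every update reflects all data seen so far.
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO));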

Event-time trigger

// Create a bill at the end of the month.
AfterWatermark.pastEndOfWindow()
    // During the month, get near real-time estimates.
    .withEarlyFirings(
        AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1)))
    // Fire on any late data so the bill can be corrected.
    .withLateFirings(AfterPane.elementCountAtLeast(1))

PCollection<String> pc = ...;
pc.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1)))
        // Setting a non-default trigger also requires setting allowed lateness.
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

Data-driven trigger: AfterPane.elementCountAtLeast(N) fires once the current pane contains at least N elements.
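
For example, a minimal sketch that fires once a pane has buffered at least 100 elements (the count, window size, and accumulation mode are illustrative):

PCollection<String> pc = ...;
pc.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        // Emit the pane as soon as it contains at least 100 elements.
        .triggering(AfterPane.elementCountAtLeast(100))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());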