Flink's magic stream splitter: SideOutput


Today Langjian will walk you through one of Flink's neat features: the side output (SideOutput).

To illustrate what a side output does, consider an example. Suppose we have an article containing words of varying lengths. We want to run a wordcount over the words shorter than 5 characters, while also recording which words are longer than 5 characters. How do we do that?

For example, if datastream is a stream of words, the usual approach looks like this (template code only):

datastream.filter(word -> word.length() >= 5); // words we do not count, i.e. length >= 5

datastream.filter(word -> word.length() < 5);  // words that go into the wordcount

With this approach, every filter keeps and traverses the entire stream, which clearly wastes performance. It would be better if a single stream could produce multiple outputs, and Flink's side output provides exactly that. A side output's type can differ from the main stream's, and a stream can have multiple side outputs, each with its own type.
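The difference can be sketched in plain Java (this is not the Flink API, just a hypothetical illustration of one pass versus two): each element is examined once and routed to either the "main" or the "side" collection, instead of two separate filters each scanning the whole stream.

```java
// Plain-Java sketch (not Flink API) of what a side output buys us: each
// element is routed once, rather than scanned twice by two filters.
import java.util.ArrayList;
import java.util.List;

public class OnePassSplit {

    static List<List<String>> splitByLength(List<String> words, int limit) {
        List<String> main = new ArrayList<>();  // words to wordcount
        List<String> side = new ArrayList<>();  // words only recorded
        for (String w : words) {                // single pass over the data
            if (w.length() < limit) {
                main.add(w);
            } else {
                side.add(w);
            }
        }
        return List.of(main, side);
    }

    public static void main(String[] args) {
        System.out.println(splitByLength(List.of("to", "be", "question"), 5));
        // prints [[to, be], [question]]
    }
}
```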

Below, Langjian walks through how to use side outputs in detail.

1. Define an OutputTag

Before using a side output, you first need to define an OutputTag, like so:

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

OutputTag has two constructors. The one used above takes only an id parameter; the other takes two parameters, an id and TypeInformation:

OutputTag(String id)
OutputTag(String id, TypeInformation<T> typeInfo)
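The trailing `{}` in the example above is not a typo: it creates an anonymous subclass, which is how OutputTag can recover its generic type argument at runtime despite Java's type erasure. The snippet below is a stripped-down, Flink-free sketch of that idiom; `TypeRef` is a hypothetical stand-in, not a Flink class.

```java
// Flink-free sketch of why OutputTag is created with a trailing "{}":
// an anonymous subclass pins the generic type argument into class metadata,
// where it can be read back via reflection. TypeRef is a hypothetical
// stand-in for what OutputTag (and Flink's TypeHint) do internally.
import java.lang.reflect.ParameterizedType;

public class TypeCaptureDemo {

    abstract static class TypeRef<T> {
        // Read T back from the anonymous subclass's generic superclass.
        final String typeName = ((ParameterizedType) getClass().getGenericSuperclass())
                .getActualTypeArguments()[0].getTypeName();
    }

    static String capturedType() {
        // The "{}" creates an anonymous subclass, which preserves <String>.
        return new TypeRef<String>() {}.typeName;
    }

    public static void main(String[] args) {
        System.out.println(capturedType());
        // prints java.lang.String
    }
}
```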

2. Use one of the supporting functions

Besides defining an OutputTag of the appropriate type, emitting to a side output requires using one of a specific set of functions, mainly these four:

ProcessFunction

CoProcessFunction

ProcessWindowFunction

ProcessAllWindowFunction

This article uses ProcessFunction as the example to explain Flink's side output feature; a deeper look at the meaning and use of these functions will come in later articles.

Each of these functions exposes a Context parameter to the user, through which data can be sent to a side output stream via its OutputTag.

3. Example

Prepare the data:

/**
 * Provides the default data sets used for the WordCount example program.
 * The default data sets are used, if no parameters are given to the program.
 *
 */
public class WordCountData {

   public static final String[] WORDS = new String[] {
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,",
      "And by opposing end them?--To die,--to sleep,--",
      "No more; and by a sleep to say we end",
      "The heartache, and the thousand natural shocks",
      "That flesh is heir to,--'tis a consummation",
      "Devoutly to be wish'd. To die,--to sleep;--",
      "To sleep! perchance to dream:--ay, there's the rub;",
      "For in that sleep of death what dreams may come,",
      "When we have shuffled off this mortal coil,",
      "Must give us pause: there's the respect",
      "That makes calamity of so long life;",
      "For who would bear the whips and scorns of time,",
      "The oppressor's wrong, the proud man's contumely,",
      "The pangs of despis'd love, the law's delay,",
      "The insolence of office, and the spurns",
      "That patient merit of the unworthy takes,",
      "When he himself might his quietus make",
      "With a bare bodkin? who would these fardels bear,",
      "To grunt and sweat under a weary life,",
      "But that the dread of something after death,--",
      "The undiscover'd country, from whose bourn",
      "No traveller returns,--puzzles the will,",
      "And makes us rather bear those ills we have",
      "Than fly to others that we know not of?",
      "Thus conscience does make cowards of us all;",
      "And thus the native hue of resolution",
      "Is sicklied o'er with the pale cast of thought;",
      "And enterprises of great pith and moment,",
      "With this regard, their currents turn awry,",
      "And lose the name of action.--Soft you now!",
      "The fair Ophelia!--Nymph, in thy orisons",
      "Be all my sins remember'd."
   };
}

Define the OutputTag object:

private static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};

Define the ProcessFunction:

/**
 * Implements the tokenizer as a user-defined ProcessFunction. It wraps each
 * accepted token as (word, 1) and rejects words longer than 5 characters,
 * i.e. the side output receives exactly the words whose length exceeds 5.
 */
public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
   private static final long serialVersionUID = 1L;

   @Override
   public void processElement(
         String value,
         Context ctx,
         Collector<Tuple2<String, Integer>> out) throws Exception {
      // normalize and split the line
      String[] tokens = value.toLowerCase().split("\\W+");

      // emit the pairs
      for (String token : tokens) {
         if (token.length() > 5) {
            // too long: send to the side output
            ctx.output(rejectedWordsTag, token);
         } else if (token.length() > 0) {
            out.collect(new Tuple2<>(token, 1));
         }
      }
   }
}
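The Tokenizer's routing rule can be checked without a Flink runtime. The sketch below (a hypothetical helper, not part of the Flink job) applies the same regex split and length test; `accepted` returns what would reach the main output, while anything longer than 5 characters would go to the side output in the real job.

```java
// Flink-free check of the Tokenizer's routing rule: split on non-word
// characters ("\\W+"), drop empty tokens, and accept only words of length
// 1..5 (longer words would be sent to the side output in the real job).
import java.util.ArrayList;
import java.util.List;

public class TokenizerLogic {

    static List<String> accepted(String line) {
        List<String> out = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0 && token.length() <= 5) {
                out.add(token);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // "question" (8 letters) is filtered out here, just as it would be
        // rejected into the side output by the ProcessFunction above.
        System.out.println(accepted("To be, or not to be,--that is the question:--"));
        // prints [to, be, or, not, to, be, that, is, the]
    }
}
```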

Initialize Flink and use the side output:

public static void main(String[] args) throws Exception {

   // set up the execution environment
   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

   // get the input data
   DataStream<String> text = env.fromElements(WordCountData.WORDS);

   SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
         .keyBy(new KeySelector<String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer getKey(String value) throws Exception {
               return 0;
            }
         })
         .process(new Tokenizer());

   // fetch the side output
   DataStream<String> rejectedWords = tokenized
         .getSideOutput(rejectedWordsTag)
         .map(new MapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String map(String value) throws Exception {
               return "rejected: " + value;
            }
         });
   
   DataStream<Tuple2<String, Integer>> counts = tokenized
         .keyBy(0)
         .window(TumblingEventTimeWindows.of(Time.seconds(5)))
         .sum(1);

   // print the wordcount results
   counts.print();
   // print the side-output results
   rejectedWords.print();
   
   // execute program
   env.execute("Streaming WordCount SideOutput");
}

Run it locally and check the results.


Recommended reading:

Debugging the Flink source code

Flink async I/O, part 1

Flink's runtime, explained alongside Spark

Kafka traffic monitoring: principles and implementation
