以下是一個 UDAF(使用者定義聚合函式)的範例:

  • 建立一個繼承 org.apache.hadoop.hive.ql.exec.UDAF 的 Java 類,並在其中建立一個實現 UDAFEvaluator 的公開靜態內部類。

  • 在評估器中實現以下五個方法:

    • init() - 此方法初始化評估器(evaluator)並重置其內部狀態。在下面的程式碼中,我們以 new Column() 表示尚未聚合任何值。
    • iterate() - 每當有一個新值需要聚合時,都會呼叫此方法。評估器應以聚合結果更新其內部狀態(本例中是累加總和與計數,見下方程式碼)。返回 true 表示輸入有效。
    • terminatePartial() - 當 Hive 想要部分聚合的結果時呼叫此方法。該方法必須返回一個封裝聚合狀態的物件。
    • merge() - 當 Hive 決定將一個部分聚合結果與另一個部分聚合結果合併時,呼叫此方法。
    • terminate() - 當需要聚合的最終結果時呼叫此方法。
    /**
     * Hive aggregate function (legacy {@code UDAF} / {@code UDAFEvaluator} API) that returns the
     * arithmetic mean of a numeric column, e.g. {@code select id, mean_udf(amount) ... group by id}.
     *
     * <p>NULL inputs are skipped, and a group with no non-NULL values yields NULL, matching the
     * behavior of SQL {@code AVG}.
     */
    public class MeanUDAF extends UDAF {
        // Define Logging
        static final Log LOG = LogFactory.getLog(MeanUDAF.class.getName());

        /**
         * Evaluator that accumulates a running sum and row count for one aggregation group.
         * Hive drives it through init / iterate / terminatePartial / merge / terminate.
         */
        public static class MeanUDAFEvaluator implements UDAFEvaluator {

            /**
             * Partial-aggregation state returned by terminatePartial() and consumed by merge().
             * This is our groupByColumn.
             */
            public static class Column {
                double sum = 0;
                // long rather than int so a very large group cannot overflow the row count.
                long count = 0;
            }

            private Column col = null;

            public MeanUDAFEvaluator() {
                super();
                init();
            }

            // A - Initialize evaluator - indicating that no values have been aggregated yet.
            public void init() {
                LOG.debug("Initialize evaluator");
                col = new Column();
            }

            /**
             * B - Called every time there is a new value to be aggregated.
             *
             * @param value the next input value; {@code null} (SQL NULL) is ignored, as in AVG.
             *     Boxed so Hive can pass NULL through instead of failing on a primitive parameter.
             * @return always {@code true}, signalling that the input was accepted
             * @throws HiveException if the evaluator state has not been initialized
             */
            public boolean iterate(Double value) throws HiveException {
                LOG.debug("Iterating over each value for aggregation");
                if (col == null) {
                    throw new HiveException("Item is not initialized");
                }
                if (value != null) {
                    col.sum += value;
                    col.count++;
                }
                return true;
            }

            /**
             * C - Called when Hive wants partially aggregated results.
             *
             * @return the current partial state (sum and count)
             */
            public Column terminatePartial() {
                LOG.debug("Return partially aggregated results");
                return col;
            }

            /**
             * D - Called when Hive decides to combine one partial aggregation with another.
             *
             * @param other a partial state produced by terminatePartial(); {@code null} is a no-op
             * @return always {@code true}
             */
            public boolean merge(Column other) {
                LOG.debug("merging by combining partial aggregation");
                if (other == null) {
                    return true;
                }
                col.sum += other.sum;
                col.count += other.count;
                return true;
            }

            /**
             * E - Called when the final result of the aggregation is needed.
             *
             * @return the mean of all non-NULL values seen, or {@code null} if there were none
             *     (instead of the NaN that 0.0 / 0 would produce)
             */
            public Double terminate() {
                LOG.debug("At the end of last record of the group - returning final result");
                if (col.count == 0) {
                    return null;
                }
                return col.sum / col.count;
            }
        }
    }

    -- Register the jar containing MeanUDAF, then bind a function name to the fully-qualified class
    -- (CREATE TEMPORARY FUNCTION takes a class name, not a jar path).
    hive> ADD JAR /path/to/<JAR NAME>.jar;
    hive> CREATE TEMPORARY FUNCTION mean_udf AS '<PACKAGE>.MeanUDAF';
    hive> select id, mean_udf(amount) from <TABLE NAME> group by id;