Below is a good collection of examples of the most frequently used functions in Pig.
Table of Contents
- LOAD
- DESCRIBE/EXPLAIN/ILLUSTRATE
- FOREACH
- GROUP
- STORE
- LIMIT
- ORDER
- DISTINCT
- JOIN
- JOIN USING MULTIPLE KEYS
- OUTER JOINS:
- SELF JOIN:
- COUNT NUMBER OF ROWS IN SELF JOIN’S OUTPUT:
- SAMPLE:
- PARALLEL:
- UDF:REGISTER
- UDF:DEFINE
- CALLING JAVA STATIC FUNCTIONS:
- FLATTEN
- REPLACE EMPTY BAG WITH CONSTANT BAG:
- NESTED FOREACH
- ORDER BY THE group:
- SAMPLE SCRIPTS:
- EXECUTE PIG SCRIPT COMMAND LINE:
- PIG VERSION
LOAD
grunt> records = LOAD 'Downloads/s2.txt' as (date:chararray, temp:int);
grunt> records2 = LOAD 'data/NYSE_daily' as (exchange, stock);
grunt> records1 = LOAD 'data/NYSE_daily' as (exchange, symbol, date);
grunt> records = LOAD 'data/NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
DESCRIBE/EXPLAIN/ILLUSTRATE
grunt> describe records;
grunt> explain daily;
grunt> illustrate grpsymbol;
FOREACH
grunt> list_temp = FOREACH records GENERATE temp;
grunt> records_parsed = FOREACH records GENERATE SUBSTRING(date, 0, 4) as year, temp as temperature;
grunt> begin_columns_only = FOREACH records GENERATE ..date;
grunt> middle_columns_only = FOREACH records GENERATE date..low;
grunt> end_columns_only = FOREACH records GENERATE low..;
grunt> count_occurence = FOREACH group_difference GENERATE group, COUNT(difference.diff);
grunt> total_count = FOREACH stock_count_group GENERATE SUM(stock_count.$1);
grunt> stock_count = FOREACH stock_group GENERATE group, COUNT(records2.stock);
grunt> count_symbols = FOREACH group_distinct_records3 GENERATE COUNT(distinct_records3.$1);
grunt> sum_symbols = FOREACH group_count_symbols GENERATE SUM(count_symbols.$0);
grunt> gain1 = FOREACH records GENERATE symbol, date, $6 - $3;
grunt> total = foreach grp_daily generate group, COUNT(daily.symbol);
GROUP
grunt> group_records = GROUP records_parsed BY year;
grunt> max_temp = FOREACH group_records GENERATE records_parsed.year, MAX(records_parsed.temperature);
grunt> difference = FOREACH records GENERATE symbol, (high == low ? 'SAME' : 'DIFFERENT') as diff;
grunt> group_difference = GROUP difference BY symbol;
grunt> group_difference2 = GROUP difference BY diff;
grunt> stock_group = GROUP records2 by stock;
grunt> stock_count_group = GROUP stock_count ALL;
grunt> group_distinct_records3 = GROUP distinct_records3 BY $1;
grunt> group_count_symbols = GROUP count_symbols ALL;
grunt> grp_daily = group daily by exchange;
STORE
grunt> store group_difference into 'group_difference';
grunt> store group_difference2 into 'group_difference2';
grunt> store total into 'output/daily_total_records';
LIMIT
grunt> first10records1 = LIMIT records1 10;
grunt> pos_top5 = LIMIT pos 5;
grunt> joined_first_10 = LIMIT joined 10;
ORDER
grunt> ordered_first10records1 = ORDER first10records1 BY date, symbol;
DISTINCT
grunt> distinct_records3 = DISTINCT records3;
grunt> distinct_records4 = DISTINCT records4;
JOIN
grunt> daily = LOAD 'data/NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
grunt> dividends = LOAD 'data/NYSE_dividends' as (exchange, symbol, date, dividends);
grunt> joined = JOIN daily by symbol, dividends by symbol;
JOIN USING MULTIPLE KEYS
grunt> joined2 = JOIN daily by (symbol, date), dividends by (symbol, date);
OUTER JOINS:
grunt> joined_left_outer = JOIN daily BY (symbol, date) LEFT OUTER, dividends BY (symbol, date);
grunt> joined_right_outer = JOIN daily BY (symbol, date) RIGHT, dividends BY (symbol, date);
grunt> joined_full_outer = JOIN daily BY (symbol, date) FULL OUTER, dividends BY (symbol, date);
SELF JOIN:
grunt> dividends1 = LOAD 'data/NYSE_dividends' as (exchange, symbol, date, dividends);
grunt> dividends2 = LOAD 'data/NYSE_dividends' as (exchange, symbol, date, dividends);
grunt> self_join = JOIN dividends1 BY symbol, dividends2 BY symbol;
grunt> increased = FILTER self_join BY dividends1::date < dividends2::date AND dividends1::dividends < dividends2::dividends;
COUNT NUMBER OF ROWS IN SELF JOIN’S OUTPUT:
grunt> increased_group = GROUP increased ALL;
grunt> increased_count = FOREACH increased_group GENERATE COUNT(increased.$1);
grunt> count_group = GROUP increased_count ALL;
grunt> count = FOREACH count_group GENERATE SUM(increased_count.$1);
ERROR: Index 1 out of range in schema::long
-- increased_count has only one field (the count), so it must be referenced as $0
grunt> count = FOREACH count_group GENERATE SUM(increased_count.$0);
SAMPLE:
grunt> dividends = LOAD 'data/NYSE_dividends' as (exchange, symbol, date, dividends);
grunt> dividends1percent = SAMPLE dividends .01;
grunt> dump dividends1percent;
PARALLEL:
grunt> SET DEFAULT_PARALLEL 10;
grunt> records = LOAD 'data/NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
grunt> bysymbol = GROUP records BY symbol PARALLEL 10;
grunt> average = FOREACH bysymbol GENERATE group, AVG(records.close) AS avg;
grunt> sorted = ORDER average BY avg PARALLEL 2;
UDF:REGISTER
grunt> REGISTER Downloads/piggybank.jar;
grunt> records = LOAD 'data/NYSE_dividends' as (exchange:chararray, symbol:chararray);
grunt> backwards = FOREACH records GENERATE org.apache.pig.piggybank.evaluation.string.Reverse(symbol);
UDF:DEFINE
grunt> REGISTER Downloads/piggybank.jar;
grunt> DEFINE reverse org.apache.pig.piggybank.evaluation.string.Reverse();
grunt> records = LOAD 'data/NYSE_dividends' as (exchange:chararray, symbol:chararray);
grunt> backwards = FOREACH records GENERATE reverse(symbol);
CALLING JAVA STATIC FUNCTIONS:
grunt> DEFINE hex InvokeForString('java.lang.Integer.toHexString', 'int');
grunt> records = LOAD 'data/NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
grunt> nonnull = FILTER records BY volume IS NOT NULL;
grunt> in_hex = FOREACH nonnull GENERATE symbol, hex((int)volume);
-- NOTE: public static String toHexString(int i) is a method of class java.lang.Integer (public final class Integer)
FLATTEN
grunt> records = LOAD 'data/baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
grunt> pos = FOREACH records GENERATE name, FLATTEN(position) as position;
REPLACE EMPTY BAG WITH CONSTANT BAG:
grunt> no_empty = FOREACH records GENERATE name, ((position IS NULL OR IsEmpty(position)) ? {('unknown')} : position) as position;
NESTED FOREACH
daily = LOAD ...;
group_daily = GROUP daily BY exchange;
unique_count = FOREACH group_daily {
    sym = daily.symbol;
    distinct_sym = DISTINCT sym;
    GENERATE group, COUNT(distinct_sym);
};
grunt> daily = LOAD 'data/NYSE_daily' as (exchange, symbol);
grunt> grpdaily = GROUP daily BY exchange;
grunt> ucount = FOREACH grpdaily {
           sym = daily.symbol;
           unique_symbol = DISTINCT sym;
           GENERATE group, COUNT(unique_symbol);
       };
grunt> grpsymbol = GROUP ucount ALL;
grunt> total_count = FOREACH grpsymbol GENERATE SUM(ucount.$1);

grunt> REGISTER acme.jar;
grunt> DEFINE analyze com.acme.financial.AnalyzeStock();
grunt> daily = LOAD 'data/NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adjclose:float);
grunt> grpdaily = GROUP daily BY symbol;
grunt> analyzed = FOREACH grpdaily {
           sorted = ORDER daily BY date;
           GENERATE group, analyze(sorted);
       };
grunt> records = LOAD 'data/NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
grunt> grouped = GROUP records BY symbol;
grunt> top3 = FOREACH grouped {
           sorted = ORDER records BY dividends DESC;
           top = LIMIT sorted 3;
           GENERATE group, flatten(top);
       };
-- NOTE: top3 returns more than 3 records in total, because the nested LIMIT applies within each group (up to 3 records per symbol)
ORDER BY THE group:
grunt> records = LOAD 'data/NYSE_dividends' as (exchange:chararray, symbol:chararray);
grunt> grouped = GROUP records BY symbol;
grunt> ordered_group = ORDER grouped BY group;
grunt> first5 = LIMIT ordered_group 5;
SAMPLE SCRIPTS:
grunt> dividends = load 'data/NYSE_dividends' as (e:chararray, s:chararray, d:chararray, div:float);
grunt> recent = filter dividends by d > '2009-01-01';
grunt> trimmed = foreach recent generate s, div;
grunt> grpd = group trimmed by s;
grunt> avgdiv = foreach grpd generate group, AVG(trimmed.div);
grunt> illustrate avgdiv;
[cloudera@localhost ~]$ pig -e 'illustrate -script scripts/illustrate.pig';
ERROR:
grunt> records = LOAD 'data/NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
grunt> difference = FOREACH records GENERATE symbol, (high == low ? 'SAME' : 'DIFFERENT') as diff;
grunt> group_difference2 = GROUP difference BY diff;
grunt> count_occurence2 = FOREACH group_difference2 GENERATE group, COUNT(difference.diff);
grunt> records = LOAD 'data/NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
grunt> difference = FOREACH records GENERATE symbol, (high == low ? 'SAME' : 'DIFFERENT') as diff;
grunt> group_difference = GROUP difference BY symbol;
grunt> count_occurence = FOREACH group_difference GENERATE group, COUNT(difference.diff);
OTHER:
EXECUTE PIG SCRIPT COMMAND LINE:
[cloudera@localhost ~]$ pig -x local scripts/grpdaily.pig
[cloudera@localhost ~]$ pig -x local scripts/daily_total_count.pig
[cloudera@localhost ~]$ pig -x local -e 'explain -script scripts/daily_total_count.pig'
[cloudera@localhost ~]$ pig -x local -e 'explain -dot -out output/daily_total_count_dot_out -script scripts/daily_total_count.pig';
PIG VERSION
[cloudera@localhost ~]$ pig -version
Apache Pig version 0.11.0-cdh4.3.0 (rexported)
HADOOP VERSION
[cloudera@localhost ~]$ hadoop version
Hadoop 2.0.0-cdh4.3.0
[cloudera@localhost ~]$ pig -x local
[cloudera@localhost ~]$ pig -h
[cloudera@localhost ~]$ pig -h properties
[cloudera@localhost ~]$ pig -e fs -ls
Note: the last command above lists the HDFS directory; the result is the same as running hadoop fs -ls.
Thank you, Ganesh Pillai, for such good documentation of these commands with examples.