hadoop - spark - pig script
Cómo manejar la memoria de derrame en el cerdo (1)
Mi código me gusta así:
pymt = LOAD ''pymt'' USING PigStorage(''|'') AS ($pymt_schema);
pymt_grp = GROUP pymt BY key
results = FOREACH pymt_grp {
/*
* some kind of logic, filter, count, distinct, sum, etc.
*/
}
Pero ahora encuentro muchos registros como ese:
org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 207012796 bytes from 1 objects. init = 5439488(5312K) used = 424200488(414258K) committed = 559284224(546176K) max = 559284224(546176K)
De hecho, encuentro la causa, la razón principal es que hay una clave "caliente", algo así como key = 0 como dirección IP, pero no quiero filtrar esta clave. ¿Hay alguna solución? Implementé una interfaz algebraica y de acumuladores en mi UDF.
Tuve problemas similares con datos muy sesgados o DISTINCT anidados en FOREACH (ya que PIG hará una memoria diferente). La solución fue sacar DISTINCT de FOREACH como ejemplo, ver mi respuesta a ¿Cómo optimizar un grupo por declaración en PIG latin?
Si no desea hacer DISTINCT antes de SUM y COUNT de lo que sugeriría, use 2 GROUP BY. El primero agrupa en la columna Clave más otra columna o el número aleatorio mod 100, actúa como un Sal (para separar los datos de una sola clave en múltiples Reductores). Que el segundo GROUP BY solo en la columna Key y calcule la SUM final del grupo 1 COUNT o Sum.
Ex:
inpt = load ''/data.csv'' using PigStorage('','') as (Key, Value);
view = foreach inpt generate Key, Value, ((int)(RANDOM() * 100)) as Salt;
group_1 = group view by (Key, Salt);
group_1_count = foreach group_1 generate group_1.Key as Key, COUNT(view) as count;
group_2 = group group_1_count by Key;
final_count = foreach group_2 generate flatten(group) as Key, SUM(group_1_count.count) as count;