hadoop - spark - pig script
Apache Pig: FLATTEN y ejecuciĆ³n paralela de reductores (4)
Creo que hay una asimetría en los datos. Solo un pequeño número de mapeadores está produciendo una producción exponencialmente grande. Mire la distribución de claves en sus datos. Al igual que los datos contienen pocos grupos con gran cantidad de registros.
Implementé un script de Apache Pig. Cuando ejecuto el script, resulta en muchos mapeadores para un paso específico, pero solo tiene un reductor para ese paso. Debido a esta condición (muchos mapeadores, un reductor) el clúster Hadoop está casi inactivo mientras se ejecuta el único reductor. Para utilizar mejor los recursos del clúster, me gustaría tener muchos reductores corriendo en paralelo.
Incluso si configuré el paralelismo en el script de Pig utilizando el comando SET DEFAULT_PARALLEL, aún así tengo solo 1 reductor.
La parte del código que emite el problema es la siguiente:
SET DEFAULT_PARALLEL 5;
inputData = LOAD ''input_data.txt'' AS (group_name:chararray, item:int);
inputDataGrouped = GROUP inputData BY (group_name);
-- The GeneratePairsUDF generates a bag containing pairs of integers, e.g. {(1, 5), (1, 8), ..., (8, 5)}
pairs = FOREACH inputDataGrouped GENERATE GeneratePairsUDF(inputData.item) AS pairs_bag;
pairsFlat = FOREACH pairs GENERATE FLATTEN(pairs_bag) AS (item1:int, item2:int);
Los alias ''inputData'' y ''inputDataGrouped'' se calculan en el asignador.
Los ''pares'' y ''pairsFlat'' en el reductor.
Si cambio el script eliminando la línea con el comando FLATTEN (pairsFlat = FOREACH pares GENERATE FLATTEN (pairs_bag) AS (item1: int, item2: int);) entonces la ejecución da como resultado 5 reductores (y por lo tanto en una ejecución paralela) .
Parece que el comando FLATTEN es el problema y evita que se creen muchos reductores.
¿Cómo podría alcanzar el mismo resultado de FLATTEN pero hacer que el script se ejecute en paralelo (con muchos reductores)?
Editar:
EXPLICAR plan al tener dos FOREACH (como arriba):
Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-32
| |
| Project[chararray][0] - scope-33
|
|---inputData: New For Each(false,false)[bag] - scope-29
| |
| Cast[chararray] - scope-24
| |
| |---Project[bytearray][0] - scope-23
| |
| Cast[int] - scope-27
| |
| |---Project[bytearray][1] - scope-26
|
|---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-22--------
Reduce Plan
pairsFlat: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---pairsFlat: New For Each(true)[bag] - scope-41
| |
| Project[bag][0] - scope-39
|
|---pairs: New For Each(false)[bag] - scope-38
| |
| POUserFunc(GeneratePairsUDF)[bag] - scope-36
| |
| |---Project[bag][1] - scope-35
| |
| |---Project[bag][1] - scope-34
|
|---inputDataGrouped: Package[tuple]{chararray} - scope-31--------
Global sort: false
EXPLICAR el plan al tener solo una FOREACH con FLATTEN envolviendo el UDF:
Map Plan
inputDataGrouped: Local Rearrange[tuple]{chararray}(false) - scope-29
| |
| Project[chararray][0] - scope-30
|
|---inputData: New For Each(false,false)[bag] - scope-26
| |
| Cast[chararray] - scope-21
| |
| |---Project[bytearray][0] - scope-20
| |
| Cast[int] - scope-24
| |
| |---Project[bytearray][1] - scope-23
|
|---inputData: Load(file:///input_data.txt:org.apache.pig.builtin.PigStorage) - scope-19--------
Reduce Plan
pairs: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---pairs: New For Each(true)[bag] - scope-35
| |
| POUserFunc(GeneratePairsUDF)[bag] - scope-33
| |
| |---Project[bag][1] - scope-32
| |
| |---Project[bag][1] - scope-31
|
|---inputDataGrouped: Package[tuple]{chararray} - scope-28--------
Global sort: false
Intenté "configurar predeterminado paralelo" y "PARALELO 100", pero no tuve suerte. Cerdo todavía usa 1 reductor.
Resultó que tengo que generar un número aleatorio de 1 a 100 para cada registro y agrupar estos registros por ese número aleatorio.
Estamos perdiendo el tiempo en la agrupación, pero es mucho más rápido para mí porque ahora puedo usar más reductores.
Aquí está el código (EL ENVIADOR es mi propia UDF):
tmpRecord = FOREACH record GENERATE (int)(RANDOM()*100.0) as rnd, data;
groupTmpRecord = GROUP tmpRecord BY rnd;
result = FOREACH groupTmpRecord GENERATE FLATTEN(SUBMITTER(tmpRecord));
No hay garantía si el cerdo usa el valor DEFAULT_PARALLEL de configuración para cada paso del script de cerdo. Pruebe PARALELO junto con su paso específico de unión / grupo que le parezca que lleva tiempo (en su caso, paso de GRUPO).
inputDataGrouped = GROUP inputData BY (group_name) PARALLEL 67;
Si todavía no funciona, es posible que deba ver los datos para determinar si se trata de un problema de asimetría.
Para responder a su pregunta, primero debemos saber cuántos corales de reductores impone para lograr el proceso de reorganización global. Porque, según mi comprensión, Generar / Proyección no debería requerir un único reductor. No puedo decir lo mismo de Flatten. Sin embargo, sabemos por sentido común que, durante el aplanar, el objetivo es eliminar la nidificación de las tuplas de las bolsas y viceversa. Y para hacer eso, todas las tuplas pertenecientes a una bolsa definitivamente deberían estar disponibles en el mismo reductor. Podría estar equivocado. Pero, ¿alguien puede agregar algo aquí para obtener una respuesta de este usuario, por favor?