tutorial spark pig hadoop mapreduce bigdata apache-pig

spark - map reduce hadoop



Agrupar una bolsa de datos por valores idénticos en el cerdo (2)

Creé el siguiente script Pig para filtrar las frases de una colección de documentos web (Common Crawl) que mencionan un título de película (de un archivo de datos predefinido de títulos de películas), aplicar análisis de sentimiento en esas oraciones y agrupar esos sentimientos por película.

register ../commoncrawl-examples/lib/*.jar; set mapred.task.timeout= 1000; register ../commoncrawl-examples/dist/lib/commoncrawl-examples-1.0.1-HM.jar; register ../dist/lib/movierankings-1.jar register ../lib/piggybank.jar; register ../lib/stanford-corenlp-full-2014-01-04/stanford-corenlp-3.3.1.jar; register ../lib/stanford-corenlp-full-2014-01-04/stanford-corenlp-3.3.1-models.jar; register ../lib/stanford-corenlp-full-2014-01-04/ejml-0.23.jar; register ../lib/stanford-corenlp-full-2014-01-04/joda-time.jar; register ../lib/stanford-corenlp-full-2014-01-04/jollyday.jar; register ../lib/stanford-corenlp-full-2014-01-04/xom.jar; DEFINE IsNotWord com.moviereviewsentimentrankings.IsNotWord; DEFINE IsMovieDocument com.moviereviewsentimentrankings.IsMovieDocument; DEFINE ToSentenceMoviePairs com.moviereviewsentimentrankings.ToSentenceMoviePairs; DEFINE ToSentiment com.moviereviewsentimentrankings.ToSentiment; DEFINE MoviesInDocument com.moviereviewsentimentrankings.MoviesInDocument; DEFINE SequenceFileLoader org.apache.pig.piggybank.storage.SequenceFileLoader(); -- LOAD pages, movies and words pages = LOAD ''../data/textData-*'' USING SequenceFileLoader as (url:chararray, content:chararray); movies_fltr_grp = LOAD ''../data/movie_fltr_grp_2/part-*'' as (group: chararray,movies_fltr: {(movie: chararray)}); -- FILTER pages containing movie movie_pages = FILTER pages BY IsMovieDocument(content, movies_fltr_grp.movies_fltr); -- SPLIT pages containing movie in sentences and create movie-sentence pairs movie_sentences = FOREACH movie_pages GENERATE flatten(ToSentenceMoviePairs(content, movies_fltr_grp.movies_fltr)) as (content:chararray, movie:chararray); -- Calculate sentiment for each movie-sentence pair movie_sentiment = FOREACH movie_sentences GENERATE flatten(ToSentiment(movie, content)) as (movie:chararray, sentiment:int); -- GROUP movie-sentiment pairs by movie movie_sentiment_grp_tups = GROUP movie_sentiment BY movie; -- Reformat and print movie-sentiment pairs movie_sentiment_grp = FOREACH movie_sentiment_grp_tups GENERATE group, movie_sentiment.sentiment AS sentiments:{(sentiment: int)}; describe movie_sentiment_grp;

Las pruebas realizadas en un pequeño subconjunto del rastreo web demostraron que me daban pares de títulos de películas con una bolsa de datos de enteros (del 1 al 5, que representa muy negativo, negativo, neutral, positivo y muy positivo). Como último paso, me gustaría transformar estos datos en títulos de películas por parejas y una bolsa de datos que contenga tuplas con todos los enteros distintos existentes para este título de película y su recuento. La descripción movie_sentiment_grp al final del script devuelve:

movie_sentiment_grp: {group: chararray,sentiments: {(sentiment: int)}}

Así que, básicamente, probablemente necesite EXAMINAR cada elemento de movie_sentiment_grp y agrupar los sentimientos en bolsas de datos en grupos de valores idénticos y luego usar la función COUNT () para obtener la cantidad de elementos en cada grupo. Sin embargo, no pude encontrar nada sobre cómo agrupar una bolsa de datos de enteros en grupos de valores idénticos. ¿Alguien sabe como hacer esto?

Solución ficticia:

movie_sentiment_grp_cnt = FOREACH movie_sentiment_grp{ sentiments_grp = GROUP sentiments BY ?; }


Estabas en el camino correcto. movie_sentiment_grp está en el formato correcto y un FOREACH anidado sería correcto, excepto que no puede usar un GROUP en él. La solución es usar un UDF. Algo como esto:

myudfs.py

#!/usr/bin/python @outputSchema(''sentiments: {(sentiment:int, count:int)}'') def count_sentiments(BAG): res = {} for s in BAG: if s in res: res[s] += 1 else: res[s] = 1 return res.items()

Esta UDF se usa como:

Register ''myudfs.py'' using jython as myfuncs; movie_sentiment_grp_cnt = FOREACH movie_sentiment_grp GENERATE group, myfuncs.count_sentiments(sentiments) ;


Echa un vistazo a CountEach UDF desde Apache DataFu . Con una bolsa, se producirá una nueva bolsa de las distintas tuplas, con el recuento adjunto a cada tupla correspondiente.

El ejemplo de la documentación debería dejar esto en claro:

DEFINE CountEachFlatten datafu.pig.bags.CountEach(''flatten''); -- input: -- ({(A),(A),(C),(B)}) input = LOAD ''input'' AS (B: bag {T: tuple(alpha:CHARARRAY, numeric:INT)}); -- output_flatten: -- ({(A,2),(C,1),(B,1)}) output_flatten = FOREACH input GENERATE CountEachFlatten(B);

Para su caso:

DEFINE CountEachFlatten datafu.pig.bags.CountEach(''flatten''); movie_sentiment_grp_cnt = FOREACH movie_sentiment_grp GENERATE group, CountEach(sentiments);