hadoop avro flume-ng flume-twitter



Unable to load Twitter Avro data correctly into a Hive table

Need your help!

I am attempting a trivial exercise: getting Twitter data and then loading it into Hive for analysis. I am able to get the data into HDFS using Flume (with the Twitter 1% Firehose source) and I can also load the data into a Hive table.

But I cannot see all the columns I expected to be in the Twitter data, such as user_location, user_description, user_friends_count, and user_statuses_count. The schema derived from the Avro data file only contains two columns, headers and body.

Below are the steps I have followed:

1) Create a Flume agent with the following conf:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
#a1.sources.r1.type = com.cloudera.flume.source.TwitterSource
a1.sources.r1.consumerKey = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.consumerSecret = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.accessToken = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.accessTokenSecret = XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.keywords = bigdata, healthcare, oozie

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.inUsePrefix = _
a1.sinks.k1.hdfs.fileSuffix = .avro
# added for invalid block size error
a1.sinks.k1.serializer = avro_event
#a1.sinks.k1.deserializer.schemaType = LITERAL
# added for exception java.io.IOException: org.apache.avro.AvroTypeException: Found Event, expecting Doc
#a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.rollSize = 67108864
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 30

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
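To start the agent, a command along these lines is typically used (a sketch; the conf directory and the file name twitter-agent.conf are assumptions, adjust them to your setup):

# Launch the Flume agent named "a1" defined in the conf file above
flume-ng agent --conf /etc/flume/conf --conf-file twitter-agent.conf --name a1 -Dflume.root.logger=INFO,console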

2) Derive the schema from the Avro data file. I have no idea why the schema derived from the Avro data file only has the two columns headers and body:

java -jar avro-tools-1.7.7.jar getschema FlumeData.1431598230978.avro
{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}
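For reference, a record from one of these files can also be dumped as JSON with avro-tools (a sketch; assumes avro-tools-1.7.7.jar is available locally and reuses the file name from above):

# Copy one of the files Flume wrote out of HDFS
hadoop fs -copyToLocal /hdp/apps/2.2.0.0-2041/flume/twitter/FlumeData.1431598230978.avro .

# Print the first record as JSON; only the "headers" map and the "body" bytes show up
java -jar avro-tools-1.7.7.jar tojson FlumeData.1431598230978.avro | head -n 1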

3) Run the above agent to get the data into HDFS, figure out the schema of the Avro data, and create a Hive table such as:

CREATE EXTERNAL TABLE TwitterData
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal'='
{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}
')
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter';

4) Describe the Hive table:

hive> describe twitterdata;
OK
headers                 map<string,string>      from deserializer
body                    binary                  from deserializer
Time taken: 0.472 seconds, Fetched: 2 row(s)

5) Query the table: when I query the table, I see the binary data in the 'body' column and the actual schema information in the 'headers' column.

select * from twitterdata limit 1;
OK
{"type":"record","name":"Doc","doc":"adoc","fields":[{"name":"id","type":"string"},{"name":"user_friends_count","type":["int","null"]},{"name":"user_location","type":["string","null"]},{"name":"user_description","type":["string","null"]},{"name":"user_statuses_count","type":["int","null"]},{"name":"user_followers_count","type":["int","null"]},{"name":"user_name","type":["string","null"]},{"name":"user_screen_name","type":["string","null"]},{"name":"created_at","type":["string","null"]},{"name":"text","type":["string","null"]},{"name":"retweet_count","type":["long","null"]},{"name":"retweeted","type":["boolean","null"]},{"name":"in_reply_to_user_id","type":["long","null"]},{"name":"source","type":["string","null"]},{"name":"in_reply_to_status_id","type":["long","null"]},{"name":"media_url_https","type":["string","null"]},{"name":"expanded_url","type":["string","null"]}]}�1|$���)]'��G�$598792495703543808�Bあいたぁぁぁぁぁぁぁ!�~�ゆっけ0725Yukken(2015-05-14T10:10:30Z<ん?なんか意味違うわ�<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>�1|$���)]'��
Time taken: 2.24 seconds, Fetched: 1 row(s)

How do I create a Hive table with all the columns of the actual schema, as shown in the 'headers' column? I mean all the columns such as user_location, user_description, user_friends_count, and user_statuses_count.

Shouldn't the schema derived from the Avro data file contain more columns?

Is there any issue with the Flume Avro source I used in the Flume agent (org.apache.flume.source.twitter.TwitterSource)?

Thanks for reading through.

Thanks Farrukh, I did that. The mistake was the setting 'a1.sinks.k1.serializer = avro_event'; I changed it to 'a1.sinks.k1.serializer = text' and was able to load the data into Hive. But now the problem is retrieving the data from Hive; I get the following error when doing so:

hive> describe twitterdata_09062015;
OK
id                      string                  from deserializer
user_friends_count      int                     from deserializer
user_location           string                  from deserializer
user_description        string                  from deserializer
user_statuses_count     int                     from deserializer
user_followers_count    int                     from deserializer
user_name               string                  from deserializer
user_screen_name        string                  from deserializer
created_at              string                  from deserializer
text                    string                  from deserializer
retweet_count           bigint                  from deserializer
retweeted               boolean                 from deserializer
in_reply_to_user_id     bigint                  from deserializer
source                  string                  from deserializer
in_reply_to_status_id   bigint                  from deserializer
media_url_https         string                  from deserializer
expanded_url            string                  from deserializer

select count(1) as num_rows from TwitterData_09062015;
Query ID = root_20150609130404_10ef21db-705a-4e94-92b7-eaa58226ee2e
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1433857038961_0003, Tracking URL = http://sandbox.hortonworks.com:8088/proxy/application_1433857038961_0003/
Kill Command = /usr/hdp/2.2.0.0-2041/hadoop/bin/hadoop job -kill job_1433857038961_0003
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
13:04:36,856 Stage-1 map = 0%, reduce = 0%
13:05:09,576 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1433857038961_0003 with errors
Error during job, obtaining debugging information...
Examining task ID: task_1433857038961_0003_m_000000 (and more) from job job_1433857038961_0003
Task with the most failures(4):
Task ID: task_1433857038961_0003_m_000000
URL: http://sandbox.hortonworks.com:8088/taskdetails.jsp?jobid=job_1433857038961_0003&tipid=task_1433857038961_0003_m_000000
Diagnostic Messages for this Task:
Error: java.io.IOException: java.io.IOException: org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40
    at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
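One way to narrow down this kind of "Block size invalid" failure is to check whether the files under the table location are still valid Avro container files that the AvroSerDe can read. A minimal sketch, assuming avro-tools-1.7.7.jar is available locally; the FlumeData file name below is a placeholder:

# An Avro container file starts with the magic bytes "Obj" followed by the version byte 0x01
hadoop fs -cat /hdp/apps/2.2.0.0-2041/flume/twitter/FlumeData.1433857000000.avro | head -c 4 | od -c

# getschema fails quickly on files that are not Avro containers
hadoop fs -copyToLocal /hdp/apps/2.2.0.0-2041/flume/twitter/FlumeData.1433857000000.avro .
java -jar avro-tools-1.7.7.jar getschema FlumeData.1433857000000.avro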


Here is a step-by-step process that I used to download tweets and load them into Hive.

Flume agent

## TwitterAgent for collecting Twitter data to Hadoop HDFS #####
TwitterAgent.sources = Twitter
TwitterAgent.channels = FileChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = FileChannel
TwitterAgent.sources.Twitter.consumerKey = *************
TwitterAgent.sources.Twitter.consumerSecret = **********
TwitterAgent.sources.Twitter.accessToken = ************
TwitterAgent.sources.Twitter.accessTokenSecret = ***********
TwitterAgent.sources.Twitter.maxBatchSize = 50000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 100000
TwitterAgent.sources.Twitter.keywords = Apache, Hadoop, Mapreduce, hadooptutorial, Hive, Hbase, MySql

TwitterAgent.sinks.HDFS.channel = FileChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://nn1.itbeams.com:9000/user/flume/tweets/avrotweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
# you do not need to mention avro format here, just mention Text
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 200000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 2000000

TwitterAgent.channels.FileChannel.type = file
TwitterAgent.channels.FileChannel.checkpointDir = /var/log/flume/checkpoint/
TwitterAgent.channels.FileChannel.dataDirs = /var/log/flume/data/

I created the Avro schema in an avsc file. Once you have created it, put this file into Hadoop under your user folder, such as /user/youruser/ (a sample upload command follows the schema below).

{"type":"record", "name":"Doc", "doc":"adoc", "fields":[{"name":"id","type":"string"}, {"name":"user_friends_count","type":["int","null"]}, {"name":"user_location","type":["string","null"]}, {"name":"user_description","type":["string","null"]}, {"name":"user_statuses_count","type":["int","null"]}, {"name":"user_followers_count","type":["int","null"]}, {"name":"user_name","type":["string","null"]}, {"name":"user_screen_name","type":["string","null"]}, {"name":"created_at","type":["string","null"]}, {"name":"text","type":["string","null"]}, {"name":"retweet_count","type":["long","null"]}, {"name":"retweeted","type":["boolean","null"]}, {"name":"in_reply_to_user_id","type":["long","null"]}, {"name":"source","type":["string","null"]}, {"name":"in_reply_to_status_id","type":["long","null"]}, {"name":"media_url_https","type":["string","null"]}, {"name":"expanded_url","type":["string","null"]}

Load the tweets into the Hive table. If you save this code in an hql file, even better.

CREATE TABLE tweetsavro
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='hdfs:///user/youruser/examples/schema/twitteravroschema.avsc');

LOAD DATA INPATH '/user/flume/tweets/avrotweets/FlumeData.*' OVERWRITE INTO TABLE tweetsavro;
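Once the data is loaded, a quick sanity check from the shell could look like this (a sketch; any of the columns from the schema above can be substituted):

# Pull a few rows to confirm the individual tweet columns are readable
hive -e "SELECT user_screen_name, created_at, text FROM tweetsavro LIMIT 5;"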

The tweetsavro table in Hive:

hive> describe tweetsavro;
OK
id                      string                  from deserializer
user_friends_count      int                     from deserializer
user_location           string                  from deserializer
user_description        string                  from deserializer
user_statuses_count     int                     from deserializer
user_followers_count    int                     from deserializer
user_name               string                  from deserializer
user_screen_name        string                  from deserializer
created_at              string                  from deserializer
text                    string                  from deserializer
retweet_count           bigint                  from deserializer
retweeted               boolean                 from deserializer
in_reply_to_user_id     bigint                  from deserializer
source                  string                  from deserializer
in_reply_to_status_id   bigint                  from deserializer
media_url_https         string                  from deserializer
expanded_url            string                  from deserializer
Time taken: 0.6 seconds, Fetched: 17 row(s)