Unable to load Twitter Avro data correctly into a Hive table
I need your help!
I am trying a trivial exercise: fetch data from Twitter and then load it into Hive for analysis. I can get the data into HDFS with Flume (using the Twitter 1% Firehose source), and I can also load it into a Hive table.
However, I don't see all the columns I expected in the Twitter data, such as user_location, user_description, user_friends_count and user_statuses_count. The schema derived from the Avro file contains only two columns, headers and body.
Here are the steps I followed:
1) Create a Flume agent with the following configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type =org.apache.flume.source.twitter.TwitterSource
#a1.sources.r1.type = com.cloudera.flume.source.TwitterSource
a1.sources.r1.consumerKey =XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.consumerSecret =XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.accessToken =XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.accessTokenSecret =XXXXXXXXXXXXXXXXXXXXXXXXXXXX
a1.sources.r1.keywords = bigdata, healthcare, oozie
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.inUsePrefix = _
a1.sinks.k1.hdfs.fileSuffix = .avro
# added for invalid block size error
a1.sinks.k1.serializer = avro_event
#a1.sinks.k1.deserializer.schemaType = LITERAL
# added for exception java.io.IOException:org.apache.avro.AvroTypeException: Found Event, expecting Doc
#a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.rollSize = 67108864
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 30
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
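Only the uncommented lines of this file take effect, and the setting that later turned out to matter is the sink serializer. A minimal sketch of a simplified reader for the Java-properties style file Flume uses, to list which settings are actually active. `active_settings` is a hypothetical helper, not part of Flume, and it deliberately ignores line continuations, escapes and `:` separators.

```python
def active_settings(text: str) -> dict[str, str]:
    """Map each active 'key = value' line of a Flume agent file to its value.

    Simplified reading of the Java .properties format: blank lines and lines
    starting with '#' or '!' are skipped, and each line is split on its first '='.
    Line continuations, escapes and ':' separators are NOT handled.
    """
    settings: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # blank line or comment: has no effect on the agent
        key, sep, value = line.partition("=")
        if sep:
            settings[key.strip()] = value.strip()
    return settings


# Excerpt of the sink section from the agent configuration above.
conf = """
a1.sinks.k1.type = hdfs
a1.sinks.k1.serializer = avro_event
#a1.sinks.k1.deserializer.schemaType = LITERAL
"""
settings = active_settings(conf)
print(settings.get("a1.sinks.k1.serializer"))            # avro_event
print("a1.sinks.k1.deserializer.schemaType" in settings)  # False
```

The commented-out `deserializer.schemaType` line never reaches the agent, so only `serializer = avro_event` shapes what the HDFS sink writes.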
2) Derive the schema from the Avro data file. I have no idea why the schema derived from the Avro data file contains only two columns, headers and body:
java -jar avro-tools-1.7.7.jar getschema FlumeData.1431598230978.avro
{
"type" : "record",
"name" : "Event",
"fields" : [ {
"name" : "headers",
"type" : {
"type" : "map",
"values" : "string"
}
}, {
"name" : "body",
"type" : "bytes"
} ]
}
3) Run the agent above to get the data into HDFS, find out the schema of the Avro data, and create a Hive table as follows:
CREATE EXTERNAL TABLE TwitterData
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
WITH SERDEPROPERTIES ('avro.schema.literal'='
{
"type" : "record",
"name" : "Event",
"fields" : [ {
"name" : "headers",
"type" : {
"type" : "map",
"values" : "string"
}
}, {
"name" : "body",
"type" : "bytes"
} ]
}
')
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 'hdfs://192.168.192.128:8020/hdp/apps/2.2.0.0-2041/flume/twitter'
;
4) Describe the Hive table:
hive> describe twitterdata;
OK
headers map<string,string> from deserializer
body binary from deserializer
Time taken: 0.472 seconds, Fetched: 2 row(s)
5) Query the table: when I query it, I see binary data in the 'body' column and the actual schema information in the 'headers' column.
select * from twitterdata limit 1;
OK
{"type":"record","name":"Doc","doc":"adoc","fields":[{"name":"id","type":"string"},{"name":"user_friends_count","type":["int","null"]},{"name":"user_location","type":["string","null"]},{"name":"user_description","type":["string","null"]},{"name":"user_statuses_count","type":["int","null"]},{"name":"user_followers_count","type":["int","null"]},{"name":"user_name","type":["string","null"]},{"name":"user_screen_name","type":["string","null"]},{"name":"created_at","type":["string","null"]},{"name":"text","type":["string","null"]},{"name":"retweet_count","type":["long","null"]},{"name":"retweeted","type":["boolean","null"]},{"name":"in_reply_to_user_id","type":["long","null"]},{"name":"source","type":["string","null"]},{"name":"in_reply_to_status_id","type":["long","null"]},{"name":"media_url_https","type":["string","null"]},{"name":"expanded_url","type":["string","null"]}]}�1|$���)]''��G�$598792495703543808�Bあいたぁぁぁぁぁぁぁ!�~�ゆっけ0725Yukken(2015-05-14T10:10:30Z<ん?なんか意味違うわ�<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>�1|$���)]''��
Time taken: 2.24 seconds, Fetched: 1 row(s)
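The output above suggests that each body value is itself a complete Avro object container file: the Doc schema JSON visible there is what an Avro container stores under the `avro.schema` key of its header metadata. A minimal sketch, standard library only, that reads that header from a byte string following the object container layout in the Avro specification (4-byte magic `Obj\x01`, a metadata map, a 16-byte sync marker). The function names are hypothetical, and the sketch ignores data blocks and compression codecs.

```python
import json

MAGIC = b"Obj\x01"  # Avro object container magic bytes


def read_long(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode one Avro long (zig-zag varint) at pos; return (value, next_pos)."""
    acc, shift = 0, 0
    while True:
        if pos >= len(buf):
            raise EOFError("truncated varint")
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return (acc >> 1) ^ -(acc & 1), pos
        shift += 7


def read_bytes(buf: bytes, pos: int) -> tuple[bytes, int]:
    """Decode Avro bytes/string: a long length followed by that many bytes."""
    length, pos = read_long(buf, pos)
    end = pos + length
    if length < 0 or end > len(buf):
        raise EOFError("truncated bytes value")
    return buf[pos:end], end


def container_metadata(buf: bytes) -> dict[str, bytes]:
    """Return the header metadata map (e.g. 'avro.schema') of an Avro container."""
    if not buf.startswith(MAGIC):
        raise ValueError("missing Avro container magic Obj\\x01")
    pos, meta = len(MAGIC), {}
    while True:
        count, pos = read_long(buf, pos)
        if count == 0:      # a zero-count block ends the map
            return meta
        if count < 0:       # negative count: a block byte size follows; skip it
            count = -count
            _, pos = read_long(buf, pos)
        for _ in range(count):
            key, pos = read_bytes(buf, pos)
            value, pos = read_bytes(buf, pos)
            meta[key.decode("utf-8")] = value


# Tiny hand-built header: magic, one metadata entry, end of map, dummy sync marker.
schema = b'{"type":"record","name":"Doc","fields":[]}'
header = (MAGIC + b"\x02"                      # map block with 1 entry (zig-zag 1 -> 0x02)
          + b"\x16" + b"avro.schema"           # key of length 11 (zig-zag -> 0x16)
          + bytes([2 * len(schema)]) + schema  # value length 42 (single-byte zig-zag)
          + b"\x00"                            # zero-count block: end of map
          + bytes(16))                         # 16-byte sync marker
print(json.loads(container_metadata(header)["avro.schema"])["name"])  # Doc
```

To try it on real data you would first need the raw bytes of one body value; the sketch does not cover how to extract them.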
How do I create a Hive table with all the columns of the actual schema, as shown in the 'headers' column? I mean columns such as user_location, user_description, user_friends_count and user_statuses_count.
Shouldn't the schema derived from the Avro data file contain more columns?
Is there a problem with the Flume Avro source I used in the Flume agent (org.apache.flume.source.twitter.TwitterSource)?
Thanks for reading all of this.
Thanks Farrukh. I found that the mistake was the setting 'a1.sinks.k1.serializer = avro_event'. I changed it to 'a1.sinks.k1.serializer = text' and was able to load the data into Hive. But now the problem is reading the data back from Hive; I get the following error when I do:
hive> describe twitterdata_09062015;
OK
id string from deserializer
user_friends_count int from deserializer
user_location string from deserializer
user_description string from deserializer
user_statuses_count int from deserializer
user_followers_count int from deserializer
user_name string from deserializer
user_screen_name string from deserializer
created_at string from deserializer
text string from deserializer
retweet_count bigint from deserializer
retweeted boolean from deserializer
in_reply_to_user_id bigint from deserializer
source string from deserializer
in_reply_to_status_id bigint from deserializer
media_url_https string from deserializer
expanded_url string from deserializer
select count(1) as num_rows from TwitterData_09062015;
Query ID = root_20150609130404_10ef21db-705a-4e94-92b7-eaa58226ee2e
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1433857038961_0003, Tracking URL = http://sandbox.hortonworks.com:8088/proxy/application_1433857038961_0003/
Kill Command = /usr/hdp/2.2.0.0-2041/hadoop/bin/hadoop job -kill job_1433857038961_0003
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
* 13:04:36,856 Stage-1 map = 0%, reduce = 0%
* 13:05:09,576 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1433857038961_0003 with errors
Error during job, obtaining debugging information...
Examining task ID: task_1433857038961_0003_m_000000 (and more) from job job_1433857038961_0003
Task with the most failures(4):
Task ID:
task_1433857038961_0003_m_000000
URL:
http://sandbox.hortonworks.com:8088/taskdetails.jsp?jobid=job_1433857038961_0003&tipid=task_1433857038961_0003_m_000000
Diagnostic Messages for this Task:
Error: java.io.IOException: java.io.IOException: org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40
at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
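The value -40 can be reproduced by hand. In an Avro container file, each data block starts with two Avro longs (zig-zag varints): the object count and the block size in bytes. One plausible reading, assuming each event body emitted by TwitterSource is a complete Avro container and that the text serializer appends a newline after every body (its default behaviour): when one HDFS file holds several events, the reader finishes the first container and then decodes the newline (0x0A) as a block count of 5 and the `O` (0x4F) of the next container's `Obj\x01` magic as a block size of -40. The sketch below only checks that arithmetic; it is a hypothesis, not a confirmed diagnosis, and `decode_avro_long` is a hypothetical helper name.

```python
def decode_avro_long(data: bytes, pos: int = 0) -> tuple[int, int]:
    """Decode one Avro long (variable-length zig-zag) at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:           # high bit clear: last byte of the varint
            return (result >> 1) ^ -(result & 1), pos
        shift += 7


# Assumed bytes right after the first container's sync marker: the newline added
# by the text serializer, then the magic header of the next container.
tail = b"\n" + b"Obj\x01"

block_count, pos = decode_avro_long(tail)      # 0x0A -> 5
block_size, pos = decode_avro_long(tail, pos)  # 0x4F ('O') -> -40
print(block_count, block_size)  # 5 -40
```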
Here is the step-by-step process I used to download tweets and load them into Hive.
Flume agent:
##TwitterAgent for collecting Twitter data to Hadoop HDFS #####
TwitterAgent.sources = Twitter
TwitterAgent.channels = FileChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = FileChannel
TwitterAgent.sources.Twitter.consumerKey = *************
TwitterAgent.sources.Twitter.consumerSecret = **********
TwitterAgent.sources.Twitter.accessToken = ************
TwitterAgent.sources.Twitter.accessTokenSecret = ***********
TwitterAgent.sources.Twitter.maxBatchSize = 50000
TwitterAgent.sources.Twitter.maxBatchDurationMillis = 100000
TwitterAgent.sources.Twitter.keywords = Apache, Hadoop, Mapreduce, hadooptutorial, Hive, Hbase, MySql
TwitterAgent.sinks.HDFS.channel = FileChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://nn1.itbeams.com:9000/user/flume/tweets/avrotweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
# you do not need to mention the Avro format here; just use Text
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 200000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 2000000
TwitterAgent.channels.FileChannel.type = file
TwitterAgent.channels.FileChannel.checkpointDir = /var/log/flume/checkpoint/
TwitterAgent.channels.FileChannel.dataDirs = /var/log/flume/data/
I created the Avro schema in an .avsc file. Once you have created it, put the file in HDFS under your user folder, for example /user/youruser/.
{"type":"record",
"name":"Doc",
"doc":"adoc",
"fields":[{"name":"id","type":"string"},
{"name":"user_friends_count","type":["int","null"]},
{"name":"user_location","type":["string","null"]},
{"name":"user_description","type":["string","null"]},
{"name":"user_statuses_count","type":["int","null"]},
{"name":"user_followers_count","type":["int","null"]},
{"name":"user_name","type":["string","null"]},
{"name":"user_screen_name","type":["string","null"]},
{"name":"created_at","type":["string","null"]},
{"name":"text","type":["string","null"]},
{"name":"retweet_count","type":["long","null"]},
{"name":"retweeted","type":["boolean","null"]},
{"name":"in_reply_to_user_id","type":["long","null"]},
{"name":"source","type":["string","null"]},
{"name":"in_reply_to_status_id","type":["long","null"]},
{"name":"media_url_https","type":["string","null"]},
{"name":"expanded_url","type":["string","null"]}]}
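A malformed .avsc file (for example, one missing its closing `]}`) is easier to spot locally than through Hive. A minimal sketch using Python's standard `json` module to check the file before copying it to HDFS; `avsc_field_names` is a hypothetical helper, and it checks only JSON syntax and the top-level record shape, not full Avro schema validity.

```python
import json


def avsc_field_names(text: str) -> list[str]:
    """Return the top-level field names of an Avro record schema given as JSON text.

    Raises json.JSONDecodeError for malformed JSON (e.g. a missing closing ']}')
    and ValueError if the top level is not a record with a list of uniquely named
    fields. This is a shape check only, not a full Avro schema validation.
    """
    schema = json.loads(text)
    if not isinstance(schema, dict) or schema.get("type") != "record":
        raise ValueError("top level must be an Avro record")
    fields = schema.get("fields")
    if not isinstance(fields, list):
        raise ValueError("record must have a 'fields' list")
    names = [field["name"] for field in fields]
    if len(names) != len(set(names)):
        raise ValueError("duplicate field names")
    return names


# Shortened version of the Doc schema (only two of its fields).
sample = """{"type":"record","name":"Doc","doc":"adoc",
 "fields":[{"name":"id","type":"string"},
           {"name":"user_location","type":["string","null"]}]}"""
print(avsc_field_names(sample))  # ['id', 'user_location']
```

For the real file, pass its contents (e.g. read from twitteravroschema.avsc); a complete Doc schema should yield the 17 names that `describe tweetsavro` lists.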
Load the tweets into the Hive table (it is convenient to save this code in an .hql file):
CREATE TABLE tweetsavro
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='hdfs:///user/youruser/examples/schema/twitteravroschema.avsc') ;
LOAD DATA INPATH '/user/flume/tweets/avrotweets/FlumeData.*' OVERWRITE INTO TABLE tweetsavro;
The tweetsavro table in Hive:
hive> describe tweetsavro;
OK
id string from deserializer
user_friends_count int from deserializer
user_location string from deserializer
user_description string from deserializer
user_statuses_count int from deserializer
user_followers_count int from deserializer
user_name string from deserializer
user_screen_name string from deserializer
created_at string from deserializer
text string from deserializer
retweet_count bigint from deserializer
retweeted boolean from deserializer
in_reply_to_user_id bigint from deserializer
source string from deserializer
in_reply_to_status_id bigint from deserializer
media_url_https string from deserializer
expanded_url string from deserializer
Time taken: 0.6 seconds, Fetched: 17 row(s)