apache spark - leyendo el archivo json en pyspark
apache-spark spark-streaming (2)
Soy nuevo en PySpark, a continuación se muestra mi formato de archivo JSON de kafka.
{
"header": {
"platform":"atm",
"version":"2.0"
}
"details":[
{
"abc":"3",
"def":"4"
},
{
"abc":"5",
"def":"6"
},
{
"abc":"7",
"def":"8"
}
]
}
¿Cómo puedo leer los detalles de todos los "abc"
"def"
en detalles y agregar esto a una nueva lista como esta [(1,2),(3,4),(5,6),(7,8)]
. La nueva lista se usará para crear un marco de datos de chispa. ¿Cómo puedo hacer esto en pyspark? Probé el siguiente código.
parsed = messages.map(lambda (k,v): json.loads(v))
list = []
summed = parsed.map(lambda detail:list.append((String([''mcc'']), String([''mid'']), String([''dsrc'']))))
output = summed.collect()
print output
Produce el error ''demasiados valores para descomprimir''
Mensaje de error a continuación en la declaración summed.collect ()
16/09/12 12:46:10 INFO deprecation: mapred.task.is.map está en desuso. En su lugar, use mapreduce.task.ismap 16/09/12 12:46:10 INFO deprecation: mapred.task.partition está en desuso. En su lugar, use mapreduce.task.partition 16/09/12 12:46:10 INFO deprecation: mapred.job.id está en desuso. En su lugar, use mapreduce.job.id 16/09/12 12:46:10 ERROR Executor: Excepción en la tarea 1.0 en la etapa 0.0 (TID 1) org.apache.spark.api.python.PythonException: Traceback (la última llamada fue la última ): Archivo "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", línea 111, en el proceso principal () Archivo "/usr/hdp/2.3.4.0 -3485 / spark / python / lib / pyspark.zip / pyspark / worker.py ", línea 106, en proceso serializer.dump_stream (func (split_index, iterator), archivo de salida) File" /usr/hdp/2.3.4.0-3485 /spark/python/lib/pyspark.zip/pyspark/serializers.py ", línea 263, en dump_stream vs = list (itertools.islice (iterator, batch)) File" ", línea 1, en ValueError: demasiados valores para deshacer
De acuerdo con la información en los comentarios, cada fila en los mensajes RDD contiene una línea del archivo json
u''{'',
u'' "header": {'',
u'' "platform":"atm",''
Su código está fallando en la siguiente línea:
parsed = messages.map(lambda (k,v): json.loads(v))
Su código toma una línea como: ''{'' y trata de convertirla en clave, valor y ejecutar json.loads (valor)
Está claro que python / spark no podrá dividir un char ''{'' en el par clave-valor.
El comando json.loads () debe ejecutarse en un objeto de datos json completo
Esta tarea específica podría lograrse más fácilmente con python puro
En primer lugar, el json no es válido. Después del encabezado a ,
falta.
Dicho eso, tomemos esto json:
{"header":{"platform":"atm","version":"2.0"},"details":[{"abc":"3","def":"4"},{"abc":"5","def":"6"},{"abc":"7","def":"8"}]}
Esto puede ser procesado por:
>>> df = sqlContext.jsonFile(''test.json'')
>>> df.first()
Row(details=[Row(abc=''3'', def=''4''), Row(abc=''5'', def=''6''), Row(abc=''7'', def=''8'')], header=Row(platform=''atm'', version=''2.0''))
>>> df = df.flatMap(lambda row: row[''details''])
PythonRDD[38] at RDD at PythonRDD.scala:43
>>> df.collect()
[Row(abc=''3'', def=''4''), Row(abc=''5'', def=''6''), Row(abc=''7'', def=''8'')]
>>> df.map(lambda entry: (int(entry[''abc'']), int(entry[''def'']))).collect()
[(3, 4), (5, 6), (7, 8)]
¡Espero que esto ayude!