apache-spark pyspark spark-streaming

apache spark - leyendo el archivo json en pyspark



apache-spark spark-streaming (2)

Soy nuevo en PySpark, a continuación se muestra mi formato de archivo JSON de kafka.

{ "header": { "platform":"atm", "version":"2.0" } "details":[ { "abc":"3", "def":"4" }, { "abc":"5", "def":"6" }, { "abc":"7", "def":"8" } ] }

¿Cómo puedo leer los detalles de todos los "abc" "def" en detalles y agregar esto a una nueva lista como esta [(1,2),(3,4),(5,6),(7,8)] . La nueva lista se usará para crear un marco de datos de chispa. ¿Cómo puedo hacer esto en pyspark? Probé el siguiente código.

parsed = messages.map(lambda (k,v): json.loads(v)) list = [] summed = parsed.map(lambda detail:list.append((String([''mcc'']), String([''mid'']), String([''dsrc''])))) output = summed.collect() print output

Produce el error ''demasiados valores para descomprimir''

Mensaje de error a continuación en la declaración summed.collect ()

16/09/12 12:46:10 INFO deprecation: mapred.task.is.map está en desuso. En su lugar, use mapreduce.task.ismap 16/09/12 12:46:10 INFO deprecation: mapred.task.partition está en desuso. En su lugar, use mapreduce.task.partition 16/09/12 12:46:10 INFO deprecation: mapred.job.id está en desuso. En su lugar, use mapreduce.job.id 16/09/12 12:46:10 ERROR Executor: Excepción en la tarea 1.0 en la etapa 0.0 (TID 1) org.apache.spark.api.python.PythonException: Traceback (la última llamada fue la última ): Archivo "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", línea 111, en el proceso principal () Archivo "/usr/hdp/2.3.4.0 -3485 / spark / python / lib / pyspark.zip / pyspark / worker.py ", línea 106, en proceso serializer.dump_stream (func (split_index, iterator), archivo de salida) File" /usr/hdp/2.3.4.0-3485 /spark/python/lib/pyspark.zip/pyspark/serializers.py ", línea 263, en dump_stream vs = list (itertools.islice (iterator, batch)) File" ", línea 1, en ValueError: demasiados valores para deshacer


De acuerdo con la información en los comentarios, cada fila en los mensajes RDD contiene una línea del archivo json

u''{'', u'' "header": {'', u'' "platform":"atm",''

Su código está fallando en la siguiente línea:

parsed = messages.map(lambda (k,v): json.loads(v))

Su código toma una línea como: ''{'' y trata de convertirla en clave, valor y ejecutar json.loads (valor)

Está claro que python / spark no podrá dividir un char ''{'' en el par clave-valor.

El comando json.loads () debe ejecutarse en un objeto de datos json completo

Esta tarea específica podría lograrse más fácilmente con python puro


En primer lugar, el json no es válido. Después del encabezado a , falta.

Dicho eso, tomemos esto json:

{"header":{"platform":"atm","version":"2.0"},"details":[{"abc":"3","def":"4"},{"abc":"5","def":"6"},{"abc":"7","def":"8"}]}

Esto puede ser procesado por:

>>> df = sqlContext.jsonFile(''test.json'') >>> df.first() Row(details=[Row(abc=''3'', def=''4''), Row(abc=''5'', def=''6''), Row(abc=''7'', def=''8'')], header=Row(platform=''atm'', version=''2.0'')) >>> df = df.flatMap(lambda row: row[''details'']) PythonRDD[38] at RDD at PythonRDD.scala:43 >>> df.collect() [Row(abc=''3'', def=''4''), Row(abc=''5'', def=''6''), Row(abc=''7'', def=''8'')] >>> df.map(lambda entry: (int(entry[''abc'']), int(entry[''def'']))).collect() [(3, 4), (5, 6), (7, 8)]

¡Espero que esto ayude!