structtype spark saveastable read python mapreduce apache-spark

python - saveastable - PySpark Drop Rows



structtype pyspark (6)

¿Cómo se eliminan las filas de un RDD en PySpark? Particularmente la primera fila, ya que tiende a contener nombres de columna en mis conjuntos de datos. Al leer la API, parece que no puedo encontrar una manera fácil de hacer esto. Por supuesto, podría hacerlo a través de Bash / HDFS, pero solo quiero saber si esto se puede hacer desde PySpark.


AFAIK no hay una manera ''fácil'' de hacer esto.

Esto debería hacer el truco, sin embargo:

val header = data.first val rows = data.filter(line => line != header)


Específico para PySpark:

Según @maasg, puedes hacer esto:

header = rdd.first() rdd.filter(lambda line: line != header)

pero no es técnicamente correcto, ya que es posible que excluyas las líneas que contienen datos, así como el encabezado. Sin embargo, esto parece funcionar para mí:

def remove_header(itr_index, itr): return iter(list(itr)[1:]) if itr_index == 0 else itr rdd.mapPartitionsWithIndex(remove_header)

Similar:

rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])

Soy nuevo en Spark, así que no puedo comentar inteligentemente cuál será el más rápido.


He probado con spark2.1. Supongamos que desea eliminar las primeras 14 filas sin saber el número de columnas que tiene el archivo.

sc = spark.sparkContext lines = sc.textFile("s3://location_of_csv") parts = lines.map(lambda l: l.split(",")) parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])

withColumn es una función df. Por lo tanto, a continuación no funcionará en el estilo RDD como se usa en el caso anterior.

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)


Hice algunos perfiles con varias soluciones y tengo los siguientes

Configuración de cluster

Agrupaciones

  • Cluster 1: 4 Cores 16 GB
  • Cluster 2: 4 Cores 16 GB
  • Cluster 3: 4 Cores 16 GB
  • Cluster 4: 2 Cores 8 GB

Datos

7 millones de filas, 4 columnas

#Solution 1 # Time Taken : 40 ms data=sc.TextFile(''file1.txt'') firstRow=data.first() data=data.filter(lambda row:row != firstRow) #Solution 2 # Time Taken : 3 seconds data=sc.TextFile(''file1.txt'') def dropFirstRow(index,iterator): return iter(list(iterator)[1:]) if index==0 else iterator data=data.mapPartitionsWithIndex(dropFirstRow) #Solution 3 # Time Taken : 0.3 seconds data=sc.TextFile(''file1.txt'') def dropFirstRow(index,iterator): if(index==0): for subIndex,item in enumerate(iterator): if subIndex > 0: yield item else: yield iterator data=data.mapPartitionsWithIndex(dropFirstRow)

Creo que la Solución 3 es la más escalable.


Personalmente, creo que usar un filtro para deshacerse de estas cosas es la forma más fácil. Pero por tu comentario tengo otro enfoque. Aproveche el RDD para que cada partición sea una matriz (supongo que tiene 1 archivo por partición, y cada archivo tiene la fila ofensiva en la parte superior) y luego solo omita el primer elemento (esto es con la api de Scala).

data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index

Tenga en cuenta que una de las grandes características de los RDD es que son inmutables, por lo que, naturalmente, eliminar una fila es algo difícil de hacer.

ACTUALIZACIÓN: Mejor solución.
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/} )
Igual que el glom, pero no tiene la sobrecarga de poner todo en una matriz, ya que x es un iterador en este caso


Una forma sencilla de lograr esto en PySpark (API de Python), asumiendo que está utilizando Python 3:

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()