spark read functions example scala apache-spark apache-spark-sql spark-dataframe

scala - read - spark streaming



cómo filtrar un valor nulo de la base de datos de chispa (9)

Aquí hay una solución para chispa en Java. Para seleccionar filas de datos que contengan nulos. Cuando tienes datos del conjunto de datos, haces:

Dataset<Row> containingNulls = data.where(data.col("COLUMN_NAME").isNull())

Para filtrar los datos sin nulos lo haces:

Dataset<Row> withoutNulls = data.where(data.col("COLUMN_NAME").isNotNull())

A menudo, los marcos de datos contienen columnas de tipo Cadena donde en lugar de nulos tenemos cadenas vacías como "". Para filtrar tales datos también lo hacemos:

Dataset<Row> withoutNullsAndEmpty = data.where(data.col("COLUMN_NAME").isNotNull().and(data.col("COLUMN_NAME").notEqual("")))

Creé un marco de datos en chispa con el siguiente esquema:

root |-- user_id: long (nullable = false) |-- event_id: long (nullable = false) |-- invited: integer (nullable = false) |-- day_diff: long (nullable = true) |-- interested: integer (nullable = false) |-- event_owner: long (nullable = false) |-- friend_id: long (nullable = false)

Y los datos se muestran a continuación:

+----------+----------+-------+--------+----------+-----------+---------+ | user_id| event_id|invited|day_diff|interested|event_owner|friend_id| +----------+----------+-------+--------+----------+-----------+---------+ | 4236494| 110357109| 0| -1| 0| 937597069| null| | 78065188| 498404626| 0| 0| 0| 2904922087| null| | 282487230|2520855981| 0| 28| 0| 3749735525| null| | 335269852|1641491432| 0| 2| 0| 1490350911| null| | 437050836|1238456614| 0| 2| 0| 991277599| null| | 447244169|2095085551| 0| -1| 0| 1579858878| null| | 516353916|1076364848| 0| 3| 1| 3597645735| null| | 528218683|1151525474| 0| 1| 0| 3433080956| null| | 531967718|3632072502| 0| 1| 0| 3863085861| null| | 627948360|2823119321| 0| 0| 0| 4092665803| null| | 811791433|3513954032| 0| 2| 0| 415464198| null| | 830686203| 99027353| 0| 0| 0| 3549822604| null| |1008893291|1115453150| 0| 2| 0| 2245155244| null| |1239364869|2824096896| 0| 2| 1| 2579294650| null| |1287950172|1076364848| 0| 0| 0| 3597645735| null| |1345896548|2658555390| 0| 1| 0| 2025118823| null| |1354205322|2564682277| 0| 3| 0| 2563033185| null| |1408344828|1255629030| 0| -1| 1| 804901063| null| |1452633375|1334001859| 0| 4| 0| 1488588320| null| |1625052108|3297535757| 0| 3| 0| 1972598895| null| +----------+----------+-------+--------+----------+-----------+---------+

Quiero filtrar las filas que tienen valores nulos en el campo de "friend_id".

scala> val aaa = test.filter("friend_id is null") scala> aaa.count

Tengo: res52: Long = 0 que es obvio no está bien. ¿Cuál es la forma correcta de conseguirlo?

Una pregunta más, quiero reemplazar los valores en el campo friend_id. Quiero reemplazar nulo por 0 y 1 por cualquier otro valor, excepto nulo. El código que puedo averiguar es:

val aaa = train_friend_join.select($"user_id", $"event_id", $"invited", $"day_diff", $"interested", $"event_owner", ($"friend_id" != null)?1:0)

Este código tampoco funciona. ¿Alguien puede decirme cómo puedo arreglarlo? Gracias


De la pista de Michael Kopaniov, debajo de las obras.

df.where(df("id").isNotNull).show


Digamos que tiene esta configuración de datos (para que los resultados sean reproducibles):

// declaring data types case class Company(cName: String, cId: String, details: String) case class Employee(name: String, id: String, email: String, company: Company) // setting up example data val e1 = Employee("n1", null, "[email protected]", Company("c1", "1", "d1")) val e2 = Employee("n2", "2", "[email protected]", Company("c1", "1", "d1")) val e3 = Employee("n3", "3", "[email protected]", Company("c1", "1", "d1")) val e4 = Employee("n4", "4", "[email protected]", Company("c2", "2", "d2")) val e5 = Employee("n5", null, "[email protected]", Company("c2", "2", "d2")) val e6 = Employee("n6", "6", "[email protected]", Company("c2", "2", "d2")) val e7 = Employee("n7", "7", "[email protected]", Company("c3", "3", "d3")) val e8 = Employee("n8", "8", "[email protected]", Company("c3", "3", "d3")) val employees = Seq(e1, e2, e3, e4, e5, e6, e7, e8) val df = sc.parallelize(employees).toDF

Los datos son:

+----+----+---------+---------+ |name| id| email| company| +----+----+---------+---------+ | n1|null|[email protected]|[c1,1,d1]| | n2| 2|[email protected]|[c1,1,d1]| | n3| 3|[email protected]|[c1,1,d1]| | n4| 4|[email protected]|[c2,2,d2]| | n5|null|[email protected]|[c2,2,d2]| | n6| 6|[email protected]|[c2,2,d2]| | n7| 7|[email protected]|[c3,3,d3]| | n8| 8|[email protected]|[c3,3,d3]| +----+----+---------+---------+

Ahora para filtrar empleados con identificaciones null , harás -

df.filter("id is null").show

que te mostrará correctamente lo siguiente:

+----+----+---------+---------+ |name| id| email| company| +----+----+---------+---------+ | n1|null|[email protected]|[c1,1,d1]| | n5|null|[email protected]|[c2,2,d2]| +----+----+---------+---------+

Llegando a la segunda parte de su pregunta, puede reemplazar los identificadores null con 0 y otros valores con 1 con esto:

df.withColumn("id", when($"id".isNull, 0).otherwise(1)).show

Esto resulta en:

+----+---+---------+---------+ |name| id| email| company| +----+---+---------+---------+ | n1| 0|[email protected]|[c1,1,d1]| | n2| 1|[email protected]|[c1,1,d1]| | n3| 1|[email protected]|[c1,1,d1]| | n4| 1|[email protected]|[c2,2,d2]| | n5| 0|[email protected]|[c2,2,d2]| | n6| 1|[email protected]|[c2,2,d2]| | n7| 1|[email protected]|[c3,3,d3]| | n8| 1|[email protected]|[c3,3,d3]| +----+---+---------+---------+


Encontré una buena solución para eliminar las filas con cualquier valor nulo:

Dataset<Row> filtered = df.filter(row -> !row.anyNull());

En caso de que uno esté interesado en el otro caso, simplemente llame a row.anyNull() . (Spark 2.1.0 usando la API de Java)


Hay dos formas de hacerlo: creando una condición de filtro 1) Manualmente 2) Dinámicamente.

Ejemplo de marco de datos:

val df = spark.createDataFrame(Seq( (0, "a1", "b1", "c1", "d1"), (1, "a2", "b2", "c2", "d2"), (2, "a3", "b3", null, "d3"), (3, "a4", null, "c4", "d4"), (4, null, "b5", "c5", "d5") )).toDF("id", "col1", "col2", "col3", "col4") +---+----+----+----+----+ | id|col1|col2|col3|col4| +---+----+----+----+----+ | 0| a1| b1| c1| d1| | 1| a2| b2| c2| d2| | 2| a3| b3|null| d3| | 3| a4|null| c4| d4| | 4|null| b5| c5| d5| +---+----+----+----+----+

1) Crear la condición del filtro manualmente, es decir, usar DataFrame where o función de filter

df.filter(col("col1").isNotNull && col("col2").isNotNull).show

o

df.where("col1 is not null and col2 is not null").show

Resultado:

+---+----+----+----+----+ | id|col1|col2|col3|col4| +---+----+----+----+----+ | 0| a1| b1| c1| d1| | 1| a2| b2| c2| d2| | 2| a3| b3|null| d3| +---+----+----+----+----+

2) Crear una condición de filtro dinámicamente : esto es útil cuando no queremos que ninguna columna tenga un valor nulo y haya un gran número de columnas, que es principalmente el caso.

Para crear la condición del filtro manualmente en estos casos perderá mucho tiempo. En el siguiente código, estamos incluyendo todas las columnas dinámicamente usando la función de map y reduce en las columnas de DataFrame:

val filterCond = df.columns.map(x=>col(x).isNotNull).reduce(_ && _)

Cómo se ve filterCond :

filterCond: org.apache.spark.sql.Column = (((((id IS NOT NULL) AND (col1 IS NOT NULL)) AND (col2 IS NOT NULL)) AND (col3 IS NOT NULL)) AND (col4 IS NOT NULL))

Filtración:

val filteredDf = df.filter(filterCond)

Resultado:

+---+----+----+----+----+ | id|col1|col2|col3|col4| +---+----+----+----+----+ | 0| a1| b1| c1| d1| | 1| a2| b2| c2| d2| +---+----+----+----+----+


O como df.filter($"friend_id".isNotNull)


Utilizo el siguiente código para resolver mi pregunta. Funciona. Pero como todos sabemos, trabajo alrededor de la milla de un país para resolverlo. Entonces, ¿hay un atajo para eso? Gracias

def filter_null(field : Any) : Int = field match { case null => 0 case _ => 1 } val test = train_event_join.join( user_friends_pair, train_event_join("user_id") === user_friends_pair("user_id") && train_event_join("event_owner") === user_friends_pair("friend_id"), "left" ).select( train_event_join("user_id"), train_event_join("event_id"), train_event_join("invited"), train_event_join("day_diff"), train_event_join("interested"), train_event_join("event_owner"), user_friends_pair("friend_id") ).rdd.map{ line => ( line(0).toString.toLong, line(1).toString.toLong, line(2).toString.toLong, line(3).toString.toLong, line(4).toString.toLong, line(5).toString.toLong, filter_null(line(6)) ) }.toDF("user_id", "event_id", "invited", "day_diff", "interested", "event_owner", "creator_is_friend")


para la primera pregunta, es correcto que esté filtrando nulos y, por lo tanto, el recuento es cero.

para el segundo reemplazo: usar como abajo:

val options = Map("path" -> "...//ex.csv", "header" -> "true") val dfNull = spark.sqlContext.load("com.databricks.spark.csv", options) scala> dfNull.show +----------+----------+-------+--------+----------+-----------+---------+ | user_id| event_id|invited|day_diff|interested|event_owner|friend_id| +----------+----------+-------+--------+----------+-----------+---------+ | 4236494| 110357109| 0| -1| 0| 937597069| null| | 78065188| 498404626| 0| 0| 0| 2904922087| null| | 282487230|2520855981| 0| 28| 0| 3749735525| null| | 335269852|1641491432| 0| 2| 0| 1490350911| null| | 437050836|1238456614| 0| 2| 0| 991277599| null| | 447244169|2095085551| 0| -1| 0| 1579858878| a| | 516353916|1076364848| 0| 3| 1| 3597645735| b| | 528218683|1151525474| 0| 1| 0| 3433080956| c| | 531967718|3632072502| 0| 1| 0| 3863085861| null| | 627948360|2823119321| 0| 0| 0| 4092665803| null| | 811791433|3513954032| 0| 2| 0| 415464198| null| | 830686203| 99027353| 0| 0| 0| 3549822604| null| |1008893291|1115453150| 0| 2| 0| 2245155244| null| |1239364869|2824096896| 0| 2| 1| 2579294650| d| |1287950172|1076364848| 0| 0| 0| 3597645735| null| |1345896548|2658555390| 0| 1| 0| 2025118823| null| |1354205322|2564682277| 0| 3| 0| 2563033185| null| |1408344828|1255629030| 0| -1| 1| 804901063| null| |1452633375|1334001859| 0| 4| 0| 1488588320| null| |1625052108|3297535757| 0| 3| 0| 1972598895| null| +----------+----------+-------+--------+----------+-----------+---------+ dfNull.withColumn("friend_idTmp", when($"friend_id".isNull, "1").otherwise("0")).drop($"friend_id").withColumnRenamed("friend_idTmp", "friend_id").show +----------+----------+-------+--------+----------+-----------+---------+ | user_id| event_id|invited|day_diff|interested|event_owner|friend_id| +----------+----------+-------+--------+----------+-----------+---------+ | 4236494| 110357109| 0| -1| 0| 937597069| 1| | 78065188| 498404626| 0| 0| 0| 2904922087| 1| | 282487230|2520855981| 0| 28| 0| 3749735525| 1| | 335269852|1641491432| 0| 2| 0| 1490350911| 1| | 437050836|1238456614| 0| 2| 0| 991277599| 1| | 447244169|2095085551| 0| -1| 0| 1579858878| 0| | 516353916|1076364848| 0| 3| 1| 3597645735| 0| | 528218683|1151525474| 0| 1| 0| 3433080956| 0| | 531967718|3632072502| 0| 1| 0| 3863085861| 1| | 627948360|2823119321| 0| 0| 0| 4092665803| 1| | 811791433|3513954032| 0| 2| 0| 415464198| 1| | 830686203| 99027353| 0| 0| 0| 3549822604| 1| |1008893291|1115453150| 0| 2| 0| 2245155244| 1| |1239364869|2824096896| 0| 2| 1| 2579294650| 0| |1287950172|1076364848| 0| 0| 0| 3597645735| 1| |1345896548|2658555390| 0| 1| 0| 2025118823| 1| |1354205322|2564682277| 0| 3| 0| 2563033185| 1| |1408344828|1255629030| 0| -1| 1| 804901063| 1| |1452633375|1334001859| 0| 4| 0| 1488588320| 1| |1625052108|3297535757| 0| 3| 0| 1972598895| 1| +----------+----------+-------+--------+----------+-----------+---------+


df.where(df.col("friend_id").isNull)