apache spark - El filtro SparkSQL Pushdown no funciona en Spark Cassandra Connector
apache-spark (1)
Tengo un esquema de tabla como
appname text,
randomnum int,
addedtime timestamp,
shortuuid text,
assetname text,
brandname text,
PRIMARY KEY ((appname, randomnum), addedtime, shortuuid)
addtime es la clave de agrupamiento
Ahora cuando estoy usando el filtro de inserción en el tiempo de adición de la clave del clúster, no veo que se aplique
val rdd = tabledf.filter("addedtime > ''" + _to + "''").explain
== Physical Plan ==
Filter (cast(addedtime#2 as string) > 2016-12-20 11:00:00)
De acuerdo con los documentos, debería aplicarse https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushdown-filter-examples
También funcionaba en el conector chispa cassandra 1.4 pero no con el último conector cassandra 1.6.0-M1. Por favor, hágamelo saber el problema
Análisis del problema
El problema parece ser la forma en que Catalyst está procesando la comparación.
Cuando haces
val rdd = tabledf.filter("addedtime > ''" + _to + "''").explain
Está emitiendo la columna de tiempo adicional a una Cadena y luego haciendo la comparación. Catalyst no presenta este predicado al Spark Cassandra Connector por lo que no hay forma de presionarlo.
INFO 2016-03-08 17:10:49,011 org.apache.spark.sql.cassandra.CassandraSourceRelation: Input Predicates: []
Filter (cast(addedtime#2 as string) > 2015-08-03)
Esto también es incorrecto porque está haciendo una comparación de cadenas (que léxicamente funcionará aquí pero no es realmente lo que quieres hacer). Así que esto parece un error en Catalyst ya que probablemente deberíamos presentar el predicado a la fuente incluso si hay un "emitir". Sin embargo, hay una solución que implica dar al optimizador de Catalyst lo que quiere ver.
Solución
Si en cambio damos una pista tipo
df.filter("addedtime > cast(''2015-08-03'' as timestamp)").explain
Entonces Spark generará la comparación correcta sin la cadena Cast
DEBUG 2016-03-08 17:11:09,792 org.apache.spark.sql.cassandra.CassandraSourceRelation: Basic Rules Applied:
C* Filters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]
Spark Filters []
== Physical Plan ==
Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@332464fe[appname#0,randomnum#1,addedtime#2,shortuuid#3] PushedFilters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]