apache-spark cassandra

apache spark - El filtro SparkSQL Pushdown no funciona en Spark Cassandra Connector



apache-spark (1)

Tengo un esquema de tabla como

appname text, randomnum int, addedtime timestamp, shortuuid text, assetname text, brandname text, PRIMARY KEY ((appname, randomnum), addedtime, shortuuid)

addtime es la clave de agrupamiento

Ahora cuando estoy usando el filtro de inserción en el tiempo de adición de la clave del clúster, no veo que se aplique

val rdd = tabledf.filter("addedtime > ''" + _to + "''").explain == Physical Plan == Filter (cast(addedtime#2 as string) > 2016-12-20 11:00:00)

De acuerdo con los documentos, debería aplicarse https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushdown-filter-examples

También funcionaba en el conector chispa cassandra 1.4 pero no con el último conector cassandra 1.6.0-M1. Por favor, hágamelo saber el problema


Análisis del problema

El problema parece ser la forma en que Catalyst está procesando la comparación.

Cuando haces

val rdd = tabledf.filter("addedtime > ''" + _to + "''").explain

Está emitiendo la columna de tiempo adicional a una Cadena y luego haciendo la comparación. Catalyst no presenta este predicado al Spark Cassandra Connector por lo que no hay forma de presionarlo.

INFO 2016-03-08 17:10:49,011 org.apache.spark.sql.cassandra.CassandraSourceRelation: Input Predicates: [] Filter (cast(addedtime#2 as string) > 2015-08-03)

Esto también es incorrecto porque está haciendo una comparación de cadenas (que léxicamente funcionará aquí pero no es realmente lo que quieres hacer). Así que esto parece un error en Catalyst ya que probablemente deberíamos presentar el predicado a la fuente incluso si hay un "emitir". Sin embargo, hay una solución que implica dar al optimizador de Catalyst lo que quiere ver.

Solución

Si en cambio damos una pista tipo

df.filter("addedtime > cast(''2015-08-03'' as timestamp)").explain

Entonces Spark generará la comparación correcta sin la cadena Cast

DEBUG 2016-03-08 17:11:09,792 org.apache.spark.sql.cassandra.CassandraSourceRelation: Basic Rules Applied: C* Filters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)] Spark Filters [] == Physical Plan == Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@332464fe[appname#0,randomnum#1,addedtime#2,shortuuid#3] PushedFilters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]