cassandra apache-spark datastax-enterprise

Cómo cargar Spark Cassandra Connector en el shell?



apache-spark datastax-enterprise (6)

Editar: Las cosas son un poco más fáciles ahora

Para obtener instrucciones detalladas, consulte el sitio web del proyecto https://github.com/datastax/spark-cassandra-connector/blob/master/doc/13_spark_shell.md

O puede usar Spark-Packages para cargar la Biblioteca (no todas las versiones publicadas) http://spark-packages.org/package/datastax/spark-cassandra-connector

> $SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M3-s_2.10

Lo siguiente asume que está ejecutando con OSS Apache C *

Querrá iniciar la clase con el conjunto de ruta de clase de controlador para incluir todas las bibliotecas de conectores

Citaré una publicación de blog de la ilustre Amy Tobey

La forma más fácil que he encontrado es establecer la ruta de clases y luego reiniciar el contexto en REPL con las clases necesarias importadas para hacer que sc.cassandraTable () sea visible. Los métodos recién cargados no se mostrarán en la finalización de pestañas. No sé por qué.

/opt/spark/bin/spark-shell --driver-class-path $(echo /path/to/connector/*.jar |sed ''s/ /:/g'')

Imprimirá un montón de información de registro y luego presentará el indicador scala>.

scala> sc.stop

Ahora que el contexto está detenido, es hora de importar el conector.

scala> import com.datastax.spark.connector._ scala> val conf = new SparkConf() scala> conf.set("cassandra.connection.host", "node1.pc.datastax.com") scala> val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf) scala> val table = sc.cassandraTable("keyspace", "table") scala> table.count

Si está ejecutando con DSE <4.5.1

Existe un pequeño problema con DSE Classloader y las convenciones de nomenclatura de paquetes anteriores que le impedirán encontrar las nuevas bibliotecas de spark-connector. Debería poder evitar esto eliminando la línea que especifica el cargador DSE Class en los scripts que comienzan con spark-shell.

Estoy tratando de usar Spark Cassandra Connector en Spark 1.1.0.

He creado con éxito el archivo jar desde la rama principal en GitHub y he conseguido que las demostraciones incluidas funcionen. Sin embargo, cuando intento cargar los archivos jar en el spark-shell no puedo importar ninguna de las clases del paquete com.datastax.spark.connector .

He intentado usar la opción --jars en spark-shell y agregar el directorio con el archivo jar a CLASSPATH de Java. Ninguna de estas opciones funciona. De hecho, cuando uso la opción --jars , la salida de registro muestra que el jar de Datastax se está cargando, pero todavía no puedo importar nada de com.datastax .

Pude cargar el conector Tuplejump Calliope Cassandra en la spark-shell utilizando --jars , así que sé que está funcionando. Es solo el conector Datastax el que me falla.


Entiendo. Debajo está lo que hice:

$ git clone https://github.com/datastax/spark-cassandra-connector.git $ cd spark-cassandra-connector $ sbt/sbt assembly $ $SPARK_HOME/bin/spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/connector-assembly-1.2.0-SNAPSHOT.jar

En Scala,

scala> sc.stop scala> import com.datastax.spark.connector._ scala> import org.apache.spark.SparkContext scala> import org.apache.spark.SparkContext._ scala> import org.apache.spark.SparkConf scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "my cassandra host") scala> val sc = new SparkContext("spark://spark host:7077", "test", conf)


Los siguientes pasos describen cómo configurar un servidor con un Nodo Spark y un Nodo Cassandra.

Configurando Open Source Spark

Esto supone que ya tienes la configuración de Cassandra.

Paso 1: descarga y configura Spark

Go to http://spark.apache.org/downloads.html.

a) Para simplificar, utilizaremos uno de los paquetes de Spark precompilados. Elija Spark versión 2.0.0 y Preconstruido para Hadoop 2.7 y luego Direct Download. Esto descargará un archivo con los binarios creados para Spark.

b) Extraiga esto a un directorio de su elección. Pondré el mío en ~ / apps / spark-1.2

c) Test Spark está trabajando al abrir el Shell

Paso 2: prueba que Spark funciona

a) cd en el directorio de Spark Ejecutar "./bin/spark-shell". Esto abrirá el programa de shell interactivo Spark

b) Si todo funcionó, debería mostrar este mensaje: "scala>"

Ejecuta un simple cálculo:

sc.parallelize (1 a 50) .sum ( + ) que debería dar salida a 1250.

c) ¡Felicidades Spark está funcionando! Salga de la carcasa Spark con el comando "exit"

El Conector Spark Cassandra

Para conectar Spark a un clúster Cassandra, el Conector Cassandra deberá agregarse al proyecto Spark. DataStax proporciona su propio Cassandra Connector en GitHub y lo usaremos.

  1. Clona el repositorio de Spark Cassandra Connector:

    https://github.com/datastax/spark-cassandra-connector

  2. cd en "spark-cassandra-connector" Construye el Spark Cassandra Connector ejecutando el comando

    ./sbt/sbt Dscala-2.11 = ensamblaje verdadero

Esto debería generar archivos jar compilados en el directorio llamado "destino". Habrá dos archivos jar, uno para Scala y otro para Java. El contenedor que nos interesa es: "spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar", el de Scala. Mueva el archivo jar a un directorio fácil de encontrar: puse el mío en ~ / apps / spark-1.2 / jars

Para cargar el conector en la Spark Shell:

inicia el shell con este comando:

../bin/spark-shell -jars ~ / apps / spark-1.2 / jars / spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar

Conecte el contexto de chispa al clúster de Cassandra y detenga el contexto predeterminado:

sc.stop

Importe los archivos jar necesarios:

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

Haga un nuevo SparkConf con los detalles de conexión de Cassandra:

val conf = new SparkConf (true) .set ("spark.cassandra.connection.host", "localhost")

Crea un nuevo contexto de chispa:

val sc = new SparkContext (conf)

Ahora tiene un nuevo SparkContext que está conectado a su clúster Cassandra.


Para acceder a Cassandra desde la chispa-carcasa, he construido un conjunto de cassandra-spark-driver con todas las dependencias (un "uberjar"). Proporcionándolo a la chispa-shell usando la opción --jars como esta:

spark-shell --jars spark-cassandra-assembly-1.0.0-SNAPSHOT-jar-with-dependencies.jar

Estaba enfrentando el mismo problema descrito aquí y este método es simple y conveniente (en lugar de cargar la larga lista de dependencias)

Creé una esencia con el archivo POM que puedes download . Usando el pom para crear el uberjar debes hacer:

mvn package

Si está utilizando sbt, mire en el complemento sbt-assembly.


Si desea evitar detener / iniciar el contexto en el shell, también puede agregarlo a sus propiedades de chispa en:

{spark_install} /conf/spark-defaults.conf

spark.cassandra.connection.host=192.168.10.10


Código completo de Spark-Cassandra-Connector en JAVA con Window-7,8,10 Usefull.

import com.datastax.driver.core.Session; import com.datastax.spark.connector.cql.CassandraConnector; import com.google.common.base.Optional; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; import spark_conn.Spark_connection; import java.io.Serializable; import java.math.BigDecimal; import java.text.MessageFormat; import java.util.*; import static com.datastax.spark.connector.CassandraJavaUtil.*; public class App implements Serializable { private transient SparkConf conf; private App(SparkConf conf) { this.conf = conf; } private void run() { JavaSparkContext sc = new JavaSparkContext(conf); generateData(sc); compute(sc); showResults(sc); sc.stop(); } private void generateData(JavaSparkContext sc) { CassandraConnector connector = CassandraConnector.apply(sc.getConf()); // Prepare the schema try{ Session session=connector.openSession(); session.execute("DROP KEYSPACE IF EXISTS java_api"); session.execute("CREATE KEYSPACE java_api WITH replication = {''class'': ''SimpleStrategy'', ''replication_factor'': 1}"); session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)"); session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)"); session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)"); }catch(Exception e){System.out.println(e);} // Prepare the products hierarchy List<Product> products = Arrays.asList( new Product(0, "All products", Collections.<Integer>emptyList()), new Product(1, "Product A", Arrays.asList(0)), new Product(4, "Product A1", Arrays.asList(0, 1)), new Product(5, "Product A2", Arrays.asList(0, 1)), new Product(2, "Product B", Arrays.asList(0)), new Product(6, "Product B1", Arrays.asList(0, 2)), new Product(7, "Product B2", Arrays.asList(0, 2)), new Product(3, "Product C", Arrays.asList(0)), new Product(8, "Product C1", Arrays.asList(0, 3)), new Product(9, "Product C2", Arrays.asList(0, 3)) ); JavaRDD<Product> productsRDD = sc.parallelize(products); javaFunctions(productsRDD, Product.class). saveToCassandra("java_api", "products"); JavaRDD<Sale> salesRDD = productsRDD.filter (new Function<Product, Boolean>() { @Override public Boolean call(Product product) throws Exception { return product.getParents().size() == 2; } }).flatMap(new FlatMapFunction<Product, Sale>() { @Override public Iterable<Sale> call(Product product) throws Exception { Random random = new Random(); List<Sale> sales = new ArrayList<>(1000); for (int i = 0; i < 1000; i++) { sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble()))); } return sales; } }); javaFunctions(salesRDD, Sale.class).saveToCassandra ("java_api", "sales"); } private void compute(JavaSparkContext sc) { JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc) .cassandraTable("java_api", "products", Product.class) .keyBy(new Function<Product, Integer>() { @Override public Integer call(Product product) throws Exception { return product.getId(); } }); JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc) .cassandraTable("java_api", "sales", Sale.class) .keyBy(new Function<Sale, Integer>() { @Override public Integer call(Sale sale) throws Exception { return sale.getProduct(); } }); JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD); JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() { @Override public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception { Tuple2<Sale, Product> saleWithProduct = input._2(); List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1); allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice())); for (Integer parentProduct : saleWithProduct._2().getParents()) { allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice())); } return allSales; } }); JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() { @Override public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception { return v1.add(v2); } }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() { @Override public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception { return new Summary(input._1(), input._2()); } }); javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries"); } private void showResults(JavaSparkContext sc) { JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc) .cassandraTable("java_api", "summaries", Summary.class) .keyBy(new Function<Summary, Integer>() { @Override public Integer call(Summary summary) throws Exception { return summary.getProduct(); } }); JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc) .cassandraTable("java_api", "products", Product.class) .keyBy(new Function<Product, Integer>() { @Override public Integer call(Product product) throws Exception { return product.getId(); } }); List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray(); for (Tuple2<Product, Optional<Summary>> result : results) { System.out.println(result); } } public static void main(String[] args) { // if (args.length != 2) { // System.err.println("Syntax: com.datastax.spark.demo.App <Spark Master URL> <Cassandra contact point>"); // System.exit(1); // } // SparkConf conf = new SparkConf(true) // .set("spark.cassandra.connection.host", "127.0.1.1") // .set("spark.cassandra.auth.username", "cassandra") // .set("spark.cassandra.auth.password", "cassandra"); //SparkContext sc = new SparkContext("spark://127.0.1.1:9045", "test", conf); //return ; /* try{ SparkConf conf = new SparkConf(true); conf.setAppName("Spark-Cassandra Integration"); conf.setMaster("yarn-cluster"); conf.set("spark.cassandra.connection.host", "192.168.1.200"); conf.set("spark.cassandra.connection.rpc.port", "9042"); conf.set("spark.cassandra.connection.timeout_ms", "40000"); conf.set("spark.cassandra.read.timeout_ms", "200000"); System.out.println("Hi.......Main Method1111..."); conf.set("spark.cassandra.auth.username","cassandra"); conf.set("spark.cassandra.auth.password","cassandra"); System.out.println("Connected Successful...!/n"); App app = new App(conf); app.run(); }catch(Exception e){System.out.println(e);}*/ SparkConf conf = new SparkConf(); conf.setAppName("Java API demo"); // conf.setMaster(args[0]); // conf.set("spark.cassandra.connection.host", args[1]); conf.setMaster("spark://192.168.1.117:7077"); conf.set("spark.cassandra.connection.host", "192.168.1.200"); conf.set("spark.cassandra.connection.port", "9042"); conf.set("spark.ui.port","4040"); conf.set("spark.cassandra.auth.username","cassandra"); conf.set("spark.cassandra.auth.password","cassandra"); App app = new App(conf); app.run(); } public static class Product implements Serializable { private Integer id; private String name; private List<Integer> parents; public Product() { } public Product(Integer id, String name, List<Integer> parents) { this.id = id; this.name = name; this.parents = parents; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public List<Integer> getParents() { return parents; } public void setParents(List<Integer> parents) { this.parents = parents; } @Override public String toString() { return MessageFormat.format("Product''{''id={0}, name=''''{1}'''', parents={2}''}''", id, name, parents); } } public static class Sale implements Serializable { private UUID id; private Integer product; private BigDecimal price; public Sale() { } public Sale(UUID id, Integer product, BigDecimal price) { this.id = id; this.product = product; this.price = price; } public UUID getId() { return id; } public void setId(UUID id) { this.id = id; } public Integer getProduct() { return product; } public void setProduct(Integer product) { this.product = product; } public BigDecimal getPrice() { return price; } public void setPrice(BigDecimal price) { this.price = price; } @Override public String toString() { return MessageFormat.format("Sale''{''id={0}, product={1}, price={2}''}''", id, product, price); } } public static class Summary implements Serializable { private Integer product; private BigDecimal summary; public Summary() { } public Summary(Integer product, BigDecimal summary) { this.product = product; this.summary = summary; } public Integer getProduct() { return product; } public void setProduct(Integer product) { this.product = product; } public BigDecimal getSummary() { return summary; } public void setSummary(BigDecimal summary) { this.summary = summary; } @Override public String toString() { return MessageFormat.format("Summary''{''product={0}, summary={1}''}''", product, summary); } } }