java apache-spark amazon-kinesis

java - La muestra Apache Spark Kinesis no funciona



apache-spark amazon-kinesis (2)

Esto podría tener algo que ver con la cantidad de hilo de trabajo que tienes. Tuve el mismo problema cuando ejecuté la aplicación con --master local [2]. Pasé muchas horas buscando una respuesta y no encontré nada. Solo por curiosidad, cambié a --master local [4] y funcionó. No sé la causa raíz. Tal vez alguien más familiarizado con Spark pueda iluminarnos.

Nota: en mi caso, mi transmisión Kinesis tenía dos fragmentos. Entonces, la aplicación creó dos flujos de entrada, uno para cada fragmento.

Estoy intentando ejecutar el ejemplo de JavaKinesisWordCountASL .

El ejemplo parece conectarse a mi Kinesis Stream y obtiene datos de la transmisión (como se muestra en el registro a continuación). Sin embargo, Sparks no invoca la función de llamada pasada al método unionStreams.flatMap en el ejemplo y no imprime ningún conteo de palabras.

He intentado ejecutar usando tanto Java 8 como Java 7. Lo estoy ejecutando en una instancia de Ubuntu. El mismo ejemplo funciona en mi macbook.

14/11/15 01:59:42 Programador INFO.ReceiverTracker: Stream 1 recibió 0 bloques 14/11/15 01:59:42 INFO storage.MemoryStore: ensureFreeSpace (264) llamado con curMem = 3512, maxMem = 938244833 14 / 11/15 01:59:42 INFO storage.MemoryStore: Block input-0-1416016781800 almacenado como valores en la memoria (tamaño estimado 264.0 B, libre 894.8 MB) 14/11/15 01:59:42 INFO storage.BlockManagerInfo: Added input-0-1416016781800 en memoria en ip-10-80-91-13.ec2.internal: 39149 (tamaño: 264.0 B, libre: 894.8 MB) 14/11/15 01:59:42 INFO storage.BlockManagerMaster: actualizado información del bloque input-0-1416016781800 14/11/15 01:59:42 INFO scheduler.JobScheduler: trabajos agregados por tiempo 1416016782000 ms 14/11/15 01:59:42 INFO network.SendingConnection: iniciando conexión a [ip- 10-80-91-13.ec2.internal / 10.80.91.13: 39149] 14/11/15 01:59:42 INFO network.SendingConnection: conectado a [ip-10-80-91-13.ec2.internal / 10.80.91.13:39149], 1 mensaje pendiente 14/11/15 01:59:42 INFO network.ConnectionManager: conexión aceptada de [ip-10-80-91- 13.ec2.internal / 10.80.91.13: 56700] 14/11/15 01:59:42 WARN storage.BlockManager: Block input-0-1416016781800 ya existe en esta máquina; no volver a agregarlo 14/11/15 01:59:42 INFO receiver.BlockGenerator: Pushed block input-0-1416016781800 14/11/15 01:59:43 INFO storage.MemoryStore: ensureFreeSpace (256) llamado con curMem = 3776, maxMem = 938244833 14/11/15 01:59:43 INFO storage.MemoryStore: Block input-0-1416016782800 almacenado como valores en memoria (tamaño estimado 256.0 B, libre 894.8 MB) 14/11/15 01:59: 43 INFO storage.BlockManagerInfo: Se agregó input-0-1416016782800 en la memoria en ip-10-80-91-13.ec2.internal: 39149 (tamaño: 256.0 B, libre: 894.8 MB) 14/11/15 01:59: 43 INFO storage.BlockManagerMaster: información actualizada del bloque input-0-1416016782800 14/11/15 01:59:43 WARN storage.BlockManager: Block input-0-1416016782800 ya existe en esta máquina; no volver a agregarlo 14/11/15 01:59:43 INFO receiver.BlockGenerator: Pushed block input-0-1416016782800 14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 0 recibió 2 bloques 14/11 / 15 01:59:44 Programador INFO.ReceiverTracker: Stream 1 recibió 0 bloques 14/11/15 01:59:44 INFO scheduler.JobScheduler: trabajos añadidos por tiempo 1416016784000 ms 14/11/15 01:59:46 Programador INFO .ReceiverTracker: Stream 0 recibió 0 bloques 14/11/15 01:59:46 Programador INFO. ReceptorTracker: Stream 1 recibió 0 bloques 14/11/15 01:59:46 INFO scheduler.JobScheduler: Empleos añadidos por tiempo 1416016786000 ms 14 / 11/15 01:59:46 INFO impl.CWPublisherRunnable: publicó con éxito 17 datums. 14/11/15 01:59:46 INFO storage.MemoryStore: ensureFreeSpace (248) llamado con curMem = 4032, maxMem = 938244833 14/11/15 01:59:46 INFO storage.MemoryStore: entrada de bloque-1-1416016786000 almacenado como valores en la memoria (tamaño estimado 248.0 B, libre 894.8 MB) 14/11/15 01:59:46 INFO storage.BlockManagerInfo: Se agregó input-1-1416016786000 en la memoria en ip-10-80-91-13.ec2. interno: 39149 (tamaño: 248.0 B, libre: 894.8 MB) 14/11/15 01:59:46 INFO storage.BlockManagerMaster: Información actualizada de la entrada del bloque-1-1416016786000 14/11/15 01:59:46 WARN storage .BlockManager: la entrada de bloque-1-1416016786000 ya existe en esta máquina; no volver a agregarlo 14/11/15 01:59:46 INFO receiver.BlockGenerator: Pushed block input-1-1416016786000 14/11/15 01:59:46 INFO impl.CWPublisherRunnable: publicada con éxito 14 datums. 14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 0 recibió 0 bloques 14/11/15 01:59:48 INFO storage.MemoryStore: ensureFreeSpace (264) llamado con curMem = 4280, maxMem = 938244833 14 / 11/15 01:59:48 Programador INFO.ReceiverTracker: Stream 1 recibió 1 bloque 14/11/15 01:59:48 INFO storage.MemoryStore: bloque input-0-1416016787800 almacenado como valores en la memoria (tamaño estimado 264.0 B, gratis 894.8 MB) 14/11/15 01:59:48 INFO storage.BlockManagerInfo: Se agregó input-0-1416016787800 en la memoria en ip-10-80-91-13.ec2.internal: 39149 (tamaño: 264.0 B, gratis : 894.8 MB) 14/11/15 01:59:48 INFO storage.BlockManagerMaster: información actualizada del bloque input-0-1416016787800 14/11/15 01:59:48 INFO scheduler.JobScheduler: trabajos añadidos por tiempo 1416016788000 ms 14 / 11/15 01:59:48 WARN storage.BlockManager: Block input-0-1416016787800 ya existe en esta máquina; no volver a agregarlo 14/11/15 01:59:48 INFO receiver.BlockGenerator: Pushed block input-0-1416016787800 14/11/15 01:59:50 INFO scheduler.ReceiverTracker: Stream 0 recibió 1 bloque 14/11 / 15 01:59:50 INFO planificador.ReceiverTracker: Stream 1 recibió 0 bloques 14/11/15 01:59:50 INFO scheduler.JobScheduler: trabajos añadidos por tiempo 1416016790000 ms 14/11/15 01:59:51 INFO storage .MemoryStore: ensureFreeSpace (264) llamado con curMem = 4544, maxMem = 938244833 14/11/15 01:59:51 INFO storage.MemoryStore: Block input-0-1416016790800 almacenado como valores en la memoria (tamaño estimado 264.0 B, free 894.8 MB) 14/11/15 01:59:51 INFO storage.BlockManagerInfo: Se agregó input-0-1416016790800 en la memoria en ip-10-80-91-13.ec2.internal: 39149 (tamaño: 264.0 B, libre: 894.8 MB) 14/11/15 01:59:51 INFO storage.BlockManagerMaster: Información actualizada del bloque input-0-1416016790800 14/11/15 01:59:51 WARN storage.BlockManager: El bloque input-0-1416016790800 ya existe en esta maquina; no volver a agregarlo 14/11/15 01:59:51 INFO receiver.BlockGenerator: Pushed block input-0-1416016790800


Gracias a la sugerencia de @ user3594557.

Hay dos notas importantes de https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#input-dstreams

Si el número de núcleos asignados a la aplicación es menor o igual que el número de canales / receptores DS, el sistema recibirá datos, pero no podrá procesarlos.

Cuando se ejecuta localmente, si la URL maestra se establece en "local", entonces solo hay un núcleo para ejecutar tareas. Eso es insuficiente para programas con solo una entrada DStream (las transmisiones de archivos están bien) ya que el receptor ocupará ese núcleo y no quedará núcleo para procesar los datos.