python apache-nifi

Python Script usando ExecuteStreamCommand



apache-nifi (1)

Después de hacer todo lo posible para encontrar preguntas anteriores y ejemplos relevantes para esta pregunta, y aún no encontrar las respuestas que estoy buscando, pensé que enviaría una pregunta yo mismo.

ExecuteStreamCommand me parece el procesador perfecto por las siguientes razones:

  • Puedo ejecutar cualquier script de Python y evitar Jython (de manera similar a ExecuteScript). Jython no es una opción para mí.
  • Puedo tomar FlowFiles. Esto es necesario ya que mi script está hecho para consumir la salida de un procesador anterior. Además, me gusta la idea de mantener los datos bajo "Administración de NiFi".
  • Escribe un "estado de ejecución" que será útil para el enrutamiento.

En pocas palabras, lo que estoy tratando de hacer con ExecuteStreamCommand es:

  • Ingerir la salida de un procesador anterior (una araña Scrapy que genera un archivo de texto con líneas JSON para ser exactos)
  • Llamar a un script de python (por ejemplo, python3 my_script.py )
  • Cargue el FlowFile que se ingirió en mi script de Python.
  • Seleccione el contenido del FlowFile.
  • Opere en el contenido del FlowFile dentro de Python.
  • Imprima una versión actualizada del FlowFile original o cree una nueva.
  • Continuar con mi flujo de NiFi con el FlowFile actualizado / nuevo.

Por razones de claridad, actualmente no entiendo:

  • Cómo llamar al script python (desde el procesador ExecuteStreamCommand)
  • Cómo cargar el FlowFile desde Python
  • Cómo actualizar o crear un nuevo FlowFile desde Python
  • Cómo enviar el FlowFile actualizado de Python a NiFi.

He encontrado varios ejemplos para ExecuteScript, pero desafortunadamente estos no se traducen exactamente en el uso de ExecuteStreamCommand.

Gracias de antemano. Cualquier consejo es apreciado.


A partir de su pregunta, dice que necesita invocar el script Python sin usar los procesadores InvokeScriptedProcessor o ExecuteScript porque no puede usar Jython. Dado ese requisito, aún debe ser capaz de lograr su objetivo. Si bien requiere cierta familiaridad con el marco, toda esta información proviene de la documentación de ExecuteStreamCommand .

Su sección "Actualmente no entiendo":

  • Cómo llamar al script python (desde el procesador ExecuteStreamCommand)

    • En su procesador ExecuteStreamCommand , configure las propiedades Argumentos de comando y Ruta de comando con lo siguiente:

      • Argumentos de comando: any flags or args, delimited by ; argumento any flags or args, delimited by ; (es decir, /path/to/my_script.py )
      • Ruta de comando: /path/to/python3
  • Cómo cargar el FlowFile desde Python

    • El contenido del archivo de flujo se pasará a través de STDIN , por lo que en su secuencia de comandos de Python, procese esos datos de la misma manera que normalmente procesaría STDIN .
  • Cómo actualizar o crear un nuevo FlowFile desde Python
    • NiFi maneja la creación de archivos de flujo en el marco. Todos los datos pasados ​​por su script Python a STDOUT se completarán en el contenido del archivo de flujo resultante pasado a la relación de flujo de salida del procesador ExecuteStreamCommand . Su script no necesita tener conocimiento de los "archivos de flujo" en este caso. Si en su lugar usara los procesadores ISP o ES , podría usar la API de scripts NiFi que se inyecta automáticamente en los scripts para crear o actualizar el objeto de archivo de flujo.
  • Cómo enviar el FlowFile actualizado de Python a NiFi.
    • Nuevamente, simplemente escriba el contenido del archivo de flujo deseado en STDOUT desde su script, y (dado un código de estado de retorno de 0 ) NiFi generará un nuevo archivo de flujo con ese contenido. Si establece la propiedad de atributo de destino de salida de ESC en un valor no nulo, NiFi actualizará el archivo de flujo existente con un nuevo atributo del mismo nombre que contiene la salida del script.