elasticsearch - example - ¿Cómo puedo crear un histograma de deltas de sello de tiempo?
elasticsearch filter aggregation (1)
Estamos almacenando pequeños documentos en ES que representan una secuencia de eventos para un objeto. Cada evento tiene un sello de fecha / hora. Necesitamos analizar el tiempo entre eventos para todos los objetos durante un período de tiempo.
Por ejemplo, imagine estos documentos de eventos json:
{"objeto": "uno", "evento": "inicio", "fecha y hora": "2016-02-09 11:23:01"}
{"objeto": "uno", "evento": "detener", "fecha y hora": "2016-02-09 11:25:01"}
{"objeto": "dos", "evento": "inicio", "fecha y hora": "2016-01-02 11:23:01"}
{"objeto": "dos", "evento": "detener", "fecha y hora": "2016-01-02 11:24:01"}
Lo que queremos obtener de esto es un histograma que traza los dos deltas de marca de tiempo resultantes (de inicio a fin): 2 minutos / 120 segundos para el objeto uno y 1 minuto / 60 segundos para el objeto dos.
En última instancia, queremos monitorear el tiempo entre los eventos de inicio y fin, pero requiere que calculemos el tiempo entre esos eventos, luego los agreguemos o los proporcionemos a la UI de Kibana para agregarlos / trazarlos. Idealmente, nos gustaría transmitir los resultados directamente a Kibana para que podamos evitar la creación de una interfaz de usuario personalizada.
Gracias de antemano por cualquier idea o sugerencia.
Dado que está abierto para usar Logstash, hay una forma de hacerlo usando el filtro aggregate
Tenga en cuenta que este es un complemento de comunidad que necesita instalarse primero. (es decir, no se envía con Logstash de forma predeterminada)
La idea principal del filtro aggregate
es fusionar dos líneas de registro "relacionadas". Puede configurar el complemento para que sepa qué significa "relacionado". En su caso, "relacionado" significa que ambos eventos deben compartir el mismo nombre de object
(es decir, one
o two
) y luego que el primer evento tiene su campo de event
con el valor de start
y el segundo evento tiene su campo de event
con el valor de stop
.
Cuando el filtro encuentra el evento de start
, almacena el campo de datetime
de ese evento en un mapa interno. Cuando se encuentra con el evento de stop
, calcula la diferencia de tiempo entre los dos tiempos de fecha y almacena la duración en segundos en el campo de nueva duration
.
input {
...
}
filter {
...other filters
if [event] == "start" {
aggregate {
task_id => "%{object}"
code => "map[''start''] = event[''datetime'']"
map_action => "create"
}
} else if [event] == "stop" {
aggregate {
task_id => "%{object}"
code => "map[''duration''] = event[''datetime''] - map[''start'']"
end_of_task => true
timeout => 120
}
}
}
output {
elasticsearch {
...
}
}
Tenga en cuenta que puede ajustar el valor de timeout
(en este caso 120 segundos) para que se ajuste mejor a sus necesidades. Cuando haya transcurrido el tiempo de espera y aún no se haya producido ningún evento de detención, se abandonará el evento de inicio existente.