google-app-engine - how - ndb put
La mejor práctica para consultar una gran cantidad de entidades ndb del almacén de datos (4)
Me encontré con un límite interesante con el almacén de datos de App Engine. Estoy creando un controlador para ayudarnos a analizar algunos datos de uso en uno de nuestros servidores de producción. Para realizar el análisis, necesito consultar y resumir más de 10.000 entidades extraídas del almacén de datos. El cálculo no es difícil, es solo un histograma de elementos que pasan un filtro específico de las muestras de uso. El problema que alcancé es que no puedo recuperar los datos del almacén de datos lo suficientemente rápido como para hacer ningún procesamiento antes de llegar a la fecha límite de la consulta.
He intentado todo lo que puedo pensar para dividir la consulta en llamadas RPC paralelas para mejorar el rendimiento, pero de acuerdo con las aplicaciones, parece que no puedo obtener las consultas para que realmente se ejecuten en paralelo. No importa qué método intente (ver a continuación), siempre parece que los RPC vuelven a caer en una cascada de próximas consultas secuenciales.
Nota: el código de consulta y análisis funciona, simplemente se ejecuta lentamente porque no puedo obtener datos lo suficientemente rápido del almacén de datos.
Fondo
No tengo una versión en vivo que pueda compartir, pero aquí está el modelo básico para la parte del sistema de la que estoy hablando:
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
Puede pensar en las muestras como las veces que un usuario utiliza una capacidad de un nombre dado. (por ejemplo: ''systemA.feature_x''). Las etiquetas se basan en los detalles del cliente, la información del sistema y la función. ej .: [''winxp'', ''2.5.1'', ''systemA'', ''feature_x'', ''premium_account'']). Entonces las etiquetas forman un conjunto desnormalizado de tokens que podrían usarse para encontrar muestras de interés.
El análisis que trato de hacer consiste en tomar un intervalo de fechas y preguntar cuántas veces se utilizó una función del conjunto de características (quizás todas las características) por día (o por hora) por cuenta de cliente (empresa, no por usuario).
Entonces, la entrada al controlador será algo así como:
- Fecha de inicio
- Fecha final
- Tag (s)
La salida sería:
[{
''company_account'': <string>,
''counts'': [
{''timeperiod'': <iso8601 date>, ''count'': <int>}, ...
]
}, ...
]
Código común para consultas
Aquí hay algunos códigos en común para todas las consultas. La estructura general del manejador es un simple manejador get que usa webapp2 que configura los parámetros de consulta, ejecuta la consulta, procesa los resultados, crea datos para devolver.
# -- Build Query Object --- #
query_opts = {}
query_opts[''batch_size''] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data[''customer'']
addCountForPeriod(count_key, sample.timestamp)
Métodos probados
He intentado una variedad de métodos para intentar extraer datos del almacén de datos lo más rápido posible y en paralelo. Los métodos que he intentado hasta ahora incluyen:
A. Single Iteration
Este es más un caso base simple para compararlo con los otros métodos. Acabo de construir la consulta e iterar sobre todos los elementos permitiendo que ndb haga lo que hace para extraerlos uno después del otro.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
B. Gran alcance
La idea aquí fue ver si podía hacer una búsqueda muy grande.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
C. Async recupera el rango de tiempo
La idea aquí es reconocer que las muestras están bastante bien espaciadas a lo largo del tiempo, así que puedo crear un conjunto de consultas independientes que dividen la región de tiempo general en fragmentos e intentan ejecutar cada uno de estos en paralelo usando asincrónico:
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
D. Mapeo Async
Probé este método porque la documentación lo hizo sonar como ndb puede explotar cierto paralelismo automáticamente cuando se utiliza el método Query.map_async.
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data[''customer'']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
Salir
Probé una consulta de ejemplo para recopilar el tiempo de respuesta global y los rastreos de aplicaciones. Los resultados son:
A. Single Iteration
real: 15.645s
Este va secuencialmente a través de buscar lotes uno después del otro y luego recupera cada sesión de Memcache.
B. Gran alcance
real: 12.12s
Efectivamente lo mismo que la opción A pero un poco más rápido por alguna razón.
C. Async recupera el rango de tiempo
real: 15.251s
Parece proporcionar más paralelismo al inicio, pero parece ralentizarse por una secuencia de llamadas al siguiente durante la iteración de los resultados. Tampoco parece poder superponer las búsquedas de Memcache de sesión con las consultas pendientes.
D. Mapeo Async
real: 13.752s
Este es el más difícil de entender para mí. Parece q tiene una buena cantidad de superposición, pero todo parece extenderse en una cascada en lugar de en paralelo.
Recomendaciones
En base a todo esto, ¿qué me estoy perdiendo? ¿Acabo de llegar a un límite en App Engine o hay una mejor manera de desplegar una gran cantidad de entidades en paralelo?
No sé qué probar después. Pensé en reescribir el cliente para hacer varias solicitudes al motor de la aplicación en paralelo, pero esto parece una verdadera fuerza bruta. Realmente esperaría que el motor de la aplicación sea capaz de manejar este caso de uso, así que supongo que hay algo que me falta.
Actualizar
Al final encontré que la opción C era la mejor para mi caso. Pude optimizarlo para completar en 6.1 segundos. Aún no es perfecto, pero mucho mejor.
Después de recibir consejos de varias personas, descubrí que los siguientes elementos eran clave para comprender y tener en cuenta:
- Se pueden ejecutar múltiples consultas en paralelo
- Solo 10 RPC pueden estar en vuelo a la vez
- Intenta desnormalizar hasta el punto de que no hay consultas secundarias
- Es mejor dejar este tipo de tarea para reducir el mapa y las colas de tareas, no las consultas en tiempo real
Entonces, ¿qué hice para hacerlo más rápido?
- Particione el espacio de consulta desde el principio en función del tiempo. (Nota: cuanto más iguales sean las particiones en términos de entidades devueltas, mejor)
- Desnormalicé los datos más para eliminar la necesidad de la consulta de la sesión secundaria
- Hice uso de las operaciones asincrónicas ndb y wait_any () para superponer las consultas con el procesamiento
Todavía no obtengo el rendimiento que esperaba o me gusta, pero por el momento es viable. Solo desearía que fuera una mejor forma de obtener grandes cantidades de entidades secuenciales en la memoria rápidamente en manejadores.
Grandes operaciones de datos en App Engine se implementan mejor utilizando algún tipo de operación de reducción de mapa.
Aquí hay un video que describe el proceso, pero que incluye BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/
No parece que necesites BigQuery, pero probablemente quieras utilizar las porciones de Mapa y Reducir de la canalización.
La principal diferencia entre lo que está haciendo y la situación de reducción de mapas es que está iniciando una instancia y repitiendo las consultas, donde en mapreduce, tendría una instancia separada ejecutándose en paralelo para cada consulta. Necesitará una operación de reducción para "resumir" todos los datos, y escribir el resultado en algún lugar.
El otro problema que tienes es que deberías usar cursores para iterar. https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors
Si el iterador está utilizando un desplazamiento de consulta, será ineficaz, ya que un desplazamiento emite la misma consulta, pasa por alto una serie de resultados y le proporciona el siguiente conjunto, mientras que el cursor salta directamente al siguiente conjunto.
La nueva característica experimental de procesamiento de datos (una API de App Engine para MapReduce) parece muy adecuada para resolver este problema. Hace sharding automático para ejecutar múltiples procesos de trabajo en paralelo.
Tengo un problema similar y después de trabajar con el soporte de Google durante algunas semanas puedo confirmar que no hay una solución mágica al menos desde diciembre de 2017.
tl; dr: se puede esperar un rendimiento de 220 entidades / segundo para el SDK estándar que se ejecuta en la instancia B1 hasta 900 entidades / segundo para un SDK parcheado que se ejecuta en una instancia B8.
La limitación está relacionada con la CPU y el cambio del tipo instanciado impacta directamente en el rendimiento. Esto se confirma con resultados similares obtenidos en las instancias B4 y B4_1G
El mejor rendimiento que obtuve para una entidad Expando con aproximadamente 30 campos es:
Estándar GAE SDK
- Instancia B1: ~ 220 entidades / segundo
- Instancia B2: ~ 250 entidades / segundo
- Instancia B4: ~ 560 entidades / segundo
- Instancia B4_1G: ~ 560 entidades / segundo
- Instancia B8: ~ 650 entidades / segundo
Parche GAE SDK
- Instancia B1: ~ 420 entidades / segundo
- Instancia B8: ~ 900 entidades / segundo
Para el estándar GAE SDK probé varios enfoques, incluido el multi-threading, pero el mejor resultó ser fetch_async
con wait_any
. La biblioteca actual de NDB ya hace un gran trabajo al utilizar asincronías y futuros bajo el capó, por lo que cualquier intento de impulsar ese uso de hilos solo lo empeora.
Encontré dos enfoques interesantes para optimizar esto:
- Matt Faus: acelerando las lecturas del Datastore de GAE con la proyección de Protobuf
- Evan Jones: seguimiento de un error de rendimiento de Python en App Engine
Matt Faus explica el problema muy bien:
GAE SDK proporciona una API para leer y escribir objetos derivados de sus clases en el almacén de datos. Esto le ahorra el trabajo aburrido de validar los datos sin procesar devueltos por el almacén de datos y volver a empaquetarlos en un objeto fácil de usar. En particular, GAE utiliza búferes de protocolo para transmitir datos sin procesar de la tienda a la máquina frontend que lo necesita. El SDK es responsable de decodificar este formato y devolver un objeto limpio a su código. Esta utilidad es excelente, pero a veces hace un poco más de lo que le gustaría. [...] Utilizando nuestra herramienta de creación de perfiles, descubrí que el 50% del tiempo dedicado a recuperar estas entidades era durante la fase de decodificación de protobuf a python-object. Esto significa que la CPU en el servidor frontend fue un cuello de botella en estas lecturas del almacén de datos.
Ambos enfoques intentan reducir el tiempo dedicado a la protobuf en la decodificación de Python al reducir el número de campos decodificados.
Intenté ambos enfoques, pero solo tengo éxito con el de Matt. SDK interno cambió desde que Evan publicó su solución. Tuve que cambiar un poco el código publicado por Matt here , pero fue bastante fácil: si hay interés, puedo publicar el código final.
Para una entidad Expando regular con alrededor de 30 campos utilicé la solución de Matt para decodificar solo campos pares y obtuve una mejora significativa.
En conclusión, es necesario planificar en consecuencia y no se espera poder procesar mucho más que unas pocas entidades en una solicitud GAE "en tiempo real".
Un procesamiento grande como este no debería realizarse en una solicitud del usuario, que tiene un límite de tiempo de 60 s. En cambio, debe hacerse en un contexto que admita solicitudes de larga ejecución. La cola de tareas admite solicitudes de hasta 10 minutos y (creo) restricciones de memoria normales (las instancias F1, por defecto, tienen 128 MB de memoria ). Para límites aún más altos (sin tiempo de espera de solicitud, 1GB + de memoria), use backends .
Aquí hay algo que probar: configurar una URL que, cuando se acceda, active una tarea de cola de tareas. Devuelve una página web que sondea cada ~ 5s a otra URL que responde con verdadero / falso si la tarea de cola de tareas ya se ha completado. La cola de tareas procesa los datos, lo que puede llevar unos 10 segundos y guarda el resultado en el almacén de datos, ya sea como datos calculados o como página web procesada. Una vez que la página inicial detecta que se ha completado, el usuario se redirige a la página, que recupera los resultados ahora calculados del almacén de datos.