signal monotonic interpolate interpld from extrapolate code python numpy scipy interpolation

python - interpolate - scipy monotonic cubic interpolation



Interpolación lineal rápida en Numpy/Scipy "a lo largo de un camino" (3)

Digamos que tengo datos de estaciones meteorológicas en 3 altitudes (conocidas) en una montaña. Específicamente, cada estación registra una medición de temperatura en su ubicación cada minuto. Tengo dos tipos de interpolación que me gustaría realizar. Y me gustaría poder realizar cada uno rápidamente.

Así que vamos a configurar algunos datos:

import numpy as np from scipy.interpolate import interp1d import pandas as pd import seaborn as sns np.random.seed(0) N, sigma = 1000., 5 basetemps = 70 + (np.random.randn(N) * sigma) midtemps = 50 + (np.random.randn(N) * sigma) toptemps = 40 + (np.random.randn(N) * sigma) alltemps = np.array([basetemps, midtemps, toptemps]).T # note transpose! trend = np.sin(4 / N * np.arange(N)) * 30 trend = trend[:, np.newaxis] altitudes = np.array([500, 1500, 4000]).astype(float) finaltemps = pd.DataFrame(alltemps + trend, columns=altitudes) finaltemps.index.names, finaltemps.columns.names = [''Time''], [''Altitude''] finaltemps.plot()

Genial, entonces nuestras temperaturas se ven así:

Interpolar todos los tiempos a la misma altitud:

Creo que este es bastante sencillo. Digamos que quiero obtener la temperatura a una altitud de 1,000 para cada momento. Solo puedo usar métodos de interpolación scipy :

interping_function = interp1d(altitudes, finaltemps.values) interped_to_1000 = interping_function(1000) fig, ax = plt.subplots(1, 1, figsize=(8, 5)) finaltemps.plot(ax=ax, alpha=0.15) ax.plot(interped_to_1000, label=''Interped'') ax.legend(loc=''best'', title=finaltemps.columns.name)

Esto funciona muy bien. Y veamos acerca de la velocidad:

%%timeit res = interp1d(altitudes, finaltemps.values)(1000) #-> 1000 loops, best of 3: 207 µs per loop

Interpolar "a lo largo de un camino":

Así que ahora tengo un segundo problema relacionado. Digamos que conozco la altitud de un grupo de excursionistas en función del tiempo, y quiero calcular la temperatura en su ubicación (en movimiento) mediante la interpolación lineal de mis datos a través del tiempo. En particular, los momentos en que sé la ubicación de la fiesta de senderismo son los mismos momentos en que sé las temperaturas en mis estaciones meteorológicas. Puedo hacer esto sin demasiado esfuerzo:

location = np.linspace(altitudes[0], altitudes[-1], N) interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) for i, loc in enumerate(location)]) fig, ax = plt.subplots(1, 1, figsize=(8, 5)) finaltemps.plot(ax=ax, alpha=0.15) ax.plot(interped_along_path, label=''Interped'') ax.legend(loc=''best'', title=finaltemps.columns.name)

Así que esto funciona realmente bien, pero es importante tener en cuenta que la línea clave de arriba está usando la comprensión de lista para ocultar una enorme cantidad de trabajo. En el caso anterior, scipy crea una única función de interpolación y la evalúa una vez en una gran cantidad de datos. En este caso, scipy realidad está construyendo N funciones de interpolación individuales y evaluando cada una con una pequeña cantidad de datos. Esto se siente inherentemente ineficiente. Hay un bucle for acechando aquí (en la lista de comprensión) y además, esto simplemente se siente flácido.

No en vano, esto es mucho más lento que el caso anterior:

%%timeit res = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) for i, loc in enumerate(location)]) #-> 10 loops, best of 3: 145 ms per loop

Entonces el segundo ejemplo corre 1,000 más lento que el primero. Es decir, la idea de que el trabajo pesado es el paso "hacer una función de interpolación lineal" ... que ocurre 1.000 veces en el segundo ejemplo, pero solo una vez en el primero.

Entonces, la pregunta: ¿hay una mejor manera de abordar el segundo problema? Por ejemplo, ¿hay una buena manera de configurarlo con interpolación bidimensional (que tal vez podría manejar el caso en el que los momentos en que se conocen las ubicaciones de los excursionistas no son los tiempos en que se han muestreado las temperaturas)? ¿O hay una manera particularmente hábil de manejar las cosas aquí donde los tiempos se alinean? ¿U otro?


Para un punto fijo en el tiempo, puede utilizar la siguiente función de interpolación:

g(a) = cc[0]*abs(a-aa[0]) + cc[1]*abs(a-aa[1]) + cc[2]*abs(a-aa[2])

donde a es la altitud del excursionista, aa el vector con las 3 altitudes medición y cc es un vector con los coeficientes. Hay tres cosas a tener en cuenta:

  1. Para las temperaturas dadas (todos los alltemps ) correspondientes a aa , la determinación de cc se puede hacer resolviendo una ecuación de matriz lineal utilizando np.linalg.solve() .
  2. g(a) es fácil de vectorizar para un (N,) dimensional a y (N, 3) dimensional cc (incluyendo np.linalg.solve() respectivamente).
  3. g(a) se denomina núcleo de spline univariado de primer orden (para tres puntos). El uso de abs(a-aa[i])**(2*d-1) cambiaría el orden de spline a d . Este enfoque podría interpretarse como una versión simplificada de un Proceso Gaussiano en Aprendizaje Automático .

Entonces el código sería:

import matplotlib.pyplot as plt import numpy as np import seaborn as sns # generate temperatures np.random.seed(0) N, sigma = 1000, 5 trend = np.sin(4 / N * np.arange(N)) * 30 alltemps = np.array([tmp0 + trend + sigma*np.random.randn(N) for tmp0 in [70, 50, 40]]) # generate attitudes: altitudes = np.array([500, 1500, 4000]).astype(float) location = np.linspace(altitudes[0], altitudes[-1], N) def doit(): """ do the interpolation, improved version for speed """ AA = np.vstack([np.abs(altitudes-a_i) for a_i in altitudes]) # This is slighty faster than np.linalg.solve(), because AA is small: cc = np.dot(np.linalg.inv(AA), alltemps) return (cc[0]*np.abs(location-altitudes[0]) + cc[1]*np.abs(location-altitudes[1]) + cc[2]*np.abs(location-altitudes[2])) t_loc = doit() # call interpolator # do the plotting: fg, ax = plt.subplots(num=1) for alt, t in zip(altitudes, alltemps): ax.plot(t, label="%d feet" % alt, alpha=.5) ax.plot(t_loc, label="Interpolation") ax.legend(loc="best", title="Altitude:") ax.set_xlabel("Time") ax.set_ylabel("Temperature") fg.canvas.draw()

Midiendo el tiempo da:

In [2]: %timeit doit() 10000 loops, best of 3: 107 µs per loop

Actualización: reemplacé la lista de comprensión original en doit() para importar la velocidad en un 30% (para N=1000 ).

Además, tal como se solicitó para la comparación, el bloque de código de referencia de @ moarningsun en mi máquina:

10 loops, best of 3: 110 ms per loop interp_checked 10000 loops, best of 3: 83.9 µs per loop scipy_interpn 1000 loops, best of 3: 678 µs per loop Output allclose: [True, True, True]

Tenga en cuenta que N=1000 es un número relativamente pequeño. Usando N=100000 produce los resultados:

interp_checked 100 loops, best of 3: 8.37 ms per loop %timeit doit() 100 loops, best of 3: 5.31 ms per loop

Esto muestra que este enfoque se escala mejor para N grande que el enfoque interp_checked .


Una interpolación lineal entre dos valores y1 , y2 en las ubicaciones x1 y x2 , con respecto al punto xi es simplemente:

yi = y1 + (y2-y1) * (xi-x1) / (x2-x1)

Con algunas expresiones Numpy vectorizadas podemos seleccionar los puntos relevantes del conjunto de datos y aplicar la función anterior:

I = np.searchsorted(altitudes, location) x1 = altitudes[I-1] x2 = altitudes[I] time = np.arange(len(alltemps)) y1 = alltemps[time,I-1] y2 = alltemps[time,I] xI = location yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)

El problema es que algunos puntos se encuentran en los límites de (o incluso fuera de) el rango conocido, lo que debe tenerse en cuenta:

I = np.searchsorted(altitudes, location) same = (location == altitudes.take(I, mode=''clip'')) out_of_range = ~same & ((I == 0) | (I == altitudes.size)) I[out_of_range] = 1 # Prevent index-errors x1 = altitudes[I-1] x2 = altitudes[I] time = np.arange(len(alltemps)) y1 = alltemps[time,I-1] y2 = alltemps[time,I] xI = location yI = y1 + (y2-y1) * (xI-x1) / (x2-x1) yI[out_of_range] = np.nan

Afortunadamente, Scipy ya proporciona interpolación ND, que también se encarga de los tiempos de desajuste, por ejemplo:

from scipy.interpolate import interpn time = np.arange(len(alltemps)) M = 150 hiketime = np.linspace(time[0], time[-1], M) location = np.linspace(altitudes[0], altitudes[-1], M) xI = np.column_stack((hiketime, location)) yI = interpn((time, altitudes), alltemps, xI)

Aquí hay un código de referencia (sin pandas realidad, pero sí incluí la solución de la otra respuesta):

import numpy as np from scipy.interpolate import interp1d, interpn def original(): return np.array([interp1d(altitudes, alltemps[i, :])(loc) for i, loc in enumerate(location)]) def OP_self_answer(): return np.diagonal(interp1d(altitudes, alltemps)(location)) def interp_checked(): I = np.searchsorted(altitudes, location) same = (location == altitudes.take(I, mode=''clip'')) out_of_range = ~same & ((I == 0) | (I == altitudes.size)) I[out_of_range] = 1 # Prevent index-errors x1 = altitudes[I-1] x2 = altitudes[I] time = np.arange(len(alltemps)) y1 = alltemps[time,I-1] y2 = alltemps[time,I] xI = location yI = y1 + (y2-y1) * (xI-x1) / (x2-x1) yI[out_of_range] = np.nan return yI def scipy_interpn(): time = np.arange(len(alltemps)) xI = np.column_stack((time, location)) yI = interpn((time, altitudes), alltemps, xI) return yI N, sigma = 1000., 5 basetemps = 70 + (np.random.randn(N) * sigma) midtemps = 50 + (np.random.randn(N) * sigma) toptemps = 40 + (np.random.randn(N) * sigma) trend = np.sin(4 / N * np.arange(N)) * 30 trend = trend[:, np.newaxis] alltemps = np.array([basetemps, midtemps, toptemps]).T + trend altitudes = np.array([500, 1500, 4000], dtype=float) location = np.linspace(altitudes[0], altitudes[-1], N) funcs = [original, interp_checked, scipy_interpn] for func in funcs: print(func.func_name) %timeit func() from itertools import combinations outs = [func() for func in funcs] print(''Output allclose:'') print([np.allclose(out1, out2) for out1, out2 in combinations(outs, 2)])

Con el siguiente resultado en mi sistema:

original 10 loops, best of 3: 184 ms per loop OP_self_answer 10 loops, best of 3: 89.3 ms per loop interp_checked 1000 loops, best of 3: 224 µs per loop scipy_interpn 1000 loops, best of 3: 1.36 ms per loop Output allclose: [True, True, True, True, True, True]

La interfaz de interpn sufre un poco en términos de velocidad en comparación con el método más rápido, pero por su generalidad y facilidad de uso es definitivamente el camino a seguir.


Voy a ofrecer un poco de progreso. En el segundo caso (interpolando "a lo largo de un camino") estamos realizando muchas funciones de interpolación diferentes. Una cosa que podríamos intentar es hacer solo una función de interpolación (una que haga interpolación en la dimensión de altitud en todos los tiempos como en el primer caso anterior) y evaluar esa función una y otra vez (de forma vectorializada). Eso nos daría mucha más información de la que queremos (nos daría una matriz de 1.000 x 1.000 en lugar de un vector de 1.000 elementos). Pero entonces nuestro resultado objetivo sería simplemente a lo largo de la diagonal. Entonces, la pregunta es, ¿llamar a una sola función en forma de argumentos más complejos se ejecuta más rápido que hacer muchas funciones y llamarlas con argumentos simples?

¡La respuesta es sí!

La clave es que la función de interpolación devuelta por scipy.interpolate.interp1d es capaz de aceptar un numpy.ndarray como su entrada. Por lo tanto, puede llamar a la función de interpolación muchas veces a la velocidad C introduciendo una entrada vectorial. Es decir, es mucho más rápido que escribir un bucle for que llama a la función de interpolación una y otra vez en una entrada escalar. Entonces, mientras calculamos muchos puntos de datos que terminamos tirando, ahorramos aún más tiempo al no construir muchas funciones de interpolación diferentes que apenas usamos.

old_way = interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) for i, loc in enumerate(location)]) # look ma, no for loops! new_way = np.diagonal(interp1d(altitudes, finaltemps.values)(location)) # note, `location` is a vector! abs(old_way - new_way).max() #-> 0.0

y todavía:

%%timeit res = np.diagonal(interp1d(altitudes, finaltemps.values)(location)) #-> 100 loops, best of 3: 16.7 ms per loop

Así que este enfoque nos da un factor de 10 mejor! ¿Alguien puede hacerlo mejor? ¿O sugerir un enfoque completamente diferente?