python - interpolate - scipy monotonic cubic interpolation
Interpolación lineal rápida en Numpy/Scipy "a lo largo de un camino" (3)
Digamos que tengo datos de estaciones meteorológicas en 3 altitudes (conocidas) en una montaña. Específicamente, cada estación registra una medición de temperatura en su ubicación cada minuto. Tengo dos tipos de interpolación que me gustaría realizar. Y me gustaría poder realizar cada uno rápidamente.
Así que vamos a configurar algunos datos:
import numpy as np
from scipy.interpolate import interp1d
import pandas as pd
import seaborn as sns
np.random.seed(0)
N, sigma = 1000., 5
basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
alltemps = np.array([basetemps, midtemps, toptemps]).T # note transpose!
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
altitudes = np.array([500, 1500, 4000]).astype(float)
finaltemps = pd.DataFrame(alltemps + trend, columns=altitudes)
finaltemps.index.names, finaltemps.columns.names = [''Time''], [''Altitude'']
finaltemps.plot()
Genial, entonces nuestras temperaturas se ven así:
Interpolar todos los tiempos a la misma altitud:
Creo que este es bastante sencillo. Digamos que quiero obtener la temperatura a una altitud de 1,000 para cada momento. Solo puedo usar métodos de interpolación scipy
:
interping_function = interp1d(altitudes, finaltemps.values)
interped_to_1000 = interping_function(1000)
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_to_1000, label=''Interped'')
ax.legend(loc=''best'', title=finaltemps.columns.name)
Esto funciona muy bien. Y veamos acerca de la velocidad:
%%timeit
res = interp1d(altitudes, finaltemps.values)(1000)
#-> 1000 loops, best of 3: 207 µs per loop
Interpolar "a lo largo de un camino":
Así que ahora tengo un segundo problema relacionado. Digamos que conozco la altitud de un grupo de excursionistas en función del tiempo, y quiero calcular la temperatura en su ubicación (en movimiento) mediante la interpolación lineal de mis datos a través del tiempo. En particular, los momentos en que sé la ubicación de la fiesta de senderismo son los mismos momentos en que sé las temperaturas en mis estaciones meteorológicas. Puedo hacer esto sin demasiado esfuerzo:
location = np.linspace(altitudes[0], altitudes[-1], N)
interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc)
for i, loc in enumerate(location)])
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_along_path, label=''Interped'')
ax.legend(loc=''best'', title=finaltemps.columns.name)
Así que esto funciona realmente bien, pero es importante tener en cuenta que la línea clave de arriba está usando la comprensión de lista para ocultar una enorme cantidad de trabajo. En el caso anterior, scipy
crea una única función de interpolación y la evalúa una vez en una gran cantidad de datos. En este caso, scipy
realidad está construyendo N
funciones de interpolación individuales y evaluando cada una con una pequeña cantidad de datos. Esto se siente inherentemente ineficiente. Hay un bucle for acechando aquí (en la lista de comprensión) y además, esto simplemente se siente flácido.
No en vano, esto es mucho más lento que el caso anterior:
%%timeit
res = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc)
for i, loc in enumerate(location)])
#-> 10 loops, best of 3: 145 ms per loop
Entonces el segundo ejemplo corre 1,000 más lento que el primero. Es decir, la idea de que el trabajo pesado es el paso "hacer una función de interpolación lineal" ... que ocurre 1.000 veces en el segundo ejemplo, pero solo una vez en el primero.
Entonces, la pregunta: ¿hay una mejor manera de abordar el segundo problema? Por ejemplo, ¿hay una buena manera de configurarlo con interpolación bidimensional (que tal vez podría manejar el caso en el que los momentos en que se conocen las ubicaciones de los excursionistas no son los tiempos en que se han muestreado las temperaturas)? ¿O hay una manera particularmente hábil de manejar las cosas aquí donde los tiempos se alinean? ¿U otro?
Para un punto fijo en el tiempo, puede utilizar la siguiente función de interpolación:
g(a) = cc[0]*abs(a-aa[0]) + cc[1]*abs(a-aa[1]) + cc[2]*abs(a-aa[2])
donde a
es la altitud del excursionista, aa
el vector con las 3 altitudes
medición y cc
es un vector con los coeficientes. Hay tres cosas a tener en cuenta:
- Para las temperaturas dadas (todos los
alltemps
) correspondientes aaa
, la determinación decc
se puede hacer resolviendo una ecuación de matriz lineal utilizandonp.linalg.solve()
. -
g(a)
es fácil de vectorizar para un (N,) dimensionala
y (N, 3) dimensionalcc
(incluyendonp.linalg.solve()
respectivamente). -
g(a)
se denomina núcleo de spline univariado de primer orden (para tres puntos). El uso deabs(a-aa[i])**(2*d-1)
cambiaría el orden de spline ad
. Este enfoque podría interpretarse como una versión simplificada de un Proceso Gaussiano en Aprendizaje Automático .
Entonces el código sería:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
# generate temperatures
np.random.seed(0)
N, sigma = 1000, 5
trend = np.sin(4 / N * np.arange(N)) * 30
alltemps = np.array([tmp0 + trend + sigma*np.random.randn(N)
for tmp0 in [70, 50, 40]])
# generate attitudes:
altitudes = np.array([500, 1500, 4000]).astype(float)
location = np.linspace(altitudes[0], altitudes[-1], N)
def doit():
""" do the interpolation, improved version for speed """
AA = np.vstack([np.abs(altitudes-a_i) for a_i in altitudes])
# This is slighty faster than np.linalg.solve(), because AA is small:
cc = np.dot(np.linalg.inv(AA), alltemps)
return (cc[0]*np.abs(location-altitudes[0]) +
cc[1]*np.abs(location-altitudes[1]) +
cc[2]*np.abs(location-altitudes[2]))
t_loc = doit() # call interpolator
# do the plotting:
fg, ax = plt.subplots(num=1)
for alt, t in zip(altitudes, alltemps):
ax.plot(t, label="%d feet" % alt, alpha=.5)
ax.plot(t_loc, label="Interpolation")
ax.legend(loc="best", title="Altitude:")
ax.set_xlabel("Time")
ax.set_ylabel("Temperature")
fg.canvas.draw()
Midiendo el tiempo da:
In [2]: %timeit doit()
10000 loops, best of 3: 107 µs per loop
Actualización: reemplacé la lista de comprensión original en doit()
para importar la velocidad en un 30% (para N=1000
).
Además, tal como se solicitó para la comparación, el bloque de código de referencia de @ moarningsun en mi máquina:
10 loops, best of 3: 110 ms per loop
interp_checked
10000 loops, best of 3: 83.9 µs per loop
scipy_interpn
1000 loops, best of 3: 678 µs per loop
Output allclose:
[True, True, True]
Tenga en cuenta que N=1000
es un número relativamente pequeño. Usando N=100000
produce los resultados:
interp_checked
100 loops, best of 3: 8.37 ms per loop
%timeit doit()
100 loops, best of 3: 5.31 ms per loop
Esto muestra que este enfoque se escala mejor para N
grande que el enfoque interp_checked
.
Una interpolación lineal entre dos valores y1
, y2
en las ubicaciones x1
y x2
, con respecto al punto xi
es simplemente:
yi = y1 + (y2-y1) * (xi-x1) / (x2-x1)
Con algunas expresiones Numpy vectorizadas podemos seleccionar los puntos relevantes del conjunto de datos y aplicar la función anterior:
I = np.searchsorted(altitudes, location)
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
El problema es que algunos puntos se encuentran en los límites de (o incluso fuera de) el rango conocido, lo que debe tenerse en cuenta:
I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode=''clip''))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1 # Prevent index-errors
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan
Afortunadamente, Scipy ya proporciona interpolación ND, que también se encarga de los tiempos de desajuste, por ejemplo:
from scipy.interpolate import interpn
time = np.arange(len(alltemps))
M = 150
hiketime = np.linspace(time[0], time[-1], M)
location = np.linspace(altitudes[0], altitudes[-1], M)
xI = np.column_stack((hiketime, location))
yI = interpn((time, altitudes), alltemps, xI)
Aquí hay un código de referencia (sin pandas
realidad, pero sí incluí la solución de la otra respuesta):
import numpy as np
from scipy.interpolate import interp1d, interpn
def original():
return np.array([interp1d(altitudes, alltemps[i, :])(loc)
for i, loc in enumerate(location)])
def OP_self_answer():
return np.diagonal(interp1d(altitudes, alltemps)(location))
def interp_checked():
I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode=''clip''))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1 # Prevent index-errors
x1 = altitudes[I-1]
x2 = altitudes[I]
time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]
xI = location
yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan
return yI
def scipy_interpn():
time = np.arange(len(alltemps))
xI = np.column_stack((time, location))
yI = interpn((time, altitudes), alltemps, xI)
return yI
N, sigma = 1000., 5
basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
alltemps = np.array([basetemps, midtemps, toptemps]).T + trend
altitudes = np.array([500, 1500, 4000], dtype=float)
location = np.linspace(altitudes[0], altitudes[-1], N)
funcs = [original, interp_checked, scipy_interpn]
for func in funcs:
print(func.func_name)
%timeit func()
from itertools import combinations
outs = [func() for func in funcs]
print(''Output allclose:'')
print([np.allclose(out1, out2) for out1, out2 in combinations(outs, 2)])
Con el siguiente resultado en mi sistema:
original
10 loops, best of 3: 184 ms per loop
OP_self_answer
10 loops, best of 3: 89.3 ms per loop
interp_checked
1000 loops, best of 3: 224 µs per loop
scipy_interpn
1000 loops, best of 3: 1.36 ms per loop
Output allclose:
[True, True, True, True, True, True]
La interfaz de interpn
sufre un poco en términos de velocidad en comparación con el método más rápido, pero por su generalidad y facilidad de uso es definitivamente el camino a seguir.
Voy a ofrecer un poco de progreso. En el segundo caso (interpolando "a lo largo de un camino") estamos realizando muchas funciones de interpolación diferentes. Una cosa que podríamos intentar es hacer solo una función de interpolación (una que haga interpolación en la dimensión de altitud en todos los tiempos como en el primer caso anterior) y evaluar esa función una y otra vez (de forma vectorializada). Eso nos daría mucha más información de la que queremos (nos daría una matriz de 1.000 x 1.000 en lugar de un vector de 1.000 elementos). Pero entonces nuestro resultado objetivo sería simplemente a lo largo de la diagonal. Entonces, la pregunta es, ¿llamar a una sola función en forma de argumentos más complejos se ejecuta más rápido que hacer muchas funciones y llamarlas con argumentos simples?
¡La respuesta es sí!
La clave es que la función de interpolación devuelta por scipy.interpolate.interp1d
es capaz de aceptar un numpy.ndarray
como su entrada. Por lo tanto, puede llamar a la función de interpolación muchas veces a la velocidad C introduciendo una entrada vectorial. Es decir, es mucho más rápido que escribir un bucle for que llama a la función de interpolación una y otra vez en una entrada escalar. Entonces, mientras calculamos muchos puntos de datos que terminamos tirando, ahorramos aún más tiempo al no construir muchas funciones de interpolación diferentes que apenas usamos.
old_way = interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc)
for i, loc in enumerate(location)])
# look ma, no for loops!
new_way = np.diagonal(interp1d(altitudes, finaltemps.values)(location))
# note, `location` is a vector!
abs(old_way - new_way).max()
#-> 0.0
y todavía:
%%timeit
res = np.diagonal(interp1d(altitudes, finaltemps.values)(location))
#-> 100 loops, best of 3: 16.7 ms per loop
Así que este enfoque nos da un factor de 10 mejor! ¿Alguien puede hacerlo mejor? ¿O sugerir un enfoque completamente diferente?