algorithm - que - ¿Cómo optimizar este código de Haskell resumiendo los números primos en tiempo sublineal?
números primos del 1 al 100 (4)
Este código mío evalúa la suma a 2⋅10 ^ 9 en 0.3 segundos y la suma a 10 ^ 12 (18435588552550705911377) en 19.6 segundos (si se le da suficiente RAM).
import Control.DeepSeq
import qualified Control.Monad as ControlMonad
import qualified Data.Array as Array
import qualified Data.Array.ST as ArrayST
import qualified Data.Array.Base as ArrayBase
primeLucy :: (Integer -> Integer) -> (Integer -> Integer) -> Integer -> (Integer->Integer)
primeLucy f sf n = g
where
r = fromIntegral $ integerSquareRoot n
ni = fromIntegral n
loop from to c = let go i = ControlMonad.when (to<=i) (c i >> go (i-1)) in go from
k = ArrayST.runSTArray $ do
k <- ArrayST.newListArray (-r,r) $ force $
[sf (div n (toInteger i)) - sf 1|i<-[r,r-1..1]] ++
[0] ++
[sf (toInteger i) - sf 1|i<-[1..r]]
ControlMonad.forM_ (takeWhile (<=r) primes) $ /p -> do
l <- ArrayST.readArray k (p-1)
let q = force $ f (toInteger p)
let adjust = /i j -> do { v <- ArrayBase.unsafeRead k (i+r); w <- ArrayBase.unsafeRead k (j+r); ArrayBase.unsafeWrite k (i+r) $!! v+q*(l-w) }
loop (-1) (-div r p) $ /i -> adjust i (i*p)
loop (-div r p-1) (-min r (div ni (p*p))) $ /i -> adjust i (div (-ni) (i*p))
loop r (p*p) $ /i -> adjust i (div i p)
return k
g :: Integer -> Integer
g m
| m >= 1 && m <= integerSquareRoot n = k Array.! (fromIntegral m)
| m >= integerSquareRoot n && m <= n && div n (div n m)==m = k Array.! (fromIntegral (negate (div n m)))
| otherwise = error $ "Function not precalculated for value " ++ show m
primeSum :: Integer -> Integer
primeSum n = (primeLucy id (/m -> div (m*m+m) 2) n) n
Si su función integerSquareRoot
está integerSquareRoot
(como se dice, algunos), puede reemplazarla aquí por floor . sqrt . fromIntegral
floor . sqrt . fromIntegral
floor . sqrt . fromIntegral
.
Explicación:
Como su nombre indica, se basa en una generalización del famoso método de "Lucy Hedgehog" que finalmente descubrió el cartel original.
Te permite calcular muchas sumas de la forma. (con p primo) sin enumerar todos los primos hasta N y en el tiempo O (N ^ 0.75).
Sus entradas son la función f (es decir, id
si desea la suma principal), su función sumatoria sobre todos los enteros (es decir, en ese caso, la suma de los primeros m enteros o div (m*m+m) 2
), y N.
PrimeLucy
devuelve una función de búsqueda (con p prime) restringido a ciertos valores de n: .
El problema 10 del Proyecto Euler es encontrar la suma de todos los números primos que se encuentran debajo de n .
Lo resolví simplemente resumiendo los números primos generados por el tamiz de Eratóstenes. Luego me encontré con una solución mucho más eficiente por Lucy_Hedgehog (sub-lineal!).
Para n = 2⋅10 ^ 9 :
El código de Python (de la cita anterior) se ejecuta en 1.2 segundos en Python 2.7.3.
El código C ++ (mío) se ejecuta en aproximadamente 0,3 segundos (compilado con g ++ 4.8.4).
Reimplanté el mismo algoritmo en Haskell, ya que lo estoy aprendiendo:
import Data.List
import Data.Map (Map, (!))
import qualified Data.Map as Map
problem10 :: Integer -> Integer
problem10 n = (sieve (Map.fromList [(i, i * (i + 1) `div` 2 - 1) | i <- vs]) 2 r vs) ! n
where vs = [n `div` i | i <- [1..r]] ++ reverse [1..n `div` r - 1]
r = floor (sqrt (fromIntegral n))
sieve :: Map Integer Integer -> Integer -> Integer -> [Integer] -> Map Integer Integer
sieve m p r vs | p > r = m
| otherwise = sieve (if m ! p > m ! (p - 1) then update m vs p else m) (p + 1) r vs
update :: Map Integer Integer -> [Integer] -> Integer -> Map Integer Integer
update m vs p = foldl'' decrease m (map (/v -> (v, sumOfSieved m v p)) (takeWhile (>= p*p) vs))
decrease :: Map Integer Integer -> (Integer, Integer) -> Map Integer Integer
decrease m (k, v) = Map.insertWith (flip (-)) k v m
sumOfSieved :: Map Integer Integer -> Integer -> Integer -> Integer
sumOfSieved m v p = p * (m ! (v `div` p) - m ! (p - 1))
main = print $ problem10 $ 2*10^9
Lo compilé con ghc -O2 10.hs
y lo ghc -O2 10.hs
con el time ./10
.
Da la respuesta correcta, pero tarda unos 7 segundos.
Lo compilé con ghc -prof -fprof-auto -rtsopts 10
y lo ./10 +RTS -p -h
con ./10 +RTS -p -h
.
10.prof muestra que la decrease
lleva 52.2% de tiempo y 67.5% de asignaciones.
Después de ejecutar hp2ps 10.hp
obtuve tal perfil de pila:
Una vez más parece que la decrease
toma la mayor parte del montón. GHC versión 7.6.3.
¿Cómo optimizarías el tiempo de ejecución de este código Haskell?
Actualización 13.06.17:
tried reemplazar Data.Map
inmutable con Data.HashTable.IO.BasicHashTable
mutable del paquete Data.HashTable.IO.BasicHashTable
, pero probablemente esté haciendo algo mal, ya que para minúscula = 30 ya lleva demasiado tiempo, aproximadamente 10 segundos. Que pasa
Actualización 18.06.17:
Curioso sobre los problemas de rendimiento de HashTable es una buena lectura. Tomé el code de Sherh usando Data.HashTable.ST.Linear
mutable, pero dejé caer Data.Judy
en su lugar . Se ejecuta en 1.1 segundos, todavía relativamente lento.
He hecho algunas pequeñas mejoras para que se ejecute en 3.4-3.5
segundos en mi máquina. El uso de IntMap.Strict
ayudó mucho. Aparte de eso, solo realicé manualmente algunas optimizaciones de ghc
solo para estar seguro. Y haz que el código Haskell esté más cerca del código Python desde tu enlace. Como siguiente paso, podrías intentar usar un HashMap
mutable. Pero no estoy seguro ... IntMap
no puede ser mucho más rápido que un contenedor mutable porque es uno inmutable. Aunque todavía estoy sorprendido por su eficiencia. Espero que esto se pueda implementar más rápido.
Aquí está el código:
import Data.List (foldl'')
import Data.IntMap.Strict (IntMap, (!))
import qualified Data.IntMap.Strict as IntMap
p :: Int -> Int
p n = (sieve (IntMap.fromList [(i, i * (i + 1) `div` 2 - 1) | i <- vs]) 2 r vs) ! n
where vs = [n `div` i | i <- [1..r]] ++ [n'', n'' - 1 .. 1]
r = floor (sqrt (fromIntegral n) :: Double)
n'' = n `div` r - 1
sieve :: IntMap Int -> Int -> Int -> [Int] -> IntMap Int
sieve m'' p'' r vs = go m'' p''
where
go m p | p > r = m
| m ! p > m ! (p - 1) = go (update m vs p) (p + 1)
| otherwise = go m (p + 1)
update :: IntMap Int -> [Int] -> Int -> IntMap Int
update s vs p = foldl'' decrease s (takeWhile (>= p2) vs)
where
sp = s ! (p - 1)
p2 = p * p
sumOfSieved v = p * (s ! (v `div` p) - sp)
decrease m v = IntMap.adjust (subtract $ sumOfSieved v) v m
main :: IO ()
main = print $ p $ 2*10^(9 :: Int)
ACTUALIZAR:
Usando tablas hashtables
mutables, he logrado lograr un rendimiento de hasta ~5.5sec
en Haskell con code .
Además, utilicé vectores sin caja en lugar de listas en varios lugares. Linear
hashing Linear
parece ser el más rápido. Creo que esto se puede hacer incluso más rápido. Noté la opción hasthables
en el paquete hasthables
. No estoy seguro de haber logrado configurarlo correctamente, pero incluso sin que se ejecute tan rápido.
ACTUALIZACIÓN 2 (19.06.2017)
Me las arreglé para hacerlo 3x
más rápido que la mejor solución de @Krom (usando mi código + su mapa) eliminando Judy hashmap en absoluto. En su lugar solo se utilizan matrices simples. Puede tener la misma idea si observa que las claves para S
hashmap son una secuencia de 1
a n''
o n div i
para i
de 1
a r
. Así que podemos representar a HashMap como dos matrices que hacen búsquedas en una matriz dependiendo de la clave de búsqueda.
Mi código + Judy HashMap
$ time ./judy
95673602693282040
real 0m0.590s
user 0m0.588s
sys 0m0.000s
Mi código + mi mapa disperso
$ time ./sparse
95673602693282040
real 0m0.203s
user 0m0.196s
sys 0m0.004s
Esto se puede hacer incluso más rápido si en lugar de IOUArray
vectores ya generados y la biblioteca de Vector
se utiliza y readArray
se reemplaza por unsafeRead
. Pero no creo que esto deba hacerse si no está realmente interesado en optimizar esto tanto como sea posible.
La comparación con esta solución es engañosa y no es justa. Espero que las mismas ideas implementadas en Python y C ++ sean aún más rápidas. Pero la solución @Krom con hashmap cerrado ya está haciendo trampa porque usa una estructura de datos personalizada en lugar de una estándar. Al menos puedes ver que los mapas hash estándar y más populares en Haskell no son tan rápidos. El uso de mejores algoritmos y mejores estructuras de datos ad-hoc puede ser mejor para tales problemas.
Primero, como línea de base, los tiempos de los enfoques existentes en mi máquina:
Programa original publicado en la pregunta:
time stack exec primorig 95673602693282040 real 0m4.601s user 0m4.387s sys 0m0.251s
En segundo lugar, la versión con
Data.IntMap.Strict
desde triedtime stack exec primIntMapStrict 95673602693282040 real 0m2.775s user 0m2.753s sys 0m0.052s
Código Shershs con Data.Judy cayó here
time stack exec prim-hash2 95673602693282040 real 0m0.945s user 0m0.955s sys 0m0.028s
Tu solución de pitón.
Lo compilé con
python -O -m py_compile problem10.py
y el tiempo:
time python __pycache__/problem10.cpython-36.opt-1.pyc 95673602693282040 real 0m1.163s user 0m1.160s sys 0m0.003s
Su versión de C ++:
$ g++ -O2 --std=c++11 p10.cpp -o p10 $ time ./p10 sum(2000000000) = 95673602693282040 real 0m0.314s user 0m0.310s sys 0m0.003s
No me molesté en proporcionar una línea de base para slow.hs, ya que no quería esperar a que se completara cuando se ejecutara con un argumento de 2*10^9
.
Desempeño subsecundario
El siguiente programa se ejecuta en menos de un segundo en mi máquina.
Utiliza un hashmap enrollado a mano, que utiliza hash cerrado con sondeo lineal y usa alguna variante de hash function knuths, consulte here .
Ciertamente, está algo adaptado al caso, ya que la función de búsqueda, por ejemplo, espera que las claves buscadas estén presentes.
Tiempos:
time stack exec prim
95673602693282040
real 0m0.725s
user 0m0.714s
sys 0m0.047s
Primero implementé mi hashmap enrollado a mano simplemente para codificar las teclas con
key `mod` size
y seleccionó un tamaño varias veces mayor que la entrada esperada, pero el programa tardó 22 o más en completarse.
Finalmente, fue cuestión de elegir una función hash que fuera buena para la carga de trabajo.
Aquí está el programa:
import Data.Maybe
import Control.Monad
import Data.Array.IO
import Data.Array.Base (unsafeRead)
type Number = Int
data Map = Map { keys :: IOUArray Int Number
, values :: IOUArray Int Number
, size :: !Int
, factor :: !Int
}
newMap :: Int -> Int -> IO Map
newMap s f = do
k <- newArray (0, s-1) 0
v <- newArray (0, s-1) 0
return $ Map k v s f
storeKey :: IOUArray Int Number -> Int -> Int -> Number -> IO Int
storeKey arr s f key = go ((key * f) `mod` s)
where
go :: Int -> IO Int
go ind = do
v <- readArray arr ind
go2 v ind
go2 v ind
| v == 0 = do { writeArray arr ind key; return ind; }
| v == key = return ind
| otherwise = go ((ind + 1) `mod` s)
loadKey :: IOUArray Int Number -> Int -> Int -> Number -> IO Int
loadKey arr s f key = s `seq` key `seq` go ((key *f) `mod` s)
where
go :: Int -> IO Int
go ix = do
v <- unsafeRead arr ix
if v == key then return ix else go ((ix + 1) `mod` s)
insertIntoMap :: Map -> (Number, Number) -> IO Map
insertIntoMap m@(Map ks vs s f) (k, v) = do
ix <- storeKey ks s f k
writeArray vs ix v
return m
fromList :: Int -> Int -> [(Number, Number)] -> IO Map
fromList s f xs = do
m <- newMap s f
foldM insertIntoMap m xs
(!) :: Map -> Number -> IO Number
(!) (Map ks vs s f) k = do
ix <- loadKey ks s f k
readArray vs ix
mupdate :: Map -> Number -> (Number -> Number) -> IO ()
mupdate (Map ks vs s fac) i f = do
ix <- loadKey ks s fac i
old <- readArray vs ix
let x'' = f old
x'' `seq` writeArray vs ix x''
r'' :: Number -> Number
r'' = floor . sqrt . fromIntegral
vs'' :: Integral a => a -> a -> [a]
vs'' n r = [n `div` i | i <- [1..r]] ++ reverse [1..n `div` r - 1]
vss'' n r = r + n `div` r -1
list'' :: Int -> Int -> [Number] -> IO Map
list'' s f vs = fromList s f [(i, i * (i + 1) `div` 2 - 1) | i <- vs]
problem10 :: Number -> IO Number
problem10 n = do
m <- list'' (19*vss) (19*vss+7) vs
nm <- sieve m 2 r vs
nm ! n
where vs = vs'' n r
vss = vss'' n r
r = r'' n
sieve :: Map -> Number -> Number -> [Number] -> IO Map
sieve m p r vs | p > r = return m
| otherwise = do
v1 <- m ! p
v2 <- m ! (p - 1)
nm <- if v1 > v2 then update m vs p else return m
sieve nm (p + 1) r vs
update :: Map -> [Number] -> Number -> IO Map
update m vs p = foldM (decrease p) m $ takeWhile (>= p*p) vs
decrease :: Number -> Map -> Number -> IO Map
decrease p m k = do
v <- sumOfSieved m k p
mupdate m k (subtract v)
return m
sumOfSieved :: Map -> Number -> Number -> IO Number
sumOfSieved m v p = do
v1 <- m ! (v `div` p)
v2 <- m ! (p - 1)
return $ p * (v1 - v2)
main = do { n <- problem10 (2*10^9) ; print n; } -- 2*10^9
No soy un profesional con el hash y ese tipo de cosas, por lo que ciertamente se puede mejorar mucho. Tal vez nosotros, los Haskellers, debamos mejorar los mapas hash de estante o proporcionar algunos más simples.
Mi mapa de hash, código Shershs
Si conecto mi hashmap en el código de Shershs (vea la respuesta a continuación), vea here estamos incluso a
time stack exec prim-hash2
95673602693282040
real 0m0.601s
user 0m0.604s
sys 0m0.034s
¿Por qué es lento. Es lento?
Si leyó la fuente para la insert
de la función en Data.HashTable.ST.Basic
, verá que elimina el antiguo par de valores clave e inserta uno nuevo. No busca el "lugar" para el valor y lo muta, como uno podría imaginar, si uno lee que es una tabla hash "mutable". Aquí, la tabla hash en sí misma es mutable, por lo que no es necesario que copie la tabla hash completa para insertar un nuevo par de valores clave, pero los lugares de valor para los pares no lo son. No sé si esa es toda la historia de slow.hs es lento, pero supongo que es una parte muy importante de esto.
Algunas mejoras menores.
Esa es la idea que seguí al intentar mejorar su programa la primera vez.
Vea, no necesita una asignación mutable de claves a valores. Su conjunto de claves es fijo. Desea una asignación de claves a lugares mutables. (Que es, por cierto, lo que obtienes de C ++ por defecto).
Y así traté de llegar a eso. IntMap IORef
de Data.IntMap.Strict
y Data.IORef
y obtuve un tiempo de
tack exec prim
95673602693282040
real 0m2.134s
user 0m2.141s
sys 0m0.028s
Pensé que tal vez ayudaría trabajar con valores no encajonados y para obtener eso, usé IOUArray Int Int
con 1 elemento cada uno en lugar de IORef
y obtuve esos tiempos:
time stack exec prim
95673602693282040
real 0m2.015s
user 0m2.018s
sys 0m0.038s
No hay mucha diferencia y, por lo tanto, traté de deshacerme de la verificación de límites en las matrices de 1 elemento utilizando unsafeRead
y unsafeWrite
y obtuve un tiempo de
time stack exec prim
95673602693282040
real 0m1.845s
user 0m1.850s
sys 0m0.030s
que fue lo mejor que obtuve usando Data.IntMap.Strict
.
Por supuesto, ejecuté cada programa varias veces para ver si los tiempos son estables y las diferencias en el tiempo de ejecución no son solo ruido.
Parece que estas son solo micro-optimizaciones.
Y aquí está el programa que se ejecutó más rápido sin utilizar una estructura de datos enrollada a mano:
import qualified Data.IntMap.Strict as M
import Control.Monad
import Data.Array.IO
import Data.Array.Base (unsafeRead, unsafeWrite)
type Number = Int
type Place = IOUArray Number Number
type Map = M.IntMap Place
tupleToRef :: (Number, Number) -> IO (Number, Place)
tupleToRef = traverse (newArray (0,0))
insertRefs :: [(Number, Number)] -> IO [(Number, Place)]
insertRefs = traverse tupleToRef
fromList :: [(Number, Number)] -> IO Map
fromList xs = M.fromList <$> insertRefs xs
(!) :: Map -> Number -> IO Number
(!) m i = unsafeRead (m M.! i) 0
mupdate :: Map -> Number -> (Number -> Number) -> IO ()
mupdate m i f = do
let place = m M.! i
old <- unsafeRead place 0
let x'' = f old
-- make the application of f strict
x'' `seq` unsafeWrite place 0 x''
r'' :: Number -> Number
r'' = floor . sqrt . fromIntegral
vs'' :: Integral a => a -> a -> [a]
vs'' n r = [n `div` i | i <- [1..r]] ++ reverse [1..n `div` r - 1]
list'' :: [Number] -> IO Map
list'' vs = fromList [(i, i * (i + 1) `div` 2 - 1) | i <- vs]
problem10 :: Number -> IO Number
problem10 n = do
m <- list'' vs
nm <- sieve m 2 r vs
nm ! n
where vs = vs'' n r
r = r'' n
sieve :: Map -> Number -> Number -> [Number] -> IO Map
sieve m p r vs | p > r = return m
| otherwise = do
v1 <- m ! p
v2 <- m ! (p - 1)
nm <- if v1 > v2 then update m vs p else return m
sieve nm (p + 1) r vs
update :: Map -> [Number] -> Number -> IO Map
update m vs p = foldM (decrease p) m $ takeWhile (>= p*p) vs
decrease :: Number -> Map -> Number -> IO Map
decrease p m k = do
v <- sumOfSieved m k p
mupdate m k (subtract v)
return m
sumOfSieved :: Map -> Number -> Number -> IO Number
sumOfSieved m v p = do
v1 <- m ! (v `div` p)
v2 <- m ! (p - 1)
return $ p * (v1 - v2)
main = do { n <- problem10 (2*10^9) ; print n; } -- 2*10^9
Si hace un perfil de eso, verá que pasa la mayor parte del tiempo en la función de búsqueda personalizada (!)
, No sabe cómo mejorarlo más. Intentar en línea (!)
Con {-# INLINE (!) #-}
no dio mejores resultados; tal vez ghc ya hizo esto.
Prueba esto y hazme saber lo rápido que es:
-- sum of primes
import Control.Monad (forM_, when)
import Control.Monad.ST
import Data.Array.ST
import Data.Array.Unboxed
sieve :: Int -> UArray Int Bool
sieve n = runSTUArray $ do
let m = (n-1) `div` 2
r = floor . sqrt $ fromIntegral n
bits <- newArray (0, m-1) True
forM_ [0 .. r `div` 2 - 1] $ /i -> do
isPrime <- readArray bits i
when isPrime $ do
let a = 2*i*i + 6*i + 3
b = 2*i*i + 8*i + 6
forM_ [a, b .. (m-1)] $ /j -> do
writeArray bits j False
return bits
primes :: Int -> [Int]
primes n = 2 : [2*i+3 | (i, True) <- assocs $ sieve n]
main = do
print $ sum $ primes 1000000
Puedes ejecutarlo en ideone . Mi algoritmo es el Tamiz de Eratóstenes, y debería ser bastante rápido para n pequeña. Para n = 2,000,000,000, el tamaño de la matriz puede ser un problema, en cuyo caso necesitará usar un tamiz segmentado. Ver mi blog para más información sobre el Tamiz de Eratóstenes. Consulte esta respuesta para obtener información sobre un tamiz segmentado (pero no en Haskell, desafortunadamente).