python viterbi

Implementación de Python del algoritmo de Viterbi



(7)

Estoy haciendo un proyecto de Python en el que me gustaría usar el algoritmo de Viterbi. ¿Alguien sabe de una implementación completa de Python del algoritmo Viterbi? La corrección de la de Wikipedia parece estar en cuestión en la página de discusión. ¿Alguien tiene un puntero?


Acabo de corregir la pseudo implementación de Viterbi en Wikipedia . Desde la versión inicial (incorrecta), me tomó un tiempo averiguar dónde estaba yendo mal, pero finalmente lo logré, gracias en parte a la implementación de viterbi_path.m por parte de Kevin Murphy en el viterbi_path.m de herramientas de MatLab HMM .

En el contexto de un objeto HMM con variables como se muestra:

hmm = HMM() hmm.priors = np.array([0.5, 0.5]) # pi = prior probs hmm.transition = np.array([[0.75, 0.25], # A = transition probs. / 2 states [0.32, 0.68]]) hmm.emission = np.array([[0.8, 0.1, 0.1], # B = emission (observation) probs. / 3 obs modes [0.1, 0.2, 0.7]])

La función de Python para ejecutar el algoritmo Viterbi (mejor ruta) está a continuación:

def viterbi (self,observations): """Return the best path, given an HMM model and a sequence of observations""" # A - initialise stuff nSamples = len(observations[0]) nStates = self.transition.shape[0] # number of states c = np.zeros(nSamples) #scale factors (necessary to prevent underflow) viterbi = np.zeros((nStates,nSamples)) # initialise viterbi table psi = np.zeros((nStates,nSamples)) # initialise the best path table best_path = np.zeros(nSamples); # this will be your output # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b) viterbi[:,0] = self.priors.T * self.emission[:,observations(0)] c[0] = 1.0/np.sum(viterbi[:,0]) viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor psi[0] = 0; # C- Do the iterations for viterbi and psi for time>0 until T for t in range(1,nSamples): # loop through time for s in range (0,nStates): # loop through the states @(t-1) trans_p = viterbi[:,t-1] * self.transition[:,s] psi[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1)) viterbi[s,t] = viterbi[s,t]*self.emission[s,observations(t)] c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor viterbi[:,t] = c[t] * viterbi[:,t] # D - Back-tracking best_path[nSamples-1] = viterbi[:,nSamples-1].argmax() # last state for t in range(nSamples-1,0,-1): # states of (last-1)th to 0th time step best_path[t-1] = psi[best_path[t],t] return best_path


Aquí está el mío. Es parafraseado directamente de la implementación de pseudocódigo de wikipedia . Utiliza numpy para la comprensión de su ndarray pero, por lo demás, es una implementación pura de python3.

import numpy as np def viterbi(y, A, B, Pi=None): """ Return the MAP estimate of state trajectory of Hidden Markov Model. Parameters ---------- y : array (T,) Observation state sequence. int dtype. A : array (K, K) State transition matrix. See HiddenMarkovModel.state_transition for details. B : array (K, M) Emission matrix. See HiddenMarkovModel.emission for details. Pi: optional, (K,) Initial state probabilities: Pi[i] is the probability x[0] == i. If None, uniform initial distribution is assumed (Pi[:] == 1/K). Returns ------- x : array (T,) Maximum a posteriori probability estimate of hidden state trajectory, conditioned on observation sequence y under the model parameters A, B, Pi. T1: array (K, T) the probability of the most likely path so far T2: array (K, T) the x_j-1 of the most likely path so far """ # Cardinality of the state space K = A.shape[0] # Initialize the priors with default (uniform dist) if not given by caller Pi = Pi if Pi is not None else np.full(K, 1 / K) T = len(y) T1 = np.empty((K, T), ''d'') T2 = np.empty((K, T), ''B'') # Initilaize the tracking tables from first observation T1[:, 0] = Pi * B[:, y[0]] T2[:, 0] = 0 # Iterate throught the observations updating the tracking tables for i in range(1, T): T1[:, i] = np.max(T1[:, i - 1] * A.T * B[np.newaxis, :, y[i]].T, 1) T2[:, i] = np.argmax(T1[:, i - 1] * A.T, 1) # Build the output, optimal model trajectory x = np.empty(T, ''B'') x[-1] = np.argmax(T1[:, T - 1]) for i in reversed(range(1, T)): x[i - 1] = T2[x[i], i] return x, T1, T2


Encontré el siguiente code en el repositorio de ejemplo de Inteligencia Artificial: Un Enfoque Moderno . ¿Es algo como esto lo que estás buscando?

def viterbi_segment(text, P): """Find the best segmentation of the string of characters, given the UnigramTextModel P.""" # best[i] = best probability for text[0:i] # words[i] = best word ending at position i n = len(text) words = [''''] + list(text) best = [1.0] + [0.0] * n ## Fill in the vectors best, words via dynamic programming for i in range(n+1): for j in range(0, i): w = text[j:i] if P[w] * best[i - len(w)] >= best[i]: best[i] = P[w] * best[i - len(w)] words[i] = w ## Now recover the sequence of best words sequence = []; i = len(words)-1 while i > 0: sequence[0:0] = [words[i]] i = i - len(words[i]) ## Return sequence of best words and overall probability return sequence, best[-1]


Esta es una pregunta antigua, pero ninguna de las otras respuestas era lo que necesitaba porque mi aplicación no tiene estados observados específicos.

Tomando después de @Rhubarb, también he vuelto a implementar la implementación de Matlab de Kevin Murphey (ver viterbi_path.m ), pero la he mantenido más cerca del original. He incluido un caso de prueba simple también.

import numpy as np def viterbi_path(prior, transmat, obslik, scaled=True, ret_loglik=False): ''''''Finds the most-probable (Viterbi) path through the HMM state trellis Notation: Z[t] := Observation at time t Q[t] := Hidden state at time t Inputs: prior: np.array(num_hid) prior[i] := Pr(Q[0] == i) transmat: np.ndarray((num_hid,num_hid)) transmat[i,j] := Pr(Q[t+1] == j | Q[t] == i) obslik: np.ndarray((num_hid,num_obs)) obslik[i,t] := Pr(Z[t] | Q[t] == i) scaled: bool whether or not to normalize the probability trellis along the way doing so prevents underflow by repeated multiplications of probabilities ret_loglik: bool whether or not to return the log-likelihood of the best path Outputs: path: np.array(num_obs) path[t] := Q[t] '''''' num_hid = obslik.shape[0] # number of hidden states num_obs = obslik.shape[1] # number of observations (not observation *states*) # trellis_prob[i,t] := Pr((best sequence of length t-1 goes to state i), Z[1:(t+1)]) trellis_prob = np.zeros((num_hid,num_obs)) # trellis_state[i,t] := best predecessor state given that we ended up in state i at t trellis_state = np.zeros((num_hid,num_obs), dtype=int) # int because its elements will be used as indicies path = np.zeros(num_obs, dtype=int) # int because its elements will be used as indicies trellis_prob[:,0] = prior * obslik[:,0] # element-wise mult if scaled: scale = np.ones(num_obs) # only instantiated if necessary to save memory scale[0] = 1.0 / np.sum(trellis_prob[:,0]) trellis_prob[:,0] *= scale[0] trellis_state[:,0] = 0 # arbitrary value since t == 0 has no predecessor for t in xrange(1, num_obs): for j in xrange(num_hid): trans_probs = trellis_prob[:,t-1] * transmat[:,j] # element-wise mult trellis_state[j,t] = trans_probs.argmax() trellis_prob[j,t] = trans_probs[trellis_state[j,t]] # max of trans_probs trellis_prob[j,t] *= obslik[j,t] if scaled: scale[t] = 1.0 / np.sum(trellis_prob[:,t]) trellis_prob[:,t] *= scale[t] path[-1] = trellis_prob[:,-1].argmax() for t in range(num_obs-2, -1, -1): path[t] = trellis_state[(path[t+1]), t+1] if not ret_loglik: return path else: if scaled: loglik = -np.sum(np.log(scale)) else: p = trellis_prob[path[-1],-1] loglik = np.log(p) return path, loglik if __name__==''__main__'': # Assume there are 3 observation states, 2 hidden states, and 5 observations priors = np.array([0.5, 0.5]) transmat = np.array([ [0.75, 0.25], [0.32, 0.68]]) emmat = np.array([ [0.8, 0.1, 0.1], [0.1, 0.2, 0.7]]) observations = np.array([0, 1, 2, 1, 0], dtype=int) obslik = np.array([emmat[:,z] for z in observations]).T print viterbi_path(priors, transmat, obslik) #=> [0 1 1 1 0] print viterbi_path(priors, transmat, obslik, scaled=False) #=> [0 1 1 1 0] print viterbi_path(priors, transmat, obslik, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -7.776472586614755) print viterbi_path(priors, transmat, obslik, scaled=False, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -8.0120386579275227)

Tenga en cuenta que esta implementación no utiliza las probabilidades de emisión directamente, sino que utiliza una variable obslik . En general, las emissions[i,j] := Pr(observed_state == j | hidden_state == i) para un estado observado particular i , haciendo emissions.shape == (num_hidden_states, num_obs_states) .

Sin embargo, dada una secuencia de observations[t] := observation at time t , todo lo que requiere el algoritmo de Viterbi es la probabilidad de esa observación para cada estado oculto. Por lo tanto, obslik[i,t] := Pr(observations[t] | hidden_state == i) . El valor real del estado observado no es necesario.


He modificado la respuesta de @ Rhubarb para la condición en la que ya se conocen las probabilidades marginales (por ejemplo, calculando el algoritmo de avance hacia atrás).

def viterbi (transition_probabilities, conditional_probabilities): # Initialise everything num_samples = conditional_probabilities.shape[1] num_states = transition_probabilities.shape[0] # number of states c = np.zeros(num_samples) #scale factors (necessary to prevent underflow) viterbi = np.zeros((num_states,num_samples)) # initialise viterbi table best_path_table = np.zeros((num_states,num_samples)) # initialise the best path table best_path = np.zeros(num_samples).astype(np.int32) # this will be your output # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b) viterbi[:,0] = conditional_probabilities[:,0] c[0] = 1.0/np.sum(viterbi[:,0]) viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor # C- Do the iterations for viterbi and psi for time>0 until T for t in range(1, num_samples): # loop through time for s in range (0,num_states): # loop through the states @(t-1) trans_p = viterbi[:, t-1] * transition_probabilities[:,s] # transition probs of each state transitioning best_path_table[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1)) viterbi[s,t] = viterbi[s,t] * conditional_probabilities[s][t] c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor viterbi[:,t] = c[t] * viterbi[:,t] ## D - Back-tracking best_path[num_samples-1] = viterbi[:,num_samples-1].argmax() # last state for t in range(num_samples-1,0,-1): # states of (last-1)th to 0th time step best_path[t-1] = best_path_table[best_path[t],t] return best_path


Hmm puedo publicar el mío. Aunque no es bonito, por favor avíseme si necesita una aclaración. Escribí esto relativamente recientemente para una parte específica del etiquetado de voz.

class Trellis: trell = [] def __init__(self, hmm, words): self.trell = [] temp = {} for label in hmm.labels: temp[label] = [0,None] for word in words: self.trell.append([word,copy.deepcopy(temp)]) self.fill_in(hmm) def fill_in(self,hmm): for i in range(len(self.trell)): for token in self.trell[i][1]: word = self.trell[i][0] if i == 0: self.trell[i][1][token][0] = hmm.e(token,word) else: max = None guess = None c = None for k in self.trell[i-1][1]: c = self.trell[i-1][1][k][0] + hmm.t(k,token) if max == None or c > max: max = c guess = k max += hmm.e(token,word) self.trell[i][1][token][0] = max self.trell[i][1][token][1] = guess def return_max(self): tokens = [] token = None for i in range(len(self.trell)-1,-1,-1): if token == None: max = None guess = None for k in self.trell[i][1]: if max == None or self.trell[i][1][k][0] > max: max = self.trell[i][1][k][0] token = self.trell[i][1][k][1] guess = k tokens.append(guess) else: tokens.append(token) token = self.trell[i][1][token][1] tokens.reverse() return tokens