Natural Language Processing / Text Information Extraction / Sequences / Hidden Markov Models
Introduction
You are climatologists in the year 2799, studying the history of global warming. You can’t find any records of Baltimore weather, but you do find my diary, in which I assiduously recorded how much ice cream I ate each day [..]. What can you figure out from this about the weather that summer?  Jason Eisner [EIS02 p.3]
This notebook deals with Hidden Markov Models (HMMs). A Hidden Markov Model describes a sequence of observed events, emitted from an underlying sequence of hidden states. Throughout this notebook, you will
 compute the likelihood of sequences
 decode the best hidden sequence given the observations, i.e. figure out the most likely weather sequence given the log of ice cream
 estimate HMM parameters to fit observed data, i.e. estimate the likelihood of weather transitions and ice cream consumption from just the log of ice cream
Requirements
Knowledge
Chapter 8 and Appendix A of Speech and Language Processing by Jurafsky/Martin [JUR18] discuss Markov Chains and Hidden Markov Models in depth and present the algorithms you will implement in the exercises.
Python Modules
import numpy as np
Data
This cell sets up various functions for testing.
def test_HMM_and_data():
    A = np.array([[0.0, 0.5, 0.5, 0.0],   # from start
                  [0.0, 0.8, 0.1, 0.1],   # from cold
                  [0.0, 0.1, 0.8, 0.1],   # from hot
                  [0.0, 0.0, 0.0, 1.0]])  # from end
    A_desc = ["start", "cold", "hot", "end"]
    B = np.array([
        [0.0, 0.0, 0.0],  # from start
        [0.7, 0.2, 0.1],  # from cold
        [0.1, 0.2, 0.7],  # from hot
        [0.0, 0.0, 0.0]]  # from end
    )
    B = B[1:-1]  # keep only the rows of the real hidden states (cold, hot)
    B_desc = ["one", "two", "three"]
    # data for tests
    x = np.array([2, 3, 3, 2, 3, 2, 3, 2, 2, 3, 1, 3, 3, 1, 1, 1, 2, 1, 1, 1, 3, 1, 2,
                  1, 1, 1, 2, 3, 3, 2, 3, 2, 2], dtype=int)
    x = x - 1  # map ice cream counts 1..3 to observation indices 0..2
    return A, B, x
def test_sample_sequence(A, B, hidden=None, observed=None):
    assert hidden[0] == 0, "The first element in the hidden state sequence should be 0 (START)"
    assert hidden[-1] == A.shape[0] - 1, "The last element in the hidden state sequence should be the last index of A (END)"
    assert len(hidden) == len(observed) + 2, "The hidden sequence should be 2 elements longer than the observed sequence, since it includes a START and END state without observations."
def test_forward_algo(forward_algorithm):
    A, B, x = test_HMM_and_data()
    prob, trellis = forward_algorithm(A, B, x)
    np.testing.assert_almost_equal(prob, 9.1276e-19, 22)
    np.testing.assert_almost_equal(trellis[0,0], 0.1, 4)
    np.testing.assert_almost_equal(trellis[2,0], 0.00135, 5)
    np.testing.assert_almost_equal(trellis[5,0], 8.71549e-5, 9)
    np.testing.assert_almost_equal(trellis[12,0], 5.70827e-9, 9)
    np.testing.assert_almost_equal(trellis[19,0], 1.3157e-10, 14)
    np.testing.assert_almost_equal(trellis[26,0], 3.1912e-14, 13)
    np.testing.assert_almost_equal(trellis[32,0], 2.0498e-18, 22)
    np.testing.assert_almost_equal(trellis[0,1], 0.1, 4)
    np.testing.assert_almost_equal(trellis[2,1], 0.03591, 5)
    np.testing.assert_almost_equal(trellis[5,1], 5.30337e-4, 8)
    np.testing.assert_almost_equal(trellis[12,1], 1.37864e-7, 11)
    np.testing.assert_almost_equal(trellis[19,1], 2.7819e-12, 15)
    np.testing.assert_almost_equal(trellis[26,1], 4.6599e-15, 18)
    np.testing.assert_almost_equal(trellis[32,1], 7.0777e-18, 22)
def test_viterbi(viterbi_algorithm):
    A, B, x = test_HMM_and_data()
    trellis, trace = viterbi_algorithm(A, B, x)
    assert trace == [0] + [2]*13 + [1]*14 + [2]*6 + [3]
    # with adjusted indices
    np.testing.assert_almost_equal(trellis[0][1], 0.1, 4)
    np.testing.assert_almost_equal(trellis[5][1], 5.62e-05, 7)
    np.testing.assert_almost_equal(trellis[6][1], 4.50e-06, 8)
    np.testing.assert_almost_equal(trellis[15][1], 1.99e-09, 11)
    np.testing.assert_almost_equal(trellis[16][1], 3.18e-10, 12)
    np.testing.assert_almost_equal(trellis[22][1], 4.00e-13, 15)
    np.testing.assert_almost_equal(trellis[24][1], 1.26e-13, 15)
    np.testing.assert_almost_equal(trellis[28][1], 7.20e-17, 19)
    np.testing.assert_almost_equal(trellis[29][1], 1.15e-17, 19)
    np.testing.assert_almost_equal(trellis[31][1], 7.90e-19, 21)
    np.testing.assert_almost_equal(trellis[32][1], 1.26e-19, 21)
    np.testing.assert_almost_equal(trellis[0][2], 0.1, 4)
    np.testing.assert_almost_equal(trellis[3][2], 0.00502, 5)
    np.testing.assert_almost_equal(trellis[5][2], 0.00045, 5)
    np.testing.assert_almost_equal(trellis[11][2], 1.62e-07, 9)
    np.testing.assert_almost_equal(trellis[17][2], 3.18e-12, 14)
    np.testing.assert_almost_equal(trellis[18][2], 1.78e-12, 14)
    np.testing.assert_almost_equal(trellis[22][2], 5.00e-14, 16)
    np.testing.assert_almost_equal(trellis[27][2], 7.87e-16, 18)
    np.testing.assert_almost_equal(trellis[28][2], 4.41e-16, 18)
    np.testing.assert_almost_equal(trellis[29][2], 7.06e-17, 19)
    np.testing.assert_almost_equal(trellis[32][2], 1.01e-18, 20)
def test_total_data_likelihood(fn):
    A, B, x = test_HMM_and_data()
    z = np.array([0] + [2]*13 + [1]*14 + [2]*6 + [3])
    np.testing.assert_almost_equal(fn(A, B, x, z), 1.0114871426573873e-19)
def test_backward_algo(fn):
    A, B, x = test_HMM_and_data()
    prob, trellis = fn(A, B, x)
    np.testing.assert_almost_equal(prob, 9.1276e-19, 21)
    np.testing.assert_almost_equal(trellis[0][0], 1.1780e-18, 22)
    np.testing.assert_almost_equal(trellis[2][0], 7.2496e-18, 22)
    np.testing.assert_almost_equal(trellis[5][0], 3.3422e-16, 20)
    np.testing.assert_almost_equal(trellis[12][0], 3.5380e-11, 15)
    np.testing.assert_almost_equal(trellis[19][0], 6.77837e-9, 14)
    np.testing.assert_almost_equal(trellis[26][0], 1.44877e-5, 10)
    np.testing.assert_almost_equal(trellis[32][0], 0.1, 4)
    np.testing.assert_almost_equal(trellis[0][1], 7.9496e-18, 22)
    np.testing.assert_almost_equal(trellis[2][1], 2.5145e-17, 21)
    np.testing.assert_almost_equal(trellis[5][1], 1.6662e-15, 19)
    np.testing.assert_almost_equal(trellis[12][1], 5.1558e-12, 16)
    np.testing.assert_almost_equal(trellis[19][1], 7.52345e-9, 14)
    np.testing.assert_almost_equal(trellis[26][1], 9.66609e-5, 9)
    np.testing.assert_almost_equal(trellis[32][1], 0.1, 4)
def test_e_step(func):
    A, B, x = test_HMM_and_data()
    gamma, xi, likelihood = func(A, B, x)
    # test gamma row sums
    msg = "All rows in gamma should sum to 1"
    actual = gamma.sum(axis=1)
    np.testing.assert_allclose(actual, np.ones_like(actual), err_msg=msg)
    # xi[0]: transitions i->j between real states (not START/END) should be all zeros
    msg = "xi[0,:,:] represents the start of the sequence. Only transitions from "
    msg += "START to a state should be nonzero here"
    actual = xi[0, 1:-1, 1:-1]
    desired = np.zeros_like(actual)
    np.testing.assert_allclose(actual, desired, err_msg=msg)
    # start probabilities should sum to one
    actual = xi[0, 0, 1:-1].sum()
    np.testing.assert_almost_equal(actual, 1)
    # xi[-1]: only transitions z -> END should be nonzero
    msg = "xi[-1,:,:] represents the end of the sequence. Only transitions from states"
    msg += " to END should be nonzero."
    actual = xi[-1, 1:-1, 1:-1]
    np.testing.assert_allclose(actual, np.zeros_like(actual), err_msg=msg)
    # xi end probs should be 1
    msg = "Transitions to END in xi[-1,:,:] should sum to 1, since the sequence surely"
    msg += " ends."
    actual = xi[-1, :, -1].sum()
    np.testing.assert_almost_equal(actual, 1, err_msg=msg)
    # test some values in the body
    desired_xi2 = np.zeros((4, 4))
    desired_xi2[1:-1, 1:-1] = [[0.00572, 0.01736], [0.005, 0.97192]]
    np.testing.assert_allclose(xi[2], desired_xi2, atol=1e-5)
    desired_xi13 = np.zeros((4, 4))
    desired_xi13[1:-1, 1:-1] = [[2.20678e-01, 5.82381e-04], [6.66216e-01, 1.12524e-01]]
    np.testing.assert_allclose(xi[13], desired_xi13, atol=1e-5)
    np.testing.assert_allclose(gamma[1], [0.02307, 0.97693], atol=1e-5)
    np.testing.assert_allclose(gamma[14], [0.97972, 0.02028], atol=1e-5)
    np.testing.assert_allclose(gamma[30], [0.04471, 0.95529], atol=1e-5)
def test_m_step(efunc, mfunc):
    A, B, _ = test_HMM_and_data()
    X = [[1, 2, 1, 2, 1, 2, 2, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 2, 2], [0, 2, 2, 0, 0], [0]]
    ga = np.concatenate([efunc(A, B, x)[0] for x in X])
    xa = np.concatenate([efunc(A, B, x)[1] for x in X])
    A_pred, B_pred = mfunc(ga, xa, X, B.shape[1])
    # shapes must match
    assert A.shape == A_pred.shape, "Shape of fitted A does not match shape of true A"
    assert B.shape == B_pred.shape, "Shape of fitted B does not match shape of true B"
    # test validity of A and B
    msg = "A does not seem to be a valid transition matrix."
    actual = A_pred.sum(axis=1)
    np.testing.assert_allclose(actual, np.ones_like(actual), err_msg=msg)
    msg = "B does not seem to be valid observation probabilities."
    actual = B_pred.sum(axis=1)
    np.testing.assert_allclose(actual, np.ones_like(actual), err_msg=msg)
    # test values
    np.testing.assert_allclose(A_pred[0], [0., 0.55131, 0.44869, 0.], atol=1e-5)
    np.testing.assert_allclose(A_pred[1], [0., 0.83557, 0.07308, 0.09135], atol=1e-5)
    np.testing.assert_allclose(A_pred[2], [0., 0.1583, 0.74347, 0.09823], atol=1e-5)
    np.testing.assert_allclose(A_pred[3], [0., 0., 0., 1.], atol=1e-5)
    np.testing.assert_allclose(B_pred[0], [0.81538, 0.12816, 0.05645], atol=1e-5)
    np.testing.assert_allclose(B_pred[1], [0.09149, 0.29802, 0.61049], atol=1e-5)
Hidden Markov Models
From Markov Chains to Hidden Markov Models
A Markov Chain models a sequence of states $z_1 \dots z_T$ or equivalently $z_{1:T}$. Each state $z_t$ represents a random variable that takes on a value from a vocabulary of states at time $t$.
Transitions between states are governed by a transition matrix $A$, where $A_{ij}$ is the probability of moving from state $i$ to state $j$.
Example: We're on a vacation. Each day, the state of the weather can be sunny, cloudy or raining. In our diary, we note down today's weather.
In a Hidden Markov Model, this sequence of states exists but we don't observe it directly, it is hidden. At each time point $t$ of the sequence, we observe an event $x_t$ (also called an emission) emitted from the state $z_t$. This produces a sequence of hidden states $z_{1:T}$ and a matching sequence of observed events $x_{1:T}$.
Example: On each day of our vacation, we choose an activity from either hiking, going to the beach or visiting the museum. In our diary, we note down today's activity but not the weather.
Properties
There are two important properties here:
 Markov property: The probability of the current state depends only on the single previous state, and no further previous states or observations: $P(z_t \mid z_{1:t-1}, x_{1:t-1}) = P(z_t \mid z_{t-1})$
 Output independence: The probability of an observation depends only on the single current state, and no previous states or observations: $P(x_t \mid z_{1:t}, x_{1:t-1}) = P(x_t \mid z_t)$
with the abbreviation $z_{1:t} = z_1, z_2, \dots, z_t$
Overview of Components
 A sequence of discrete hidden states $z_{1:T}$, here we use indices: $z_t \in \{1,2,\dots K\}$
 A sequence of observations $x_{1:T} = x_1, x_2, \dots ,x_t ,\dots, x_T$
 The number of time steps of a sequence $T$
Transition Matrix $A$
Matrix $A$ encodes the probability for the transitions $z_{t-1}=i \rightarrow z_t=j$:
$A_{ij} = P(z_t = j \mid z_{t-1} = i)$
with
 $\forall i : \sum_j A_{i,j}=1$
The transition probabilities are constant over time (stationarity).
Emission probabilities
If the observations are discrete we can use an observation matrix $B$ with elements
$B_{jk} = P(x_t = k \mid z_t = j)$
The emission probabilities are constant over time (stationary).
Start States
There are different conventions to represent the distribution over the start state at $t=1$.
One way is to use an explicit notation for the start distribution:
$\pi_i = P(z_1 = i)$
Another convention uses a special start state $z = start$ and encodes the distributions of $z_{t=1}$ in the matrix $A$.
With start state $z=0$ we make the following assertions about the transition matrix:
 $A_{0i}$ is equivalent to the start distribution $\pi$
 $A_{j0} = 0$, there are no transitions to the start state.
End States
There are different kinds of Markov Chains:
 fixed length
 variable length
For variable length Markov Chains we need a probability at each time step $t$ that the chain ends. Either we use an explicit end probability $p_{end}$ and $1-p_{end}$ for the probability that the chain proceeds.
Or we use a special end state (index: $K$) and encode the end probability in the matrix $A$ (in the elements $A_{iK}$).
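Both conventions for START and END can be checked mechanically. The following sketch uses made-up weather numbers purely for illustration (it is not the exercise solution): row 0 of $A$ holds the start distribution, the last column holds the end probabilities, and END is absorbing.

```python
import numpy as np

# Hypothetical chain with explicit START (index 0), two real states, and END (index 3).
A = np.array([
    [0.0, 0.6, 0.4, 0.0],   # START -> {state 1, state 2}: the start distribution pi
    [0.0, 0.7, 0.2, 0.1],   # state 1 -> {1, 2, END}
    [0.0, 0.3, 0.6, 0.1],   # state 2 -> {1, 2, END}
    [0.0, 0.0, 0.0, 1.0],   # END is absorbing
])

# Conditions from the text: every row sums to 1, and nothing transitions back to START.
assert np.allclose(A.sum(axis=1), 1.0)
assert np.allclose(A[:, 0], 0.0)
```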
Problems
Jurafsky's Speech and Language Processing outlines the three fundamental problems associated with Hidden Markov Models (p.3 in Appendix A). Variable names are adjusted to match those used previously in this notebook.
Problem 1: Likelihood Given an HMM $\theta = (A,B)$ and an observation sequence $x_{1:T}$, determine the likelihood $P(x_{1:T} \mid \theta)$
Problem 2: Decoding Given an observation sequence $x_{1:T}$, determine the best hidden state sequence $z_{1:T}$.
Problem 3: Learning Given an observation sequence $x_{1:T}$ and the set of states in the HMM, learn the HMM parameters $A$ and $B$.
Throughout these exercises, you will first encode our vacation example into an HMM. Then, you will compute the forward and backward probability (also known as $\alpha$ and $\beta$ probability) to assess the likelihood of data for a given HMM. (Problem 1)
Next, you will implement the Viterbi Algorithm to decode the best hidden state sequence (Problem 2).
Finally, you will tackle Problem 3. You will use your implementations of the forward and backward probability and add the gamma and xi probability to your toolset. You can then implement the Baum-Welch algorithm to estimate the most likely parameters for an HMM given a list of observation sequences.
Exercises
Encoding the Data
The diagram shows a Hidden Markov Model of a vacation example adapted from [KEL18]. The left-hand side of the diagram shows the Markov Model describing the transitions between weather states. Remember, the state transitions are not directly observed in an HMM. The right-hand side shows the observation probabilities given the weather state.
 Write the matrices $A$ and $B$ for the vacation example in numpy.
 Encode the start distribution in the matrix $A$ with start state $z=0$.
 Which conditions must hold for the matrices $A$ and $B$? Test if the properties hold.
A_desc = ["start", "sunny", "cloudy", "raining", "end"]
B_desc = ["beach", "hiking", "museum"]
A = None # as exercise
B = None # as exercise
Sample Function
Sample a hidden state sequence $z$ and a matching observed event sequence $x$ from an HMM. The hidden state sequence should start with 0 (the START state) and end with the final index in A (the END state).
Hint: You can use np.random.multinomial combined with np.argmax to pick an item from a multinomial distribution, e.g. 1 + np.argmax(np.random.multinomial(1, [1./6.]*6)) to roll a die.
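To see the hint in action, here is the die roll and the same idiom applied to a row of a made-up transition matrix (the row values are illustrative, not the exercise solution):

```python
import numpy as np

np.random.seed(0)  # for reproducibility of this sketch

# np.random.multinomial(1, p) draws a one-hot vector from the distribution p;
# np.argmax turns it into the index of the chosen category.
p = [1.0 / 6.0] * 6                       # a fair die
roll = 1 + np.argmax(np.random.multinomial(1, p))
assert 1 <= roll <= 6

# The same idiom picks the next hidden state from a row of A:
A_row = np.array([0.0, 0.5, 0.5, 0.0])    # e.g. transitions out of START
next_state = np.argmax(np.random.multinomial(1, A_row))
assert next_state in (1, 2)               # only states with nonzero probability
```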
Note: Make sure you keep your indexing consistent. The first row of $A$ encodes the special START state while the first row of $B$ encodes the sunny weather state.
One approach would be to create $B\_$ which encodes the START and END state with observation probabilities 0 (there are no observations at START/END). Another would be a helper $A\_$ which is a subset of $A$ without the START/END states.
Throughout these exercises you'll likely find one or the other (or both) approaches useful, depending on the current task.
def sample_from_HMM(A, B):
    # returns z, x (as 1d numpy arrays)
    # the elements of z and x are the indices of the corresponding states
    raise NotImplementedError()
    return z, x
z,x = sample_from_HMM(A,B)
print([A_desc[i] for i in z])
print([B_desc[i] for i in x])
Your implementation should pass the following test (defined in Data). If it passes the test, the cell output is empty, otherwise it shows a traceback of the failure.
test_sample_sequence(A,B,hidden=z,observed=x)
Forward Algorithm
The forward probability $\alpha_{t,j}$ tells us the probability of being in state $j$ at time point $t$ given the observed sequence $x_{1:T}$ and the parameters $A$ and $B$ of the HMM. In the final step it tells us the likelihood of the sequence $x_{1:T}$ given the HMM. It is implemented using a dynamic programming approach with complexity $O(K^2 T)$. We divide the algorithm into an initialisation, recursion and finalisation step and store the intermediate results in a table.
$\alpha_{t,j} = p(x_1, x_2, \dots, x_t, z_t=j \mid \theta)$
with
 the observation sequence $x_1 \dots x_T$
 parameters of the HMM $\theta =\{A, B\}$
1. Initialisation
For the alpha probabilities of the first time point we weigh in
 the initial probability of each state $j$
 the observation probability of the first emission $x_0$ given $j$:
$\alpha_{0,j} = A_{0j}\, B_{j,k=x_0}$
2. Recursion
The recursion step simulates a step forward in the sequence. We consider every possible path we could have taken through the hidden sequence so far. Then we make a transition to $j$ and observe $x_t$. Thanks to the Markov assumption, we only need to look 1 step into the past. So, we weigh in:
 The probability of the sequence at the previous time step $t-1$. This is given in the previous row for each possible previous state $i$, hence the dynamic programming approach.
 The transition probability from state $i$ to $j$
 The observation probability $x_t$ given the state $j$
By summing over all the possible previous states $i$ we obtain the full probability $\alpha_{t,j}$. Formally:
$\alpha_{t,j} = \sum_{i=1}^K \alpha_{t-1,i}\, A_{ij}\, B_{j,k=x_t}$
3. Finalisation
The final row shows the likelihood of being in state $j$ at the final time point. Having reached the end of the sequence, we weigh in the end probabilities. The likelihood of the entire sequence is calculated by summing over all states:
$p(x_{1:T} \mid \theta) = \sum_{j=1}^K \alpha_{T-1,j}\, A_{j,end}$
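To build intuition for what these three steps compute, here is a brute-force cross-check that sums $p(x, z)$ over every possible hidden sequence, using the test HMM defined above. It has $O(K^T)$ terms and is only feasible for very short sequences; the forward algorithm computes the same number in $O(K^2 T)$.

```python
import itertools
import numpy as np

# Test HMM from this notebook: states = [start, cold, hot, end], obs = [one, two, three]
A = np.array([[0.0, 0.5, 0.5, 0.0],
              [0.0, 0.8, 0.1, 0.1],
              [0.0, 0.1, 0.8, 0.1],
              [0.0, 0.0, 0.0, 1.0]])
B = np.array([[0.7, 0.2, 0.1],   # emissions from cold
              [0.1, 0.2, 0.7]])  # emissions from hot

def brute_force_likelihood(A, B, x):
    """Sum p(x, z) over all K^T hidden sequences (real states 1..K)."""
    K = A.shape[0] - 2                    # number of real hidden states
    end = A.shape[0] - 1
    total = 0.0
    for z in itertools.product(range(1, K + 1), repeat=len(x)):
        p = A[0, z[0]] * B[z[0] - 1, x[0]]        # START -> z_0, emit x_0
        for t in range(1, len(x)):
            p *= A[z[t - 1], z[t]] * B[z[t] - 1, x[t]]
        p *= A[z[-1], end]                        # final transition to END
        total += p
    return total

# Hand-checkable case with one observation ("one"):
# 0.5*0.7*0.1 (via cold) + 0.5*0.1*0.1 (via hot) = 0.04
print(brute_force_likelihood(A, B, np.array([0])))  # -> 0.04
```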
def forward_algorithm(A, B, x):
    """
    Parameters
    ----------
    A : np.array; dtype=float
        Transition probabilities of the hidden states i->j: $A_{ij}$
    B : np.array; dtype=float
        Observation probabilities from hidden state j to
        observation index k, i.e. j->k: $B_{jk}$
    x : np.array; dtype=int
        Sequence of observations

    Returns
    -------
    likelihood : float
        probability p(x | A, B)
    alpha : np.array, dtype=float
        with entries $alpha_{tj} = p(x_0, x_1, .., x_t, z_t=j | A, B)$
        $0 \leq t \leq T-1$ with $T$ observations.
    """
    # computes the likelihood of the HMM (A, B) given the
    # sequence of observations x
    raise NotImplementedError()
    return likelihood, alpha
A,B,x = test_HMM_and_data()
forward_algorithm(A,B,x)
Your implementation should pass the following test.
test_forward_algo(forward_algorithm)
Numerically Stable Forward Algorithm
Multiplication of small numbers can result in an underflow error, so it's common to work with log probabilities:
$\log(a \cdot b) = \log a + \log b$
The problem is that the forward algorithm requires us to both multiply and add probabilities. While multiplication of log probabilities is convenient, addition is cumbersome. We'd also have to forfeit multiplying and adding in one go using np.dot().
The solution is to scale the probability vector $v = \alpha_t$, e.g. by s=1/np.sum(v) or s=np.max(v).
Some caution is required since we're using a dynamic programming approach where we reuse the values of the previous row. Consider what happens when we fill the table top to bottom: without scaling, each row is computed from the true values $\alpha_{t-1,i}$ of the previous row. With scaling, after filling each row we multiply by some factor $s_t$, e.g. the inverse of the row sum. Since we always read the values of the previous row, each stored row also carries the scaling factors of all previous rows:
$\hat\alpha_{t,j} = \alpha_{t,j} \prod_{t'=0}^{t} s_{t'}$
To recover the true probabilities we must cancel out all the previous scaling factors. In log space this is a subtraction. Formally:
$\log \alpha_{t,j} = \log \hat\alpha_{t,j} - \sum_{t'=0}^{t} \log s_{t'}$
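A quick numerical illustration of why this matters (a sketch, independent of the exercise): the direct product of many small probabilities underflows to 0.0 in float64, sums of log probabilities stay representable, and np.logaddexp adds probabilities without leaving log space.

```python
import numpy as np

# Direct product of 100 factors of 1e-5 is 1e-500: below float64 range, underflows to 0.0.
p = 1e-5
direct = np.prod(np.full(100, p))
assert direct == 0.0

# The equivalent sum of log probabilities is just -100 * log(1e5): perfectly representable.
log_prod = np.sum(np.full(100, np.log(p)))
assert np.isclose(log_prod, 100 * np.log(p))

# Adding probabilities while staying in log space: log(p1 + p2 + p3).
log_probs = np.log(np.array([0.1, 0.2, 0.3]))
log_sum = np.logaddexp.reduce(log_probs)
assert np.isclose(np.exp(log_sum), 0.6)
```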
def forward_algorithm_(A, B, x):
    """
    Parameters
    ----------
    A : np.array; dtype=float
        Transition probabilities of the hidden states i->j: $A_{ij}$
    B : np.array; dtype=float
        Observation probabilities from hidden state j to
        observation index k, i.e. j->k: $B_{jk}$
    x : np.array; dtype=int
        Sequence of observations

    Returns
    -------
    log_likelihood : float
        log of the probability p(x | A, B)
    log_alpha : np.array, dtype=float
        with entries $log_alpha_{tj} = \log p(x_0, x_1, .., x_t, z_t=j | A, B)$
        $0 \leq t \leq T-1$ with $T$ observations.
    """
    raise NotImplementedError()
    return log_likelihood, log_alpha
A,B,x = test_HMM_and_data()
forward_algorithm_(A,B,x)
The two implementations of the forward algorithm should return the same result.
A,B,x = test_HMM_and_data()
likelihood, alpha = forward_algorithm(A, B, x)
log_likelihood, log_alpha = forward_algorithm_(A, B, x)
np.testing.assert_allclose(log_likelihood, np.log(likelihood))
np.testing.assert_allclose(log_alpha, np.log(alpha))
Viterbi Algorithm
When you've grasped the forward algorithm it's only a small step to implement the Viterbi algorithm which addresses Problem 2 of decoding. The Viterbi Algorithm reveals the best hidden state sequence $z$ for a given observed sequence $x$ and the HMM parameters.
1. Initialisation
Similarly to the forward algorithm, we weigh in the start probability and probability of the first emission for each state $j$.
$viterbi_{0, j} = A_{0j}\, B_{j,k=x_0}$
$backpointer_{0,j} = 0$
2. Recursion
The recursion step simulates a step forward in the sequence. Like before, we consider the paths we could have taken so far, move to state $j$ and observe an event.
Unlike before, we don't sum over all previous states $i$. We calculate the likelihood of moving from $i$ to $j$ and observing $x_t$ for every $i$, producing a vector of likelihoods $\vec{v}$. In the viterbi table, we store the max of $\vec{v}$ as the likelihood of the best sequence so far. In the backpointer, we store the argmax of $\vec{v}$ as a pointer to the previous state on this path.
$\vec{v}_i = viterbi_{t-1, i}\, A_{i,j}\, B_{j,k=x_t}$
$viterbi_{t,j} = \max_{i=1}^K \vec{v}$
$backpointer_{t,j} = \operatorname{argmax}_{i=1}^K \vec{v}$
3. Finalisation
Again, we finish by weighing in the end probabilities.
The likelihood of the best sequence is the maximum of our final $\vec{v}$.
We write the argmax of the final $\vec{v}$ into the last row of the backpointer.
A sequence of 3 observations may produce the following backpointer.
We iterate through the rows of the backpointer bottom to top. In each row we obtain the current state and a pointer to the previous state. We repeat this until we reach the beginning of the sequence, where the backpointer points to the START state $0$.
In our example we trace the best sequence $0 \rightarrow 1 \rightarrow 2 \rightarrow 1$ and append the special END state.
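The backtrace described above can be sketched as follows. The backpointer values here are made up to reproduce the traced sequence $0 \rightarrow 1 \rightarrow 2 \rightarrow 1$ from the text; in your implementation they come from the Viterbi recursion.

```python
# Illustrative backpointer for T=3 observations and hidden states {1, 2}.
# backpointer[t][j] = best previous state when in state j at time t.
backpointer = {0: {1: 0, 2: 0},
               1: {1: 1, 2: 1},
               2: {1: 2, 2: 2}}
best_last_state = 1          # argmax of the final vector v

def backtrace(backpointer, best_last_state, T, end_state):
    state = best_last_state
    trace = [state]
    for t in range(T - 1, 0, -1):        # walk the rows bottom to top
        state = backpointer[t][state]    # follow the pointer to the previous state
        trace.append(state)
    trace.append(backpointer[0][state])  # reaches the START state (0)
    trace.reverse()
    return trace + [end_state]           # append the special END state

print(backtrace(backpointer, best_last_state, T=3, end_state=3))  # -> [0, 1, 2, 1, 3]
```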
def viterbi_algorithm(A, B, x):
    """
    Parameters
    ----------
    A : np.array; dtype=float
        Transition probabilities of the hidden states i->j: $A_{ij}$
    B : np.array; dtype=float
        Observation probabilities from hidden state j to
        observation index k, i.e. j->k: $B_{jk}$
    x : list of int
        Sequence of observations

    Returns
    -------
    v : np.array; dtype=float
        Viterbi table
    trace : list; dtype=int
        The most likely sequence of hidden states z, with a prepended
        START state (0) and appended END state (last index in A)
    """
    raise NotImplementedError()
    return v, trace
A,B,x = test_HMM_and_data()
v, trace = viterbi_algorithm(A,B,x)
v
trace
test_viterbi(viterbi_algorithm)
Total Data Likelihood
Compute the total data likelihood of a sequence, i.e. the joint probability of the observations and a given hidden state sequence (with $z_0 = start$):
$p(x_{1:T}, z_{1:T} \mid \theta) = \left( \prod_{t=1}^{T} A_{z_{t-1}, z_t}\, B_{z_t, x_t} \right) A_{z_T, end}$
Note: Multiplying many probabilities directly together is numerically unstable. If you want to implement a numerically stable version of total_data_likelihood you should use log probabilities.
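As a sketch of the log-space idea on a made-up two-state HMM (not the notebook's test HMM, and not the exercise solution): $p(x, z)$ is one transition factor and one emission factor per time step plus the final transition to END, and the log version turns the product into a sum.

```python
import numpy as np

# Hypothetical HMM with explicit START (0) and END (3); states 1 and 2 are "real".
A = np.array([[0.0, 0.6, 0.4, 0.0],
              [0.0, 0.7, 0.2, 0.1],
              [0.0, 0.3, 0.6, 0.1],
              [0.0, 0.0, 0.0, 1.0]])
B = np.array([[0.8, 0.2],    # emissions from state 1
              [0.3, 0.7]])   # emissions from state 2

z = [0, 1, 2, 3]             # START -> 1 -> 2 -> END
x = [0, 1]                   # one observation per real hidden state

# Direct product: transition * emission at every step, then the transition to END.
p = 1.0
for t in range(len(x)):
    p *= A[z[t], z[t + 1]] * B[z[t + 1] - 1, x[t]]
p *= A[z[-2], z[-1]]
# p = (0.6*0.8) * (0.2*0.7) * 0.1 = 0.00672

# Log-space version: the same factors, summed as logs.
log_p = sum(np.log(A[z[t], z[t + 1]]) + np.log(B[z[t + 1] - 1, x[t]])
            for t in range(len(x))) + np.log(A[z[-2], z[-1]])
assert np.isclose(p, np.exp(log_p))
```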
def total_data_likelihood(A, B, x, z):
    raise NotImplementedError()
test_total_data_likelihood(total_data_likelihood)
Backward Algorithm
The backward probability $\beta$ is defined as
$\beta_{t,j} = p(x_{t+1}, x_{t+2}, \dots, x_T \mid z_t=j, \theta)$
Note:
 the $z_t=j$ is now on the "conditioning side" (in comparison to $\alpha_{t, j} = p(x_1, x_2, \dots, x_t, z_t=j \mid \theta)$)
So the backward probability is the probability of the observations from time $t+1$ to the end, $x_{t+1:T}$, if we are at time $t$ in state $j$, given the HMM parameters $\theta$.
The implementation is analogous to the forward algorithm.
1. Initialisation
The final row is equivalent to the end probabilities:
$\beta_{T-1,i} = A_{i,end}$
2. Recursion
In the recursion we weigh in a transition from $i$ to $j$ and the observation $x_{t+1}$ given $j$, and sum over all next states $j$:
$\beta_{t,i} = \sum_{j=1}^K A_{ij}\, B_{j,k=x_{t+1}}\, \beta_{t+1,j}$
3. Finalisation
Finally, we weigh in the start probabilities and the probability of the first emission. By summing over all states we obtain the likelihood of the sequence:
$p(x_{1:T} \mid \theta) = \sum_{j=1}^K A_{0j}\, B_{j,k=x_0}\, \beta_{0,j}$
def backward_algorithm(A, B, x):
    """
    Parameters
    ----------
    A : np.array; dtype=float
        Transition probabilities of the hidden states i->j: $A_{ij}$
    B : np.array; dtype=float
        Observation probabilities from hidden state j to
        observation index k, i.e. j->k: $B_{jk}$
    x : np.array; dtype=int
        Sequence of observations

    Returns
    -------
    likelihood : float
        probability p(x | A, B)
    beta : np.array, dtype=float
        with entries $beta_{tj} = p(x_{t+1}, x_{t+2}, .., x_T | z_t=j, A, B)$
        $0 \leq t \leq T-1$ with $T$ observations.
    """
    raise NotImplementedError()
    return likelihood, beta
test_backward_algo(backward_algorithm)
You now have two approaches to calculate the observation likelihood: you can use either the forward or the backward probability.
A,B,x = test_HMM_and_data()
likelihood_alpha, alpha = forward_algorithm(A,B,x)
likelihood_beta, beta = backward_algorithm(A,B,x)
print(likelihood_alpha, likelihood_beta)
assert np.isclose(likelihood_alpha, likelihood_beta, atol=1e-20)
Gamma Probability
We now address the likelihood of a hidden sequence $p(z_t \mid x_{1:T}, \theta)$. Using the quotient rule we get:
$p(z_t = i \mid x_{1:T}, \theta) = \frac{p(z_t = i, x_{1:T} \mid \theta)}{p(x_{1:T} \mid \theta)}$
The denominator is the observation likelihood. To solve the numerator, we introduce $\gamma_{t,i}$ as the probability of being in the hidden state $i$ at time point $t$. $\gamma_{t,i}$ factors in the forward and backward probability:
$\gamma_{t,i} = \frac{\alpha_{t,i}\, \beta_{t,i}}{p(x_{1:T} \mid \theta)}$
The gamma probability is used for
 Inference, e.g. for "Change Detection" $p(z_t \neq z_{t+1} \mid x_{1:T})$
 Sampling of $z_{1:T}$ given $x_{1:T}$
Implement a sample function which samples a hidden state $z_{t}$ from a given HMM ($A,B$) given the observations $x_{1:T}$. Note: Use $\gamma_{t,i}$.
def sample_z_t_from_HMM_given_x(A, B, x, t):
    """
    Parameters
    ----------
    A : np.array; dtype=float
        Transition probabilities of the hidden states i->j: $A_{ij}$
    B : np.array; dtype=float
        Observation probabilities from hidden state j to
        observation index k, i.e. j->k: $B_{jk}$
    x : np.array; dtype=int
        Observations
    t : int
        Time index for the sample z_t. Starts with 0.

    Returns
    -------
    z_t : int
        Index of a hidden state sample at time t.
    """
    raise NotImplementedError()
A,B,x = test_HMM_and_data()
z_t_sample = sample_z_t_from_HMM_given_x(A, B, x, t=7)
print(A_desc[z_t_sample])
Baum-Welch Algorithm
The Baum-Welch Algorithm addresses Problem 3: Learning the best HMM parameters given the observations.
Our goal is Maximum Likelihood Estimation of the parameters $\theta = \{ A, B\}$
We estimate the parameters by maximizing the likelihood function $\mathcal L(\theta) = p(x_{1:T} \mid \theta)$. This is equivalent to minimizing the negative log likelihood of $x_{1:T}$.
Note: The likelihood function is a function of $\theta$. As a function of $\theta$ it's not a probability distribution, because in general the likelihoods over all $\theta$ do not integrate to 1: $\int \mathcal L(\theta)\, d\theta \neq 1$
In a naive implementation, we have to sum over all possible hidden sequences $z_{1:T}$:
$p(x_{1:T} \mid \theta) = \sum_{z_{1:T}} p(x_{1:T}, z_{1:T} \mid \theta)$
The problem is that the number of terms, $O(K^T)$, grows exponentially with the sequence length $T$.
The Baum-Welch Algorithm is an Expectation Maximization algorithm. In the E-Step it uses the forward algorithm and the backward algorithm to compute the expectation of the complete data likelihood under the current model. This is why the Baum-Welch algorithm is sometimes also called the forward-backward algorithm.
For a Markov model (with observed transitions) we could estimate the transition probabilities in the Maximum Likelihood framework just by using the counts $C$ of the transitions:
$\hat A_{ij} = \frac{C(i \rightarrow j)}{\sum_{j'} C(i \rightarrow j')}$
where we use the counts over all time steps $t$.
We define the xi probability $\xi_{t,i,j}$ for the $l$-th training sequence $\vec x^{(l)}$. It is the probability of being at time point $t$ in state $z_t = i$ with the next state $z_{t+1} = j$, given the observations and HMM parameters:
$\xi^{(l)}_{t,i,j} = p(z_t = i, z_{t+1} = j \mid \vec x^{(l)}, \theta)$
We can use this to estimate $\hat{A}$ for our data:
$\hat A_{ij} = \frac{\sum_l \sum_t \xi^{(l)}_{t,i,j}}{\sum_l \sum_t \sum_{j'} \xi^{(l)}_{t,i,j'}}$
Exercise:
Show from the definition that
$\xi_{t,i,j} = \frac{\alpha_{t,i}\, A_{ij}\, B_{j,k=x_{t+1}}\, \beta_{t+1,j}}{p(x_{1:T} \mid \theta)}$
Solution
The denominator is the likelihood of the observed sequence.
Using the conditional independence properties of the HMM, the numerator becomes:
$p(z_t = i, z_{t+1} = j, x_{1:T} \mid \theta) = \alpha_{t,i}\, A_{ij}\, B_{j,k=x_{t+1}}\, \beta_{t+1,j}$
q.e.d.
Helper functions
We need some training data: A collection $X_{1:L}$ of $L$ observation sequences sampled from an HMM.
def get_train_data(A, B, n=100):
    """
    Given the parameters A and B, this function samples n
    observation sequences and returns them in a list.

    Returns
    -------
    X : list of list of int; (length: n)
        A list of n observation sequences.
    """
    X = []
    for i in range(n):
        z, x = sample_from_HMM(A, B)
        X.append(x)
    return X
Generate a random $A$ and $B$ matrix as an initial estimation of the model.
def get_random_AB(size_A=(5,5), size_B=(3,3)):
    """
    get_random_AB(size_A=(5,5), size_B=(3,3))
    Returns a transition matrix A and likelihood matrix B with
    randomly generated likelihoods. size_A and size_B specify
    the shape of the matrix.
    The transition matrix A encodes a special START and END
    state in the first and last index.
    A and B are valid, meaning the transitions i->* sum to 1 and so on.
    """
    assert size_A[0] == size_A[1]
    A_init = np.random.random(size_A)
    A_init[0, 0] = 0                            # no START -> START
    A_init[0, size_A[1] - 1] = 0                # no START -> END
    A_init[:, 0] = 0                            # no transitions back to START
    A_init[size_A[0] - 1, :size_A[1] - 1] = 0   # END only transitions to itself
    A_init = (A_init.T / A_init.sum(axis=1)).T
    B_init = np.random.random(size_B)
    B_init = (B_init.T / B_init.sum(axis=1)).T
    return A_init, B_init
Return the average negative log likelihood of our training data. This is the likelihood function we want to minimise.
# short sequences have much higher likelihoods
# so we avg negative log likelihood (on the sequence length)
def avg_data_neg_log_likelihood(A_, B_, X):
    """
    Parameters
    ----------
    A_ : np.array; dtype=float
        Transition probabilities of the hidden states i->j: $A_{ij}$
    B_ : np.array; dtype=float
        Observation probabilities from hidden state j to
        observation index k, i.e. j->k: $B_{jk}$
    X : Training data ; type = list of list of int
        Each list in X is an observation sequence.
        Each observation sequence is a list of int, representing the
        observation at each time point of the sequence.

    Returns
    -------
    likelihood : float
        The average data likelihood over all training sequences
        given the parameters A_ and B_ as the negative log likelihood.
        The likelihood of a single sequence is normalised by its length
        (number of time points) since shorter sequences produce greater
        likelihoods.
    """
    sum_data_likelihood = 0.
    for x in X:
        likelihood, alpha = forward_algorithm(A_, B_, x)
        sum_data_likelihood -= np.log(likelihood) / len(x)
    return sum_data_likelihood / len(X)
Implementation of the Baum-Welch Algorithm
In the E-Step, we compute the expectation of the complete data likelihood of the current model.
For a single observed sequence $x_{1:T}$ from the training data, return:
1. The $\gamma$ and $\xi$ probabilities across all time points
2. The likelihood of the sequence $p(x_{1:T} \mid \theta)$
def e_step(A, B, x):
    """
    Parameters
    ----------
    A : np.array; dtype=float
        Transition probabilities of the hidden states i->j: $A_{ij}$
    B : np.array; dtype=float
        Observation probabilities from hidden state j to
        observation index k, i.e. j->k: $B_{jk}$
    x : list of int
        A single observation sequence, representing the
        observation at each time point of the sequence.

    Returns
    -------
    gamma : np.array; dtype=float; shape=(n_time_points, n_hidden_states)
        where n_time_points is the length of the observation sequence
        and n_hidden_states is the number of hidden states (does not
        include the START and END state).
        gamma[t][i] is the probability of being in state i at time point
        t of the observation sequence.
        Thus, gamma[0] is equivalent to the initial probabilities of each
        state and gamma[-1] is equivalent to the end probabilities of each
        state.
    xi : np.array; dtype=float; shape=(n_time_points, n_states, n_states)
        where n_states is the number of hidden states including a special
        START and END state
        and n_time_points is the length of the observation sequence + 1.
        xi probability over all time points in a training sequence.
        xi[t,i,j] is the probability of the state at t being i and
        the state at (t+1) being j.
        Note:
        xi[0,i,j] represents the beginning of the sequence. Only
        probabilities from the START state to a hidden state should
        be nonzero here.
        xi[-1,i,j] represents the end of the sequence. Only
        probabilities from a hidden state to END should be nonzero
        here.
    likelihood : np.array; dtype=float
        The likelihood of the observation sequence x given the HMM
        parameters A and B.
    """
    raise NotImplementedError()
    return gamma, xi, likelihood
Your implementation should pass the following test.
test_e_step(e_step)
In the M-step, you take the $\gamma$ and $\xi$ over all time points of all training sequences as input. You'll use these to estimate the next model $\theta^{(s+1)}$.
- To estimate $A_{i,j}$, count the transitions from $i$ to $j$ and divide by the transitions from $i$ to any state. We've computed the xi probabilities for this purpose.
- To estimate $B_{i,o}$ for state $i$ and observation $o$, count the number of times we're in state $i$ when the observation was $o$ and divide by the number of times we're in state $i$ overall. We've computed the gamma probabilities for this purpose.
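The two counting rules above can be sketched as follows for a plain HMM without START/END states. As an assumption for illustration, this sketch takes the posteriors per sequence (lists `gammas` and `xis`) rather than concatenated arrays as in the exercise, and `m_step_sketch` is a hypothetical name:

```python
import numpy as np

def m_step_sketch(gammas, xis, X, nb_observations):
    """Re-estimate A and B from per-sequence posteriors (plain HMM)."""
    N = xis[0].shape[1]
    # Expected transition counts i -> j, summed over time and sequences.
    A_num = sum(xi.sum(axis=0) for xi in xis)
    A_est = A_num / A_num.sum(axis=1, keepdims=True)
    # Expected emission counts: gamma mass accumulated per observed symbol.
    B_est = np.zeros((N, nb_observations))
    for gamma, x in zip(gammas, X):
        for t, o in enumerate(x):
            B_est[:, o] += gamma[t]
    B_est /= B_est.sum(axis=1, keepdims=True)
    return A_est, B_est
```

Dividing by the row sums normalises each state's outgoing transition and emission distributions, so both estimates are proper probability distributions by construction.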
def m_step(gamma, xi, X, nb_observations):
    """
    Parameters
    ----------
    Please refer to the docstring of e_step for details on gamma and xi.
    gamma : np.array; dtype=float; shape=(n_total_time_points, n_hidden_states)
        The gamma probabilities of every observation sequence in the
        training data, concatenated along the time point axis.
    xi : np.array; dtype=float; shape=(n_total_time_points, n_states, n_states)
        The xi probabilities of every observation sequence in the
        training data, concatenated along the time point axis.
    X : Training data ; type = list of list of int
        Each list in X is an observation sequence.
        Each observation sequence is a list of int, representing the
        observation at each time point of the sequence.
    nb_observations : int
        The number of observations in the observation vocabulary.
        Used to infer the shape of B.

    Returns
    -------
    The parameters A and B of an HMM that maximize gamma and xi for
    the training data X.
    A_est : np.array; dtype=float
        Transition probabilities of the hidden states i->j: $A_{ij}$
    B_est : np.array; dtype=float
        Observation probabilities from hidden state j to
        observation index k, i.e. j->k: $B_{jk}$
    """
    raise NotImplementedError()
    return A_est, B_est
Your implementation should pass the following test if both your e_step and m_step functions work as expected.
test_m_step(e_step, m_step)
In the Baum-Welch algorithm you're given a list of $L$ observation sequences for training, $X_{1:L}$, where $x^{(l)}$ is the $l$-th training sequence.
You start off with an initial model $A$ and $B$ and iteratively adjust the parameters.
In each iteration:
1. Perform the E-step to find $\gamma$ and $\xi$ over all time points for all $x \in X$ given the current model $\theta^{(s)}$.
2. Perform the M-step to find the $A$ and $B$ that maximize the likelihood of the data. This yields the next model $\theta^{(s+1)}$, replacing the current $\theta^{(s)}$.
We iterate until convergence or until we've hit a maximum number of steps. Convergence occurs when the element-wise changes from the current model $\theta^{(s)}$ to the next model $\theta^{(s+1)}$ are all below a certain threshold.
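The element-wise convergence test can be written as a small helper. This is a sketch of one reasonable reading of the criterion (largest absolute parameter change below `atol`); `has_converged` is an illustrative name, not part of the exercise:

```python
import numpy as np

def has_converged(A_old, B_old, A_new, B_new, atol=0.001):
    """True when every entry of A and B moved by less than atol."""
    return (np.max(np.abs(A_new - A_old)) < atol
            and np.max(np.abs(B_new - B_old)) < atol)
```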
def baum_welch(A_init, B_init, X, atol=0.001, max_steps=25):
    """
    Given a list of observation sequences X, learn the best HMM parameters
    A and B.

    Parameters
    ----------
    A_init : np.array; dtype=float
        Initial transition probabilities
    B_init : np.array; dtype=float
        Initial observation probabilities
    X : Training data ; type = list of list of int
        Each list in X is an observation sequence.
        Each observation sequence is a list of int, representing the
        observation at each time point of the sequence.
    atol : float
        Convergence occurs when the difference between \theta and
        \theta' is below this threshold for all elements of A and B.
    max_steps : int
        Stop iterating after the given maximum number of steps, whether
        or not convergence has occurred.

    Returns
    -------
    A_fit, B_fit:
        The parameters A and B of an HMM fit to the training data
    """
    raise NotImplementedError()
    return A_fit, B_fit
# True A and B
A, B = get_random_AB()
# True observation sequences
X = get_train_data(A, B, n=4000)
# Get random A and B with matching shapes
A_init, B_init = get_random_AB(A.shape, B.shape)
# Plug into Baum-Welch
A_fit, B_fit = baum_welch(A_init, B_init, X)
# Test data
X_test = get_train_data(A, B, n=10000)
print("negative log likelihood for real parameters: ", avg_data_neg_log_likelihood(A, B, X_test))
print("negative log likelihood for random parameters: ", avg_data_neg_log_likelihood(A_init, B_init, X_test))
print("negative log likelihood for fitted parameters: ", avg_data_neg_log_likelihood(A_fit, B_fit, X_test))
Summary
In this notebook you have implemented solutions for the fundamental problems associated with Hidden Markov Models. You have
- modeled a sequence of hidden states and observed events (weather and activities) as a Hidden Markov Model
- implemented the forward and backward algorithms to assess the likelihood of an observation sequence
- implemented the Viterbi algorithm to decode the best hidden sequence given the observations and HMM parameters
- implemented the Baum-Welch algorithm to learn the best HMM parameters given the observations
Literature
[EIS02] Jason Eisner: "An Interactive Spreadsheet for Teaching the Forward-Backward Algorithm". Proceedings of the ACL Workshop on Effective Tools and Methodologies for Teaching NLP and CL, 2002.
[JUR18] Daniel Jurafsky, James H. Martin: "Speech and Language Processing". Draft of the 3rd edition, 2018.
Licenses
Notebook License (CC-BY-SA 4.0)
Natural Language Processing - Text Information Extraction - Sequences - Hidden Markov Models
by Christian Herta and Diyar Oktay
is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.
Based on a work at https://gitlab.com/deep.TEACHING.
Code License (MIT)
The following license only applies to code cells of the notebook.
Copyright 2018 Christian Herta, Diyar Oktay
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.