Auto Byte

Science AI

Pedro 思源参与

# 如何直观地理解条件随机场，并通过PyTorch简单地实现

P(x_i | y_i) 是在给定当前的骰子标签的前提条件下，观测到一个给定骰子投掷点数的概率。举例而言，如果 y_i 为公平骰子，则 P(x_i | y_i) = 1/6。另一项 T(y_i | y_{i-1}) 是从上一个骰子标签转换到当前骰子标签的概率，我们可以直接从转移矩阵中读取出这个概率。

PyTorch 是一个在 Python 语言环境下为训练深度学习模型而编写的深度学习库。尽管我们在这里并不是要进行深度学习，但 PyTorch 的自动微分库将帮助我们通过梯度下降算法训练条件随机场模型，而无需我们手动计算任何梯度。使用 PyTorch 会迫使我们实现「向前-向后算法」的前向部分以及「维特比算法」，这比我们直接使用专门的条件随机场构建的 Python 包更有指导意义。

def neg_log_likelihood(self, rolls, states):
    """Return the negative log-likelihood of a labeled dice-roll sequence.

    Input:
    rolls: numpy array, dim [1, n_rolls]. Integer 0-5 showing value on dice.
    states: numpy array, dim [1, n_rolls]. Integer 0, 1. 0 if dice is fair.
    """
    # Emission log-likelihoods for each observed roll.
    emission_logliks = self._data_to_likelihood(rolls)
    label_seq = torch.LongTensor(states)

    # NLL = log(partition function) - score of the observed labeling.
    observed_score = self._compute_likelihood_numerator(emission_logliks, label_seq)
    log_partition = self._compute_likelihood_denominator(emission_logliks)
    return log_partition - observed_score

array([[-1.79175947, -3.21887582],
[-1.79175947, -3.21887582],
[-1.79175947, -3.21887582],
[-1.79175947, -3.21887582],
[-1.79175947, -3.21887582],
[-1.79175947, -0.22314355]])

def _data_to_likelihood(self, rolls):
"""Converts a numpy array of rolls (integers) to log-likelihood.
self.loglikeligood is a matrix of 6 x 2 in our case.
Input is one [1, n_rolls]
"""
log_likelihoods = self.loglikelihood[rolls]
return Variable(torch.FloatTensor(log_likelihoods), requires_grad=False)

 def _compute_likelihood_numerator(self, loglikelihoods, states):
"""Computes numerator of likelihood function for a given sequence.

We'll iterate over the sequence of states and compute the sum
of the relevant transition cost with the log likelihood of the observed
roll.
Input:
loglikelihoods: torch Variable. Matrix of n_obs x n_states.
i,j entry is loglikelihood of observing roll i given state j
states: sequence of labels
Output:
score: torch Variable. Score of assignment.
"""
prev_state = self.n_states
score = Variable(torch.Tensor([0]))
for index, state in enumerate(states):
score += self.transition[state, prev_state] + loglikelihoods[index, state]
prev_state = state
return score

def _compute_likelihood_denominator(self, loglikelihoods):
"""Implements the forward pass of the forward-backward algorithm.

We loop over all possible states efficiently using the recursive
relationship: alpha_t(j) = \sum_i alpha_{t-1}(i) * L(x_t | y_t) * C(y_t | y{t-1} = i)
Input:
loglikelihoods: torch Variable. Same input as _compute_likelihood_numerator.
This algorithm efficiently loops over all possible state sequences
so no other imput is needed.
Output:
torch Variable.
"""

# Stores the current value of alpha at timestep t
prev_alpha = self.transition[:, self.n_states] + loglikelihoods[0].view(1, -1)

for roll in loglikelihoods[1:]:
alpha_t = []

# Loop over all possible states
for next_state in range(self.n_states):

# Compute all possible costs of transitioning to next_state
feature_function = self.transition[next_state,:self.n_states].view(1, -1) +\
roll[next_state].view(1, -1).expand(1, self.n_states)

alpha_t_next_state = prev_alpha + feature_function
alpha_t.append(self.log_sum_exp(alpha_t_next_state))
prev_alpha = torch.cat(alpha_t).view(1, -1)
return self.log_sum_exp(prev_alpha)

def _viterbi_algorithm(self, loglikelihoods):
"""Implements Viterbi algorithm for finding most likely sequence of labels.

Very similar to _compute_likelihood_denominator but now we take the maximum
over the previous states as opposed to the sum.
Input:
loglikelihoods: torch Variable. Same input as _compute_likelihood_denominator.
Output:
tuple. First entry is the most likely sequence of labels. Second is
the loglikelihood of this sequence.
"""

argmaxes = []

# prev_delta will store the current score of the sequence for each state
prev_delta = self.transition[:, self.n_states].view(1, -1) +\
loglikelihoods[0].view(1, -1)

for roll in loglikelihoods[1:]:
local_argmaxes = []
next_delta = []
for next_state in range(self.n_states):
feature_function = self.transition[next_state,:self.n_states].view(1, -1) +\
roll.view(1, -1) +\
prev_delta
most_likely_state = self.argmax(feature_function)
score = feature_function[0][most_likely_state]
next_delta.append(score)
local_argmaxes.append(most_likely_state)
prev_delta = torch.cat(next_delta).view(1, -1)
argmaxes.append(local_argmaxes)

final_state = self.argmax(prev_delta)
final_score = prev_delta[0][final_state]
path_list = [final_state]

# Backtrack through the argmaxes to find most likely state
for states in reversed(argmaxes):
final_state = states[final_state]
path_list.append(final_state)

return np.array(path_list), final_score

1.P（序列中的第一个骰子为公平骰子）=0.5

2.P（当前为公平骰子|上一次为公平骰子）=0.8

3.P（当前为有偏骰子|上一次为有偏骰子）=0.35

array([[-0.86563134, -0.40748784, -0.54984874],
[-1.3820231 , -0.59524935, -0.516026 ]], dtype=float32)

# observed dice rolls
array([2, 3, 4, 5, 5, 5, 1, 5, 3, 2, 5, 5, 5, 3, 5])
# corresponding labels. 0 means fair
array([0, 0, 1, 1, 1, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1])
# predictions
array([0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0])