
Source code for prob140.markov_chains

from collections import OrderedDict
import warnings

from datascience import Table
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy


class MarkovChain:
    """
    A class for representing, simulating, and computing Markov Chains.
    """

    def __init__(self, states, transition_matrix):
        transition_matrix = np.array(transition_matrix)
        if not np.all(transition_matrix >= 0):
            warnings.warn('Transition matrix contains negative value(s).')
        if not np.all(np.isclose(np.sum(transition_matrix, axis=1), 1.)):
            warnings.warn('Transition probabilities don\'t sum to 1.')
        self.states = states
        self.matrix = transition_matrix

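A minimal usage sketch (assuming the import path `from prob140 import MarkovChain`): note that the two checks in `__init__` only warn, so an invalid matrix still constructs a chain.

import numpy as np
from prob140 import MarkovChain   # assumed import path

states = ['A', 'B']
valid = np.array([[0.1, 0.9],
                  [0.8, 0.2]])
mc = MarkovChain(states, valid)    # rows sum to 1: no warning

bad = np.array([[0.5, 0.4],        # first row sums to 0.9
                [0.3, 0.7]])
mc_bad = MarkovChain(states, bad)  # warns: probabilities don't sum to 1
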
    def to_pandas(self):
        """
        Returns the Pandas DataFrame representation of the MarkovChain.
        """
        return pd.DataFrame(
            data=self.matrix,
            index=self.states,
            columns=self.states
        )

    def get_transition_matrix(self, steps=1):
        """
        Returns the transition matrix after n steps as a NumPy array.

        Parameters
        ----------
        steps : int (optional)
            Number of steps. (default: 1)

        Returns
        -------
        Transition matrix
        """
        return np.linalg.matrix_power(self.matrix, steps)

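A short sketch of the identity this method relies on: the n-step transition matrix is the matrix power P**n, so each entry sums the probabilities of all length-n paths between a pair of states.

import numpy as np

P = np.array([[0.1, 0.9],
              [0.8, 0.2]])
P2 = np.linalg.matrix_power(P, 2)
# P2[0, 0] = P(A -> A in 2 steps) = 0.1*0.1 + 0.9*0.8 = 0.73
print(P2)
# [[0.73 0.27]
#  [0.24 0.76]]
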
    def transition_matrix(self, steps=1):
        """
        Returns the transition matrix after n steps, displayed as a Pandas
        DataFrame.

        Parameters
        ----------
        steps : int (optional)
            Number of steps. (default: 1)

        Returns
        -------
        Pandas DataFrame
        """
        return pd.DataFrame(
            data=self.get_transition_matrix(steps),
            index=self.states,
            columns=self.states
        )

    def distribution(self, starting_condition, steps=1):
        """
        Finds the distribution of states after n steps given a starting
        condition.

        Parameters
        ----------
        starting_condition : state or Table
            The initial distribution or the original state.
        steps : int (optional)
            Number of transition steps. (default: 1)

        Returns
        -------
        Table
            Shows the distribution after n steps

        Examples
        --------
        >>> states = make_array('A', 'B')
        >>> transition_matrix = np.array([[0.1, 0.9],
        ...                               [0.8, 0.2]])
        >>> mc = MarkovChain.from_matrix(states, transition_matrix)
        >>> start = Table().states(states).probability(make_array(0.8, 0.2))
        >>> mc.distribution(start)
        State | Probability
        A     | 0.24
        B     | 0.76
        >>> mc.distribution(start, 0)
        State | Probability
        A     | 0.8
        B     | 0.2
        >>> mc.distribution(start, 3)
        State | Probability
        A     | 0.3576
        B     | 0.6424
        """
        if isinstance(starting_condition, Table):
            states = list(starting_condition.column(0))
            # datascience Tables store everything in arrays, so iterables
            # get typecast to arrays. Thus, if the states are iterables, we
            # need to typecast them back to their original type.
            if hasattr(states[0], '__iter__'):
                desired_type = type(self.states[0])
                states = list(map(desired_type, states))
            probabilities = starting_condition.column(1)
        else:
            states = [starting_condition]
            probabilities = [1]

        n = len(self.states)
        start = np.zeros((n, 1))
        for i in range(n):
            if self.states[i] in states:
                index = states.index(self.states[i])
                start[i, 0] = probabilities[index]
            else:
                start[i, 0] = 0

        probabilities = start.T.dot(self.get_transition_matrix(steps=steps))
        return Table().states(self.states).probability(probabilities[0])

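Under the hood this is the standard update pi_n = pi_0 P^n; a plain-NumPy sketch reproducing the docstring's numbers:

import numpy as np

P = np.array([[0.1, 0.9],
              [0.8, 0.2]])
pi_0 = np.array([0.8, 0.2])                  # start: P(A)=0.8, P(B)=0.2
print(pi_0 @ P)                              # [0.24 0.76]      (one step)
print(pi_0 @ np.linalg.matrix_power(P, 3))   # [0.3576 0.6424]  (three steps)
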
    def log_prob_of_path(self, starting_condition, path):
        """
        Finds the log-probability of a path given a starting condition.

        May have better precision than `prob_of_path`.

        Parameters
        ----------
        starting_condition : state or Distribution
            If a state, finds the log-probability of the path starting at
            that state. If a Distribution, finds the probability of the
            path with the first element sampled from the Distribution.
        path : ndarray
            Array of states.

        Returns
        -------
        float
            Log of the probability.

        Examples
        --------
        >>> states = make_array('A', 'B')
        >>> transition_matrix = np.array([[0.1, 0.9],
        ...                               [0.8, 0.2]])
        >>> mc = MarkovChain.from_matrix(states, transition_matrix)
        >>> mc.log_prob_of_path('A', ['A', 'B', 'A'])
        -2.6310891599660815
        >>> start = Table().states(['A', 'B']).probability([0.8, 0.2])
        >>> mc.log_prob_of_path(start, ['A', 'B', 'A'])
        -0.55164761828624576
        """
        states = list(self.states)
        if isinstance(starting_condition, Table):
            first = path[0]
            start_states = list(starting_condition.column(0))
            # list.index raises ValueError on a miss, so check membership
            # explicitly for a clearer error message.
            assert first in start_states, 'First path value not found.'
            index = start_states.index(first)
            log_prob = np.log(starting_condition.column(1)[index])
            prev_index = states.index(first)
            i = 1
        else:
            log_prob = np.log(1)
            prev_index = states.index(starting_condition)
            i = 0

        while i < len(path):
            curr_index = states.index(path[i])
            log_prob += np.log(self.matrix[prev_index, curr_index])
            prev_index = curr_index
            i += 1
        return log_prob

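Why the log version "may have better precision": multiplying many small one-step probabilities underflows float64, while the running sum of logs stays finite. A small sketch:

import numpy as np

p, n = 1e-4, 100
print(p ** n)         # 0.0 -- 1e-400 underflows float64
print(n * np.log(p))  # -921.034..., the log-probability is still usable
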
    def prob_of_path(self, starting_condition, path):
        """
        Finds the probability of a path given a starting condition.

        Parameters
        ----------
        starting_condition : state or Distribution
            If a state, finds the probability of the path starting at that
            state. If a Distribution, finds the probability of the path
            with the first element sampled from the Distribution.
        path : ndarray
            Array of states.

        Returns
        -------
        float
            Probability.

        Examples
        --------
        >>> states = ['A', 'B']
        >>> transition_matrix = np.array([[0.1, 0.9],
        ...                               [0.8, 0.2]])
        >>> mc = MarkovChain.from_matrix(states, transition_matrix)
        >>> mc.prob_of_path('A', ['A', 'B', 'A'])
        0.072
        >>> 0.1 * 0.9 * 0.8
        0.072
        >>> start = Table().states(['A', 'B']).probability([0.8, 0.2])
        >>> mc.prob_of_path(start, ['A', 'B', 'A'])
        0.576
        >>> 0.8 * 0.9 * 0.8
        0.576
        """
        states = list(self.states)
        if isinstance(starting_condition, Table):
            first = path[0]
            start_states = list(starting_condition.column(0))
            # list.index raises ValueError on a miss, so check membership
            # explicitly for a clearer error message.
            assert first in start_states, 'First path value not found.'
            index = start_states.index(first)
            prob = starting_condition.column(1)[index]
            prev_index = states.index(first)
            i = 1
        else:
            prob = 1
            prev_index = states.index(starting_condition)
            i = 0

        while i < len(path):
            curr_index = states.index(path[i])
            prob *= self.matrix[prev_index, curr_index]
            prev_index = curr_index
            i += 1
        return prob

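The loop above is the Markov property in code: the probability of a path factorizes into a product of one-step transition probabilities. A sketch of the same computation with a hypothetical standalone helper (`path_prob`, not part of prob140) over a bare matrix:

import numpy as np

def path_prob(start, path, P, states):
    # Hypothetical helper mirroring prob_of_path for a bare matrix.
    prob, prev = 1.0, states.index(start)
    for s in path:
        cur = states.index(s)
        prob *= P[prev, cur]   # one-step transition probability
        prev = cur
    return prob

P = np.array([[0.1, 0.9],
              [0.8, 0.2]])
print(path_prob('A', ['A', 'B', 'A'], P, ['A', 'B']))  # 0.072
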
    def simulate_path(self, starting_condition, steps, plot_path=False):
        """
        Simulates a path of n steps with a specific starting condition.

        Parameters
        ----------
        starting_condition : state or Distribution
            If a state, simulates n steps starting at that state. If a
            Distribution, samples from that distribution to find the
            starting state.
        steps : int
            Number of steps to take.
        plot_path : bool
            If True, plots the simulated path instead of returning it.

        Returns
        -------
        ndarray
            Array of sampled states.

        Examples
        --------
        >>> states = ['A', 'B']
        >>> transition_matrix = np.array([[0.1, 0.9],
        ...                               [0.8, 0.2]])
        >>> mc = MarkovChain.from_matrix(states, transition_matrix)
        >>> mc.simulate_path('A', 10)
        array(['A', 'A', 'B', 'A', 'B', 'A', 'B', 'B', 'A', 'B', 'B'])
        """
        states = list(self.states)
        if isinstance(starting_condition, Table):
            start = starting_condition.sample_from_dist()
        else:
            start = starting_condition

        path = [start]
        for i in range(steps):
            index = states.index(path[-1])
            next_state = np.random.choice(states, p=self.matrix[index])
            path.append(next_state)

        if plot_path:
            self.plot_path(path[0], path[1:])
        else:
            return np.array(path)

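A sanity-check sketch (assuming the `mc` built from the docstring's matrix): state frequencies along a long simulated path should approach the stationary distribution, which is (8/17, 9/17) ≈ (0.471, 0.529) for this chain.

import numpy as np

np.random.seed(0)                      # fixed seed, for reproducibility
path = mc.simulate_path('A', 100000)   # mc as in the docstring example
print(np.mean(path == 'A'))            # ~0.471 = 8/17
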
    def steady_state(self):
        """
        Finds the stationary distribution of the Markov Chain.

        Returns
        -------
        Table
            Distribution.

        Examples
        --------
        >>> states = ['A', 'B']
        >>> transition_matrix = np.array([[0.1, 0.9],
        ...                               [0.8, 0.2]])
        >>> mc = MarkovChain.from_matrix(states, transition_matrix)
        >>> mc.steady_state()
        Value | Probability
        A     | 0.470588
        B     | 0.529412
        """
        # The steady state is the left eigenvector that corresponds to the
        # eigenvalue 1.
        w, vl = scipy.linalg.eig(self.matrix, left=True, right=False)

        # Find the index of the eigenvalue 1.
        index = np.isclose(w, 1)
        eigenvector = np.real(vl[:, index])[:, 0]
        probabilities = eigenvector / sum(eigenvector)

        # Zero out negative values caused by floating point error.
        indices = np.logical_and(np.isclose(probabilities, 0),
                                 probabilities < 0)
        probabilities[indices] = 0
        return Table().values(self.states).probability(probabilities)

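The eigenvector computation can be sanity-checked against the defining equation pi P = pi; for the docstring's matrix the exact answer is (8/17, 9/17):

import numpy as np

P = np.array([[0.1, 0.9],
              [0.8, 0.2]])
pi = np.array([8 / 17, 9 / 17])
print(np.allclose(pi @ P, pi))   # True: pi is stationary
print(pi)                        # [0.47058824 0.52941176]
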
    def expected_return_time(self):
        """
        Finds the expected return time of the Markov Chain (1 / steady
        state probability).

        Returns
        -------
        Table
            Expected Return Time

        Examples
        --------
        >>> states = ['A', 'B']
        >>> transition_matrix = np.array([[0.1, 0.9],
        ...                               [0.8, 0.2]])
        >>> mc = MarkovChain.from_matrix(states, transition_matrix)
        >>> mc.expected_return_time()
        Value | Expected Return Time
        A     | 2.125
        B     | 1.88889
        """
        steady = self.steady_state()
        steady_probabilities = steady.column(1)
        return Table().values(self.states).with_column(
            'Expected Return Time', 1 / steady_probabilities
        )

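Worked out for the chain above: the stationary probabilities 8/17 and 9/17 give expected return times 1 / (8/17) = 17/8 = 2.125 steps for A and 1 / (9/17) = 17/9 ≈ 1.889 steps for B, matching the docstring.
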
    def plot_path(self, starting_condition, path):
        """
        Plots a Markov Chain's path.

        Parameters
        ----------
        starting_condition : state
            State to start at.
        path : iterable
            List of valid states.

        Examples
        --------
        >>> states = ['A', 'B']  # Works with all state data types!
        >>> transition_matrix = np.array([[0.1, 0.9],
        ...                               [0.8, 0.2]])
        >>> mc = MarkovChain.from_matrix(states, transition_matrix)
        >>> mc.plot_path('B', mc.simulate_path('B', 20))
        <Plot of a Markov Chain that starts at 'B' and takes 20 steps>
        """
        assert starting_condition in self.states, \
            'Start state must be a state.'
        states = list(self.states)
        prev_index = states.index(starting_condition)
        for state in path:
            curr_index = states.index(state)
            assert self.matrix[prev_index, curr_index] != 0, \
                'Path not possible.'
            prev_index = curr_index

        path = [starting_condition] + list(path)
        x = np.arange(len(path))
        y = [states.index(state) for state in path]

        plt.scatter(x, y, color='blue')
        plt.plot(x, y, lw=1, color='black')
        plt.yticks(np.arange(len(states)), states)
        plt.xlim(-0.5, len(path) + 0.5)
        plt.ylim(-0.5, len(states) - 0.5)
        plt.xlabel('Time')
        plt.ylabel('States')

    def _repr_html_(self):
        return self.to_pandas()._repr_html_()

    def __repr__(self):
        return self.to_pandas().__repr__()

    def __str__(self):
        return self.to_pandas().__str__()

    @classmethod
    def from_table(cls, table):
        """
        Constructs a Markov Chain from a Table.

        Parameters
        ----------
        table : Table
            A table with three columns for source state, target state, and
            probability.

        Returns
        -------
        MarkovChain

        Examples
        --------
        >>> table = Table().states(make_array('A', 'B')) \\
        ...     .transition_probability(make_array(0.5, 0.5, 0.3, 0.7))
        >>> table
        Source | Target | Probability
        A      | A      | 0.5
        A      | B      | 0.5
        B      | A      | 0.3
        B      | B      | 0.7
        >>> MarkovChain.from_table(table)
             A    B
        A  0.5  0.5
        B  0.3  0.7
        """
        assert table.num_columns == 3, \
            'Must have 3 columns: source, target, probability'
        for prob_sum in table.group(0, collect=sum).column(2):
            assert round(prob_sum, 6) == 1, \
                'Transition probabilities must sum to 1.'

        # Get a list of the states.
        ordered_set = OrderedDict()
        for row in table.rows:
            ordered_set[row[0]] = 0
        states = list(ordered_set.keys())
        n = len(states)

        transition_matrix = np.zeros((n, n))
        for row in table.rows:
            source = states.index(row[0])
            target = states.index(row[1])
            transition_matrix[source, target] = row[2]
        return cls(states, transition_matrix)

    @classmethod
    def from_transition_function(cls, states, transition_function):
        """
        Constructs a MarkovChain from a transition function.

        Parameters
        ----------
        states : iterable
            List of states.
        transition_function : function
            Bivariate transition function that maps two states to a
            probability.

        Returns
        -------
        MarkovChain

        Examples
        --------
        >>> states = make_array(1, 2)
        >>> def transition(s1, s2):
        ...     if s1 == s2:
        ...         return 0.7
        ...     else:
        ...         return 0.3
        >>> MarkovChain.from_transition_function(states, transition)
             1    2
        1  0.7  0.3
        2  0.3  0.7
        """
        n = len(states)
        transition_matrix = np.zeros((n, n))
        for i in range(n):
            for j in range(n):
                transition_matrix[i, j] = transition_function(states[i],
                                                              states[j])
        return cls(states, transition_matrix)

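Transition functions make it easy to define structured chains without writing the matrix out. A sketch (with a hypothetical `walk` function, not part of prob140) of a lazy random walk on the integers 0 through 4:

import numpy as np
from prob140 import MarkovChain   # assumed import path

def walk(i, j):
    # Hypothetical transition function: stay with probability 0.5,
    # otherwise step to an adjacent integer (reflecting at the ends).
    if i == j:
        return 0.5
    if abs(i - j) == 1:
        return 0.5 if i in (0, 4) else 0.25
    return 0.0

mc_walk = MarkovChain.from_transition_function(np.arange(5), walk)
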
    @classmethod
    def from_matrix(cls, states, transition_matrix):
        """
        Constructs a MarkovChain from a transition matrix.

        Parameters
        ----------
        states : iterable
            List of states.
        transition_matrix : ndarray
            Square transition matrix.

        Returns
        -------
        MarkovChain

        Examples
        --------
        >>> states = [1, 2]
        >>> transition_matrix = np.array([[0.1, 0.9],
        ...                               [0.8, 0.2]])
        >>> MarkovChain.from_matrix(states, transition_matrix)
             1    2
        1  0.1  0.9
        2  0.8  0.2
        """
        return cls(states, transition_matrix)

def to_markov_chain(self):
    """
    Constructs a Markov Chain from the Table.

    Returns
    -------
    MarkovChain
    """
    return MarkovChain.from_table(self)