# Karma Game Library, Copyright Kevin Riehl 2023, <kriehl@ethz.ch>
# Imports
import numpy as np
import karma_game_library
from typing import Callable
# Classes
class Optimizer:
"""
A class to provide functionality to iteratively optimize the agent policy.
Attributes
----------
hyper_lambda : float
Exponential scaling factor for policy perturbation.
hyper_dt : float
Stepwidth for state updates. Only values between 0.0 and 1.0 allowed.
hyper_nu : float
Stepwidth for policy updates. Only values between 0.0 and 1.0 allowed.
func_prob_outcome : Callable, (outcome, actions) -> (probability)
This function returns the probability for the outcome given the actions.
func_prob_karma_transition : Callable, (karma_next, karma, actions, outcome) -> (probability)
This function returns the probability of karma_next given a prior
karma, actions and outcome.
func_prob_urgency_transition : Callable, (urgency_next, urgency, outcome, atype, set_urgencies) -> (probability)
This function returns the probability of urgency_next given a prior
urgency and outcome and type.
v : np.array [action]
"Distribution of agents' actions"
Intermediate result, returns probability of an action.
gamma : np.array [outcome][action]
"Probability of interaction outcome given bid"
Intermediate result, returns the probability of an outcome given the action.
kappa : np.array [karma_next][karma][action][outcome]
"Probabilistic karma transition function"
Intermediate result, returns the probability of karma_next given the
prior karma, action and outcome.
xi : np.array [urgency][action]
"Immediate reward function"
        Intermediate result, returns the expected immediate reward (negative
        cost) for a participant with a specific urgency and action.
rho : np.array [type][urgency_next][karma_next][urgency][karma][action]
"Probabilistic state transition function"
        Intermediate result, returns the probability that a participant of a
        given type ends up in the next state (urgency_next, karma_next) given
        a prior urgency, karma and action.
R : np.array [type][urgency][karma]
"Expected immediate reward"
        Intermediate result, returns the expected immediate reward
        (negative cost) for a participant of a specific type, urgency and karma.
P : np.array [type][urgency_next][karma_next][urgency][karma]
"State transition probabilities"
        Intermediate result, returns the probability of an agent of a specific
        type to end up in the next state (urgency_next, karma_next) given its
        prior state (urgency, karma).
V : np.array [type][urgency][karma]
"Expected infinite horizon reward"
        Intermediate result, returns the expected infinite horizon reward
        (negative cost) for an agent of a specific type, urgency and karma.
Q : np.array [type][urgency][karma][action]
"Single-stage deviation reward (from current policy)"
        Intermediate result, returns the expected reward (negative cost) for a
        participant with a specific type, urgency, karma and action.
pi_pbp : np.array [type][urgency][karma][bid]
"Perturbed best response policy"
Intermediate result, represents a modified, potentially better policy.
delta_state : float
The amount the state changed when compared to previous iteration. Can
be used as an indicator for convergence.
delta_policy : float
The amount the policy changed when compared to previous iteration. Can
be used as an indicator for convergence.
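
    Examples
    --------
    A minimal usage sketch; ``params`` (a prepared ``GameParameters``) and the
    three probability callables are assumed to be defined by the user, and the
    hyperparameter values are illustrative only::

        optimizer = karma_game_library.Optimizer(
            game_parameters=params,
            hyper_lambda=1000,
            hyper_dt=0.20,
            hyper_nu=0.50,
            func_prob_outcome=my_prob_outcome,
            func_prob_karma_transition=my_prob_karma_transition,
            func_prob_urgency_transition=my_prob_urgency_transition)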
"""
#### Attributes
# Parameters for Optimization
_parameters = None
hyper_lambda = 150 # 1000
hyper_dt = 0.01
hyper_nu = 0.2
_hyper_infinite_horizon_reward_max_iterations = 1000
_hyper_infinite_horizon_reward_convergence_threshold = 0.00001
func_prob_outcome = None
func_prob_karma_transition = None
func_prob_urgency_transition = None
# Intermediate Optimization Results
v = None
gamma = None
kappa = None
xi = None
rho = None
R = None
P = None
V = None
Q = None
pi_pbp = None
delta_state = None
delta_policy = None
# Recorder
recorder = []
#
_enlarge_action_space = False
_enlarge_state_space = False
    def __init__(self,
game_parameters: karma_game_library.GameParameters,
hyper_lambda: float,
hyper_dt: float,
hyper_nu: float,
func_prob_outcome : Callable,
func_prob_karma_transition : Callable,
func_prob_urgency_transition : Callable):
"""
        This constructor initializes the Optimizer.
Parameters
----------
game_parameters : GameParameters
The simulation parameters.
hyper_lambda : float
Exponential scaling factor for policy perturbation.
hyper_dt : float
Stepwidth for state updates. Only values between 0.0 and 1.0
allowed.
hyper_nu : float
Stepwidth for policy updates. Only values between 0.0 and 1.0
allowed.
func_prob_outcome : Callable, (outcome, actions) -> (probability)
This function returns the probability for the outcome given the
actions.
        func_prob_karma_transition : Callable, (karma_next, karma, actions, outcome) -> (probability)
            This function returns the probability of karma_next given a prior
            karma, actions and outcome.
        func_prob_urgency_transition : Callable, (urgency_next, urgency, outcome, atype, set_urgencies) -> (probability)
            This function returns the probability of urgency_next given a prior
            urgency, outcome and type.
Raises
------
Exception
In case the input parameters are not plausible a specific exception
will be raised.
Returns
-------
None.
"""
# Plausibility checks
karma_game_library.check.non_negative(hyper_lambda, "hyper_lambda")
karma_game_library.check.float_bounded(hyper_dt, 0.0, 1.0, "hyper_dt")
karma_game_library.check.float_bounded(hyper_nu, 0.0, 1.0, "hyper_nu")
karma_game_library.check.func_not_none(func_prob_outcome, "func_prob_outcome")
karma_game_library.check.func_not_none(func_prob_karma_transition, "func_prob_karma_transition")
karma_game_library.check.func_not_none(func_prob_urgency_transition, "func_prob_urgency_transition")
# Initialization of matrix
self._parameters = game_parameters
self.hyper_lambda = hyper_lambda
self.hyper_dt = hyper_dt
self.hyper_nu = hyper_nu
self.func_prob_outcome = func_prob_outcome
self.func_prob_karma_transition = func_prob_karma_transition
self.func_prob_urgency_transition = func_prob_urgency_transition
self._enlarge_action_space = False
        self._enlarge_state_space = False
        self.recorder = []  # per-instance recorder (avoids sharing the class-level list across instances)
    def compute_iteration(self, state: karma_game_library.StateDistribution, policy: karma_game_library.Policy):
"""
This function updates the social state distribution and policy
probability matrix according to an optimization algorithm.
This algorithm is iterative, and shall be repeated iteratively until
state and policy converge to a stationary Nash equilibrium.
Parameters
----------
state : StateDistribution
The social state distribution that should be optimized.
policy : Policy
The policy that should be optimized.
Raises
------
Exception
In case the policy or state are invalid, an exception will be
raised.
        Returns
        -------
        delta_state : float
            The absolute change to the state (can be used as an indicator for
            convergence).
        delta_policy : float
            The absolute change to the policy (can be used as an indicator
            for convergence).
        direct_state : float
            The cosine similarity between the previous and the updated state
            distribution.
        direct_policy : float
            The cosine similarity between the previous and the updated policy
            matrix.
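
        Examples
        --------
        A minimal iteration loop sketch (``optimizer``, ``state`` and
        ``policy`` are assumed to be prepared; the iteration count and
        convergence threshold are illustrative only)::

            for _ in range(500):
                delta_state, delta_policy, _, _ = optimizer.compute_iteration(state, policy)
                if max(delta_state, delta_policy) < 1e-4:
                    break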
"""
        # enlarge state and action space if required
if self._enlarge_action_space:
self._enlarge_action_space = False
# print("enlarging action space as limitations reached")
policy._enlarge_action_space()
self._parameters._enlarge_action_space()
if self._enlarge_state_space:
self._enlarge_state_space = False
# print("enlarging karma space as limitations reached")
state._enlarge_karma_space()
policy._enlarge_karma_space()
self._parameters._enlarge_karma_space()
# check validity of state and policy
if(not state.validate()):
state.autocorrect()
# print("Needed to autocorrect state")
if(not state.validate()):
raise Exception("Invalid state, could not be autocorrected!")
if(not policy.validate()):
policy.autocorrect()
# print("Needed to autocorrect policy")
if(not policy.validate()):
raise Exception("Invalid policy, could not be autocorrected!")
# plausibility checks of hyper parameters
karma_game_library.check.non_negative(self.hyper_lambda, "hyper_lambda")
karma_game_library.check.float_bounded(self.hyper_dt, 0.0, 1.0, "hyper_dt")
karma_game_library.check.float_bounded(self.hyper_nu, 0.0, 1.0, "hyper_nu")
# compute iteration
self.v = self._prob_distribution_of_action(state, policy)
self.gamma = self._prob_distribution_outcome_bid(self.v)
self.kappa = self._prob_karma_transition(policy, state, self.v, self.gamma)
self.xi = self._immediate_reward(policy, self.gamma)
self.rho = self._prob_state_transition_function(policy, self.gamma, self.kappa)
self.R = self._expected_immediate_reward_policy(policy, self.xi)
self.P = self._prob_state_transition_policy(policy, self.rho)
        self.V, _ = self._infinite_horizon_reward(self.R, self.P, self._hyper_infinite_horizon_reward_max_iterations, self._hyper_infinite_horizon_reward_convergence_threshold)
self.Q = self._agent_single_state_deviation_reward(policy, self.xi, self.rho, self.V)
        self.pi_pbp = self._perturbed_best_response(policy, self.Q)
delta_policy, direct_policy = self._calculate_new_policy(policy, self.pi_pbp)
delta_state, direct_state = self._calculate_new_state(state, policy, self.P)
self.recorder.append([delta_state, delta_policy, np.sum(self.V)])
return delta_state, delta_policy, direct_state, direct_policy
def _prob_distribution_of_action(self, state: karma_game_library.StateDistribution, policy: karma_game_library.Policy):
"""
Intermediate result, returns probability of an action.
Parameters
----------
state : StateDistribution
The current state.
policy : Policy
The current policy.
Returns
-------
v : np.array [action]
"Distribution of agents' actions"
"""
v = np.zeros(len(self._parameters._set_actions))
for action in self._parameters._set_actions:
prob = 0
for atype in self._parameters._set_types:
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
if action in policy._map_possible_actions[karma]:
prob += state.dist_matrix[atype][urgency][karma] * policy.prob_matrix[atype][urgency][karma][action]
v[action] = prob
return v
def _prob_distribution_outcome_bid(self, v):
"""
Intermediate result, returns the probability of an outcome given the action.
Parameters
----------
v : np.array [action]
"Distribution of agents' actions"
Returns
-------
gamma : np.array [outcome][action]
"Probability of resource competition outcome given bid"
"""
gamma = np.zeros((len(self._parameters._set_outcomes),
len(self._parameters._set_actions)
))
for outcome in self._parameters._set_outcomes:
for action in self._parameters._set_actions:
prob = 0
for other_action in self._parameters._set_actions:
prob += v[other_action] * self.func_prob_outcome(outcome, [action, other_action])
gamma[outcome][action] = prob
return gamma
def _prob_karma_transition(self, policy: karma_game_library.Policy, state: karma_game_library.StateDistribution, v, gamma):
"""
Intermediate result, returns the probability of karma_next given the
prior karma, action and outcome.
Parameters
----------
        policy : Policy
            The current policy.
        state : StateDistribution
            The current state.
v : np.array [action]
"Distribution of agents' actions"
gamma : np.array [outcome][action]
"Probability of resource competition outcome given bid"
Returns
-------
kappa : np.array [karma_next][karma][action][outcome]
"Probabilistic karma transition function"
"""
kappa = np.zeros((len(self._parameters._set_state_karmas),
len(self._parameters._set_state_karmas),
len(self._parameters._set_actions),
len(self._parameters._set_outcomes)
))
for karma_next in self._parameters._set_state_karmas:
for karma in self._parameters._set_state_karmas:
for action in policy._map_possible_actions[karma]:
for outcome in self._parameters._set_outcomes:
kappa[karma_next][karma][action][outcome] = self.func_prob_karma_transition(karma_next, karma, action, outcome, state.dist_matrix, policy.prob_matrix, self._parameters, gamma, v)
return kappa
def _immediate_reward(self, policy: karma_game_library.Policy, gamma):
"""
        Intermediate result, returns the expected immediate reward (negative
        cost) for a participant with a specific urgency and action.
Parameters
----------
policy : Policy
The current policy.
gamma : np.array [outcome][action]
"Probability of resource competition outcome given bid"
Returns
-------
xi : np.array [urgency][action]
"Immediate reward function"
"""
xi = np.zeros((len(self._parameters._set_urgencies),
len(self._parameters._set_actions)
))
for urgency in self._parameters._set_urgencies:
for action in self._parameters._set_actions:
exp_cost = 0
for outcome in self._parameters._set_outcomes:
exp_cost += self._parameters._func_cost(urgency, outcome) * gamma[outcome][action]
xi[urgency][action] = - exp_cost
return xi
def _prob_state_transition_function(self, policy: karma_game_library.Policy, gamma, kappa):
"""
        Intermediate result, returns the probability that a participant of a
        given type ends up in the next state (urgency_next, karma_next) given
        a prior urgency, karma and action.
Parameters
----------
policy : Policy
The current policy.
gamma : np.array [outcome][action]
"Probability of resource competition outcome given bid"
kappa : np.array [karma_next][karma][action][outcome]
"Probabilistic karma transition function"
Returns
-------
rho : np.array [type][urgency_next][karma_next][urgency][karma][action]
"Probabilistic state transition function"
"""
rho = np.zeros((len(self._parameters._set_types),
len(self._parameters._set_urgencies),
len(self._parameters._set_state_karmas),
len(self._parameters._set_urgencies),
len(self._parameters._set_state_karmas),
len(self._parameters._set_actions),
))
for atype in self._parameters._set_types:
for urgency_next in self._parameters._set_urgencies:
for karma_next in self._parameters._set_state_karmas:
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
for action in policy._map_possible_actions[karma]:
prob = 0
for outcome in self._parameters._set_outcomes:
prob += self.func_prob_urgency_transition(urgency_next, urgency, outcome, atype, self._parameters._set_urgencies) * gamma[outcome][action] * kappa[karma_next][karma][action][outcome]
rho[atype][urgency_next][karma_next][urgency][karma][action] = prob
return rho
def _expected_immediate_reward_policy(self, policy: karma_game_library.Policy, xi):
"""
        Intermediate result, returns the expected immediate reward
        (negative cost) for a participant of a specific type, urgency and karma.
Parameters
----------
policy : Policy
The current policy.
xi : np.array [urgency][action]
"Immediate reward function"
Returns
-------
R : np.array [type][urgency][karma]
"Expected immediate reward"
"""
R = np.zeros((len(self._parameters._set_types),
len(self._parameters._set_urgencies),
len(self._parameters._set_state_karmas)
))
for atype in self._parameters._set_types:
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
reward = 0
for action in policy._map_possible_actions[karma]:
reward += policy.prob_matrix[atype][urgency][karma][action] * xi[urgency][action]
R[atype][urgency][karma] = reward
return R
def _prob_state_transition_policy(self, policy: karma_game_library.Policy, rho):
"""
        Intermediate result, returns the probability of an agent of a specific
        type to end up in the next state (urgency_next, karma_next) given its
        prior state (urgency, karma).
Parameters
----------
policy : Policy
The current policy.
rho : np.array [type][urgency_next][karma_next][urgency][karma][action]
"Probabilistic state transition function"
Returns
-------
P : np.array [type][urgency_next][karma_next][urgency][karma]
"State transition probabilities"
"""
P = np.zeros((len(self._parameters._set_types),
len(self._parameters._set_urgencies),
len(self._parameters._set_state_karmas),
len(self._parameters._set_urgencies),
len(self._parameters._set_state_karmas),
))
for atype in self._parameters._set_types:
for urgency_next in self._parameters._set_urgencies:
for karma_next in self._parameters._set_state_karmas:
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
prob = 0
for action in policy._map_possible_actions[karma]:
prob += policy.prob_matrix[atype][urgency][karma][action] * rho[atype][urgency_next][karma_next][urgency][karma][action]
P[atype][urgency_next][karma_next][urgency][karma] = prob
return P
    def _infinite_horizon_reward(self, R, P, max_iterations, convergence_threshold):
        """
        Intermediate result, returns the expected infinite horizon reward
        (negative cost) for an agent of a specific type, urgency, karma.
Parameters
----------
R : np.array [type][urgency][karma]
"Expected immediate reward"
P : np.array [type][urgency_next][karma_next][urgency][karma]
"State transition probabilities"
max_iterations: int
How many iterations at most shall be used to calculate the
infinite horizon reward.
        convergence_threshold : float
            The difference to the previous iteration below which the iteration
            can be aborted, as the infinite horizon reward has converged
            sufficiently.
Returns
-------
V : np.array [type][urgency][karma]
"Expected infinite horizon reward"
difference_V : float
The convergence difference at which the calculation stopped.
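
        Notes
        -----
        Value iteration on the Bellman equation

            V[type][u][k] = R[type][u][k] + alpha * sum over (u_next, k_next) of
                P[type][u_next][k_next][u][k] * V[type][u_next][k_next]

        where alpha is the type's temporal preference (discount factor).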
"""
        # initialize with the immediate reward (corresponds to alpha = 0)
        V = np.copy(R)
# calculate iteratively
for it in range(0,max_iterations):
last_V = np.copy(V)
for atype in self._parameters._set_types:
alpha = self._parameters._map_type_temp_preference[atype]
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
reward = R[atype][urgency][karma]
for urgency_next in self._parameters._set_urgencies:
for karma_next in self._parameters._set_state_karmas:
reward += alpha * P[atype][urgency_next][karma_next][urgency][karma] * V[atype][urgency_next][karma_next]
V[atype][urgency][karma] = reward
            difference_V = np.sum(np.abs(last_V - V))
            if(difference_V < convergence_threshold):
break
return V, difference_V
def _agent_single_state_deviation_reward(self, policy: karma_game_library.Policy, xi, rho, V):
"""
        Intermediate result, returns the expected reward (negative cost) for a
        participant with a specific type, urgency, karma and action.
Parameters
----------
policy : Policy
The current policy.
xi : np.array [urgency][action]
"Immediate reward function"
rho : np.array [type][urgency_next][karma_next][urgency][karma][action]
"Probabilistic state transition function"
V : np.array [type][urgency][karma]
"Expected infinite horizon reward"
Returns
-------
Q : np.array [type][urgency][karma][action]
"Single-stage deviation reward (from current policy)"
"""
Q = np.zeros((len(self._parameters._set_types),
len(self._parameters._set_urgencies),
len(self._parameters._set_state_karmas),
len(self._parameters._set_actions),
))
for atype in self._parameters._set_types:
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
for action in policy._map_possible_actions[karma]:
alpha = self._parameters._map_type_temp_preference[atype]
reward = xi[urgency][action]
for urgency_next in self._parameters._set_urgencies:
for karma_next in self._parameters._set_state_karmas:
reward += alpha * rho[atype][urgency_next][karma_next][urgency][karma][action] * V[atype][urgency_next][karma_next]
Q[atype][urgency][karma][action] = reward
return Q
    def _perturbed_best_response(self, policy: karma_game_library.Policy, Q):
"""
Intermediate result, represents a modified, potentially better policy.
Parameters
----------
policy : Policy
The current policy.
Q : np.array [type][urgency][karma][action]
"Single-stage deviation reward (from current policy)"
Returns
-------
pi_pbp : np.array [type][urgency][karma][bid]
"Perturbed best response policy"
"""
pi_pbp = np.zeros((len(self._parameters._set_types),
len(self._parameters._set_urgencies),
len(self._parameters._set_state_karmas),
len(self._parameters._set_actions),
))
for atype in self._parameters._set_types:
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
for action in policy._map_possible_actions[karma]:
                        possible_actions = policy._map_possible_actions[karma]
                        # softmax with max-subtraction for numerical stability
                        Q_max = max(Q[atype][urgency][karma][a] for a in possible_actions)
                        divisor = 0
                        for other_action in possible_actions:
                            divisor += np.exp(self.hyper_lambda * (Q[atype][urgency][karma][other_action] - Q_max))
                        dividend = np.exp(self.hyper_lambda * (Q[atype][urgency][karma][action] - Q_max))
if(divisor==0):
pi_pbp[atype][urgency][karma][action] = dividend
else:
pi_pbp[atype][urgency][karma][action] = dividend / divisor
return pi_pbp
def _determine_consecutive_distribution(self, d_in, P):
"""
This function multiplies a given distribution with a state transition
probabilities matrix.
Parameters
----------
d_in : np.array
The current distribution.
P : np.array [type][urgency_next][karma_next][urgency][karma]
"State transition probabilities"
Returns
-------
d_out : np.array
New distribution.
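
        Notes
        -----
        Applies the transition kernel and renormalizes:

            d_out[u_next][k_next] = sum over (u, k) of
                d_in[u][k] * P[u_next][k_next][u][k]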
"""
d_out = np.zeros((len(self._parameters._set_urgencies), len(self._parameters._set_state_karmas)))
for urgency_next in self._parameters._set_urgencies:
for karma_next in self._parameters._set_state_karmas:
prob = 0
for urgency2 in self._parameters._set_urgencies:
for karma2 in self._parameters._set_state_karmas:
prob += d_in[urgency2][karma2] * P[urgency_next][karma_next][urgency2][karma2]
d_out[urgency_next][karma_next] = prob
d_out = d_out / np.sum(d_out)
return d_out
def _calculate_new_state(self, state, policy: karma_game_library.Policy, P):
"""
This function calculates the next state.
Parameters
----------
state : StateDistribution
The current state.
policy : Policy
The current policy.
P : np.array [type][urgency_next][karma_next][urgency][karma]
"State transition probabilities"
        Returns
        -------
        update_difference : float
            How much the state changed due to the update.
        update_direction : float
            The cosine similarity between the previous and the updated state
            distribution.
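
        Notes
        -----
        Smoothed update of the state distribution with step width hyper_dt:

            d_next = (1 - hyper_dt) * d + hyper_dt * (d propagated through P)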
"""
# calculate new state
dist_matrix_next = state.dist_matrix.copy()
for atype in self._parameters._set_types:
product = self._determine_consecutive_distribution(state.dist_matrix[atype], P[atype])
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
dist_matrix_next[atype][urgency][karma] = (1-self.hyper_dt)*state.dist_matrix[atype][urgency][karma] + (self.hyper_dt)*product[urgency][karma]
update_difference = np.sum(np.abs(dist_matrix_next - state.dist_matrix))
update_direction = np.dot(dist_matrix_next.flatten(), state.dist_matrix.flatten()) / (np.linalg.norm(dist_matrix_next) * np.linalg.norm(state.dist_matrix))
state.dist_matrix = dist_matrix_next
# check if distribution gets too wide, then need to extend karma space
agg_dist = np.sum(np.sum(dist_matrix_next, axis=0), axis=0)
# especially important when optimizing for pay bid to peer
# if(np.sum(av_dist[-int(len(av_dist)/2):])>self._parameters._creation_thr_state):
if(np.sum(agg_dist[-4:])>self._parameters._creation_thr_state):
self._enlarge_state_space = True
return update_difference, update_direction
def _calculate_new_policy(self, policy: karma_game_library.Policy, pi_pbp):
"""
This function calculates the next policy.
Parameters
----------
policy : Policy
The current policy.
        pi_pbp : np.array [type][urgency][karma][bid]
            "Perturbed best response policy"
        Returns
        -------
        update_difference : float
            How much the policy changed due to the update.
        update_direction : float
            The cosine similarity between the previous and the updated policy
            matrix.
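
        Notes
        -----
        Smoothed update of the policy with step width hyper_nu * hyper_dt:

            pi_next = (1 - hyper_nu * hyper_dt) * pi
                + hyper_nu * hyper_dt * pi_pbp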
"""
# calculate new policy
prob_mat_next = policy.prob_matrix.copy()
for atype in self._parameters._set_types:
for urgency in self._parameters._set_urgencies:
for karma in self._parameters._set_state_karmas:
for action in policy._map_possible_actions[karma]:
prob_mat_next[atype][urgency][karma][action] = (1-self.hyper_nu*self.hyper_dt)*policy.prob_matrix[atype][urgency][karma][action] + (self.hyper_nu*self.hyper_dt)*pi_pbp[atype][urgency][karma][action]
update_difference = np.sum(np.abs(prob_mat_next-policy.prob_matrix))
update_direction = np.dot(prob_mat_next.flatten(), policy.prob_matrix.flatten()) / (np.linalg.norm(prob_mat_next) * np.linalg.norm(policy.prob_matrix))
policy.prob_matrix = prob_mat_next
# check if policy gets too high, then need to extend action space
agg_pol = np.max(np.max(np.max(prob_mat_next, axis=0), axis=0), axis=0)
        if(agg_pol[-1] > self._parameters._creation_thr_action):
self._enlarge_action_space = True
return update_difference, update_direction