Commit c70eea6e authored by David Milec

Initial commit

ADAPTQR.py

from CFR import CFR
import random
import numpy as np


class ADAPTQR(CFR):
    def __init__(self, fname, cfr_player=1, rationality=1, norm=False):
        super().__init__(fname, cfr_plus=False)
        self.cfr_player = cfr_player
        self.rationality = rationality
        self.progressive_strategy = False
        self.norm = norm
        # Probability of taking a quantal-response (QR) step instead of a
        # best-response (BR) step; adapted during the first solving pass.
        self.p = None
        self.prev_val = 0
        # Step size, decay and improvement threshold for adapting p.
        self.move = 0.01
        self.decay = 1
        self.thr = 1.00001
        self.last_br = True
        self.val_avg = 0
        # Best strategy (by QR value) found so far.
        self.best_strategy = None
        self.best_value = -np.inf
        self.fixed = False
        # Best strategies under the combined objectives qr_val + w * br_val
        # for comb_numbers weights w spread uniformly over [0, 1].
        self.best_combined_value = []
        self.best_combined_strategy = []
        self.best_comb_qr = []
        self.best_comb_br = []
        self.comb_numbers = 11

    def solve(self, iterations=1000, verbose=0, save_progression=False, save_strategy=False, skip=1):
        self.p = 0.5
        self.adaptqr = True
        self.decay = np.power(2, -1 / iterations)
        for i in range(self.comb_numbers):
            self.best_combined_value.append(-np.inf)
            self.best_combined_strategy.append(None)
            self.best_comb_qr.append(0)
            self.best_comb_br.append(0)
        # First pass adapts p; the second pass restarts and keeps p fixed.
        super().solve(iterations, verbose, save_progression, save_strategy, skip)
        self.iteration = 0
        self.average_strategy = None
        self.initialize()
        self.fixed = True
        super().solve(iterations, verbose, save_progression, save_strategy, skip)

    def compute_strategy(self):
        self.compute_strategy_for_player(self.cfr_player)
        qr_val, qr = self.quantal_response(1 - self.cfr_player, self.strategy, self.rationality)
        br_val, br = self.best_response(1 - self.cfr_player, self.strategy)
        if self.best_value < qr_val:
            self.best_value = qr_val
            self.best_strategy = self.strategy
        for i, comb_number in enumerate(np.linspace(0, 1, self.comb_numbers)):
            if self.best_combined_value[i] < qr_val + comb_number * br_val:
                self.best_combined_value[i] = qr_val + comb_number * br_val
                self.best_combined_strategy[i] = self.strategy
                self.best_comb_qr[i] = qr_val
                self.best_comb_br[i] = br_val
        if not self.fixed:
            # Alternative gradient-like update of p, kept for reference:
            # if self.iteration > 100:
            #     move = val - self.prev_val if self.last_br else self.prev_val - val
            #     self.p -= move / self.val_avg * self.move
            # elif self.iteration < 100:
            #     self.val_avg += abs(val - self.prev_val)
            # else:
            #     self.val_avg /= 100
            # If the QR value improved noticeably, shift p towards the response
            # type used in the last iteration; if it dropped, shift away from it.
            up_thr_val = self.thr * self.prev_val if self.prev_val > 0 else self.prev_val / self.thr
            do_thr_val = self.prev_val / self.thr if self.prev_val > 0 else self.thr * self.prev_val
            if qr_val > up_thr_val:
                if self.last_br:
                    self.p -= self.move
                else:
                    self.p += self.move
            elif qr_val < do_thr_val:
                if self.last_br:
                    self.p += self.move
                else:
                    self.p -= self.move
            self.prev_val = qr_val
            self.move *= self.decay
        # print("p", self.p)
        r = random.random()
        if r < self.p:
            self.last_br = False
            if self.norm:
                self.strategy = self.normalized_quantal_response(1 - self.cfr_player, self.strategy,
                                                                 self.rationality)[1]
            else:
                self.strategy = qr
        else:
            self.last_br = True
            self.strategy = br

    def compute_regret(self):
        self.compute_regret_for_player(self.cfr_player)

    def print_responses(self):
        # print()
        responses = [self.best_response(1 - self.cfr_player, self.average_strategy)[0],
                     self.best_response(1 - self.cfr_player, self.strategy)[0], [], []]
        responses[2] = self.quantal_response(1 - self.cfr_player, self.average_strategy, self.rationality)[0]
        responses[3] = self.quantal_response(1 - self.cfr_player, self.strategy, self.rationality)[0]
        # print("BR:", responses[0], end=" ")
        # print("QR:", responses[2])
        # print("BRc:", responses[1], end=" ")
        # print("QRc:", responses[3], end=" ")
        return responses

    def return_values(self):
        return (self.quantal_response(1 - self.cfr_player, self.average_strategy, self.rationality)[0],
                self.best_response(1 - self.cfr_player, self.average_strategy)[0],
                self.best_comb_qr, self.best_comb_br)
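
A minimal usage sketch for ADAPTQR. The game path is hypothetical, and it assumes the CFR base class loads the game from fname, as the other solvers in this commit do:

from ADAPTQR import ADAPTQR

# "games/example.gam" is a placeholder for any game file the CFR loader accepts.
solver = ADAPTQR("games/example.gam", cfr_player=1, rationality=1)
# solve runs two passes of the given length: one adapting p, one with p fixed.
solver.solve(iterations=1000)
qr_avg_val, br_avg_val, comb_qr, comb_br = solver.return_values()
print("QR value of average strategy:", qr_avg_val)
print("BR value of average strategy:", br_avg_val)
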
BestNE.py

import numpy as np
from scipy.optimize import NonlinearConstraint
from scipy.optimize import LinearConstraint
from scipy.optimize import Bounds
from scipy.optimize import minimize
from CFR import CFR
from ExtensiveGame import ExtensiveGame
from math import exp
from SequenceNash import SequenceNash


class BestNE:
    def __init__(self, fname):
        # Save game name
        self.fname = fname
        # Game variables
        self.game = ExtensiveGame()
        self.game.load(fname)
        # Variables to save
        self.is_to_node = {}
        self.action_index = {}
        self.action = 1
        self.i_set = 1
        self.i_sets = set()
        self.rationality = None
        self.nash_value = None
        self.it = 0
        # Constraints
        self.constraints = None
        self.nonlinear_constraints = None
        self.x0 = None
        self.bounds = None
        # Informational counter only
        self.nodes = 0
        # Saved solution in object
        self.solution = None

    def solve(self, x0=None, rationality=1, method="SLSQP"):
        seq_nash = SequenceNash(self.fname)
        seq_nash.solve()
        self.nash_value = seq_nash.solution
        self.rationality = rationality
        self.nodes = 0
        self.create_variables()
        self.create_expressions()
        if x0 is not None:
            self.x0 = x0
        self.solution = minimize(self.objective, self.x0, method=method, bounds=self.bounds,
                                 constraints=[self.constraints, self.nonlinear_constraints],
                                 options={'maxiter': 20})
        return self.solution

    def create_variables(self):
        self.create_variables_step(self.game.root, (-1, -1), 0, 1.0)

    def create_variables_step(self, node, last_node_action, last_node_strat, chance):
        self.nodes += 1
        if node.player == 3:
            # Terminal node
            if last_node_action not in self.is_to_node:
                self.is_to_node[last_node_action] = []
            self.is_to_node[last_node_action].append((node, chance, last_node_strat))
        elif node.player == 2:
            # Chance node
            for child, next_chance in zip(node.children, node.chance):
                self.create_variables_step(child, last_node_action, last_node_strat, chance * next_chance)
        elif node.player == 0:
            # Player 1
            if last_node_action not in self.is_to_node:
                self.is_to_node[last_node_action] = []
            if node.i_set not in self.is_to_node[last_node_action]:
                self.is_to_node[last_node_action].append(node.i_set)
                self.i_sets.add(node.i_set)
            for i, child in enumerate(node.children):
                self.create_variables_step(child, (node.i_set, i), last_node_strat, chance)
        else:
            # Player 2 (my agent)
            create_strats = False
            if (last_node_strat, node.i_set) not in self.action_index:
                self.action_index[(last_node_strat, node.i_set)] = []
                create_strats = True
            for i, child in enumerate(node.children):
                if create_strats:
                    self.action_index[(last_node_strat, node.i_set)].append(self.action)
                    self.action += 1
                self.create_variables_step(child, last_node_action,
                                           self.action_index[(last_node_strat, node.i_set)][i],
                                           chance)

    def objective(self, x):
        # print(x[self.action])
        # x[self.action] is the root quantal-response value variable
        # (see create_expressions).
        return x[self.action]

    def random_strategy(self):
        constrain_matrix = self.constraints.A
        x0 = [0] * len(constrain_matrix[0])
        x0[0] = 1
        for i in range(1, len(constrain_matrix)):
            parent_val = 0
            b = sum(constrain_matrix[i])
            temp_strat = np.random.uniform(0, 1, b + 1)
            temp_strat = temp_strat / sum(temp_strat)
            index = 0
            for j in range(len(constrain_matrix[i])):
                if constrain_matrix[i][j] == -1:
                    parent_val = x0[j]
                if constrain_matrix[i][j] == 1:
                    x0[j] = temp_strat[index] * parent_val
                    index += 1
        return x0

    def create_expressions(self):
        size = self.action + len(self.i_sets) + 1
        ne_ad = size + 1
        size += len(self.i_sets) + 1
        # print("Actions:", self.action)
        # print("Information sets:", len(self.i_sets))
        # print("Nodes:", self.nodes)
        addition = 1
        const_array = []
        lesser = []
        greater = []
        # Root sequence probability is fixed to 1.
        app = [0] * size
        app[0] = 1
        const_array.append(app)
        lesser.append(1)
        greater.append(1)
        # Flow conservation: child sequence probabilities sum to the parent's.
        for key, value in self.action_index.items():
            app = [0] * size
            app[key[0]] = -1
            for val in value:
                app[val] = 1
            const_array.append(app)
            lesser.append(0)
            greater.append(0)
        # Value constraints linking information-set values to successor values
        # and chance-weighted terminal payoffs.
        for key, value in self.is_to_node.items():
            app = [0] * size
            if key[0] == -1:
                app[ne_ad - 1] = -1
            else:
                app[key[0] + ne_ad] = -1
            for val in value:
                if isinstance(val, int):
                    app[val + ne_ad] += 1
                else:
                    app[val[2]] += val[0].value * val[1]
            lesser.append(-np.inf)
            greater.append(0)
            const_array.append(app)
        # Bound the root value variable by the Nash value.
        app = [0] * size
        app[ne_ad - 1] = 1
        const_array.append(app)
        lesser.append(-np.inf)
        greater.append(self.nash_value)
        self.constraints = LinearConstraint(const_array, lesser, greater)
        # print(self.constraints.A)
        # print(lesser)
        # print(const_array)
        # print(greater)
        self.x0 = [0] * size
        lb = [-np.inf] * size
        ub = [np.inf] * size
        for i in range(self.action):
            lb[i] = 0
            ub[i] = 1
        self.bounds = Bounds(lb, ub)
        recreated = {}
        for key, value in self.is_to_node.items():
            if key[0] not in recreated:
                recreated[key[0]] = []
            recreated[key[0]].append(value)
        ret = "def f(x):\n return ["
        # print(recreated)
        for key, value in recreated.items():
            if key == -1:
                key_mod = self.action
            else:
                key_mod = key + addition + self.action
            ret += "-x[" + str(key_mod) + "] "
            top = "("
            bot = "("
            for action_values in value:
                expr = ""
                for single_value in action_values:
                    if isinstance(single_value, int):
                        expr += "+x[" + str(single_value + addition + self.action) + "] "
                    else:
                        expr += "+" + str(single_value[0].value) + "*" + str(single_value[1]) + \
                                "*x[" + str(single_value[2]) + "] "
                top += "+(" + expr + ")*exp(" + str(self.rationality) + "*(" + expr + "))"
                bot += "+exp(" + str(self.rationality) + "*(" + expr + "))"
            ret += "+" + top + ")/" + bot + "),"
        ret = ret.strip(",")
        ret += "]"
        # exec defines f(x) in module globals; it encodes the quantal-response
        # fixed-point conditions used as nonlinear equality constraints below.
        exec(ret, globals())
        # print(ret)
        self.nonlinear_constraints = NonlinearConstraint(f, 0, 0)

    def strategy_in_cfr_format(self):
        if self.solution is None:
            print("Instance not yet solved.")
        else:
            cfrqr = CFR(self.fname)
            cfrqr.solve(1)
            x = self.solution['x']
            for key, value in self.action_index.items():
                temp = [0] * len(value)
                i = 0
                if x[key[0]] < 0.000001:
                    # Unreached information set: fall back to uniform play.
                    temp = [1 / len(value)] * len(value)
                else:
                    for val in value:
                        temp[i] = x[val] / x[key[0]]
                        i += 1
                cfrqr.strategy[1][key[1]] = temp
            return cfrqr.strategy
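
A usage sketch for BestNE; it computes the Nash value with SequenceNash internally and then runs the constrained nonlinear optimization over the root quantal-response value (solve caps maxiter at 20, so expect approximate results on larger games). The game path is again a placeholder:

from BestNE import BestNE

best_ne = BestNE("games/example.gam")  # placeholder path
result = best_ne.solve(rationality=1, method="SLSQP")
print("optimizer success:", result.success, "objective:", result.fun)
# Convert the sequence-form solution into the CFR strategy format for evaluation.
strategy = best_ne.strategy_in_cfr_format()
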
CFRQRCFV.py

from CFR import CFR


class CFRQRCFV(CFR):
    def __init__(self, fname, cfr_player=1, rationality=1):
        super().__init__(fname, cfr_plus=False)
        self.cfr_player = cfr_player
        self.rationality = rationality

    def compute_strategy(self):
        self.compute_strategy_for_player(self.cfr_player)
        # quantal_response returns a (value, strategy) pair; keep the strategy.
        self.strategy = self.quantal_response(1 - self.cfr_player, self.strategy, self.rationality)[1]

    def compute_regret(self):
        self.compute_regret_for_player(self.cfr_player)

    def print_responses(self):
        print()
        responses = [self.best_response(0, self.average_strategy)[0],
                     self.best_response(0, self.strategy)[0], [], []]
        responses[2] = self.quantal_response(1 - self.cfr_player, self.average_strategy, self.rationality)[0]
        responses[3] = self.quantal_response(1 - self.cfr_player, self.strategy, self.rationality)[0]
        print("BR:", self.best_response(1 - self.cfr_player, self.average_strategy)[0], end=" ")
        print("QR:", self.quantal_response(1 - self.cfr_player, self.average_strategy, self.rationality)[0],
              end=" ")
        return responses
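
CFRQRCFV differs from plain CFR only in compute_strategy, where the profile is replaced by the opposing player's quantal response each iteration. A sketch of running it (placeholder path):

from CFRQRCFV import CFRQRCFV

alg = CFRQRCFV("games/example.gam", cfr_player=1, rationality=1)
alg.solve(iterations=1000, verbose=0)
# Returns [BR to average, BR to current, QR to average, QR to current] values.
br_avg, br_cur, qr_avg, qr_cur = alg.print_responses()
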
Combination.py

import copy
import numpy as np


class Combination:
    def __init__(self, s1, s2, cfr):
        # print(len(s1))
        # print(len(s2))
        self.s1 = s1
        self.s2 = s2
        self.s3 = copy.deepcopy(s1)
        self.visited = []
        self.p = None
        self.cfr = cfr
        self.game = cfr.game

    def best_strategy(self, splits=11):
        # TODO: selecting the best candidate is not implemented yet; for now
        # this only builds and returns the space of combined strategies.
        space = self.combination_space(splits)
        return space

    def combination_space(self, splits=11):
        space = []
        for p in np.linspace(0, 1, splits):
            space.append(copy.deepcopy(self.combine_strategies(p)))
        return space

    def combine_strategies(self, p):
        self.p = p
        self.visited = []
        self.combine_strategy_at_node(self.game.root, [1, 1], [1, 1])
        return self.s3

    def combine_strategy_at_node(self, node, s1_reach, s2_reach):
        player = node.player
        if player == 3:
            # Terminal node
            return
        elif player == 2:
            # Chance node
            for child in node.children:
                self.combine_strategy_at_node(child, s1_reach, s2_reach)
        else:
            for i, child in enumerate(node.children):
                new_s1_reach = [0, 0]
                new_s2_reach = [0, 0]
                new_s1_reach[1 - player] = s1_reach[1 - player]
                new_s1_reach[player] = s1_reach[player] * self.s1[player][node.i_set][i]
                new_s2_reach[1 - player] = s2_reach[1 - player]
                new_s2_reach[player] = s2_reach[player] * self.s2[player][node.i_set][i]
                self.combine_strategy_at_node(child, new_s1_reach, new_s2_reach)
            if (player, node.i_set) not in self.visited and len(node.children) > 0:
                self.visited.append((player, node.i_set))
                self.combine_in_is(player, node.i_set, s1_reach[player], s2_reach[player])

    def combine_in_is(self, player, iset, s1reach, s2reach):
        # Realization-weighted convex combination of the two behavioral strategies.
        if s1reach * self.p + s2reach * (1 - self.p) == 0:
            # The information set is unreachable under both strategies.
            self.set_uniform_result(player, iset)
        else:
            self.s3[player][iset] = np.divide(
                np.multiply(self.s1[player][iset], s1reach * self.p) +
                np.multiply(self.s2[player][iset], s2reach * (1 - self.p)),
                s1reach * self.p + s2reach * (1 - self.p))

    def set_uniform_result(self, player, iset):
        strategy_size = len(self.s3[player][iset])
        self.s3[player][iset] = [1 / strategy_size] * strategy_size
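
A sketch of mixing two solver outputs with Combination. The reach-weighted mixture at each information set makes the combined behavioral strategy realization-equivalent to playing the first strategy with probability p and the second with probability 1 - p. All paths and iteration counts below are illustrative:

from CFR import CFR
from CFRQRCFV import CFRQRCFV
from Combination import Combination

cfr_alg = CFR("games/example.gam")  # placeholder path
cfr_alg.solve(100)
qr_alg = CFRQRCFV("games/example.gam", rationality=1)
qr_alg.solve(100)

comb = Combination(cfr_alg.average_strategy, qr_alg.average_strategy, cfr_alg)
mixed = comb.combine_strategies(0.5)            # single mixture at p = 0.5
candidates = comb.combination_space(splits=11)  # mixtures for p = 0.0, 0.1, ..., 1.0
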
from CFR import CFR
from CFRBR import CFRBR
from CFRQRCFV import CFRQRCFV
from CFRQRNORM import CFRQRNORM
from SequenceNash import SequenceNash
from DataManipulation import *
from Plot import *
import numpy as np
from RQR import RQR
from BestNE import BestNE
from SequenceQSE import SequenceQSE
from CFRLin import CFRLin
from ADAPTQR import ADAPTQR
import glob
# Computes the selected convergence curves and draws horizontal lines at the Nash BR and Nash QR values.
def plot_convergence_curves_with_nash_values(fname, algorithm, iterations=1000, cfr_plus=False, cfr_player=1,
                                             rationality=1, current=True, average=True, br=True, qr=True,
                                             logscale_y=False, logscale_x=False, lines=True, skip=1, p=0.5,
                                             plot_qse=True):
    if algorithm == "cfr":
        cfr_alg = CFR(fname, cfr_plus=cfr_plus)
        # Algorithms without QR curves force qr off.
        qr = False
    elif algorithm == "cfrbr":
        cfr_alg = CFRBR(fname, cfr_player=cfr_player)
        qr = False
    elif algorithm == "cfrqrcfv":
        cfr_alg = CFRQRCFV(fname, cfr_player=cfr_player, rationality=rationality)
    elif algorithm == "cfrqrnorm":
        cfr_alg = CFRQRNORM(fname, cfr_player=cfr_player, rationality=rationality)
    elif algorithm == "rqr":
        cfr_alg = RQR(fname, cfr_player=cfr_player, rationality=rationality, p=p)
    elif algorithm == "rqrcfv":
        cfr_alg = RQR(fname, cfr_player=cfr_player, rationality=rationality, p=p, norm=False)
    elif algorithm == "cfrlin":
        cfr_alg = CFRLin(fname, cfr_player=cfr_player, rationality=rationality)
        qr = False
    else:
        raise ValueError("Wrong algorithm specification, use one of: cfr, cfrbr, cfrqrcfv, cfrqrnorm, "
                         "rqr, rqrcfv, cfrlin")
    cfr_alg.solve(iterations=iterations, verbose=3, save_progression=True, skip=skip)
    curve = None
    if current:
        if average:
            if br:
                if qr:
                    if algorithm == "cfr" or algorithm == "cfrbr":
                        raise AttributeError("cfr and cfrbr do not have qr curves")
                    curve = create_plotable_progression(cfr_alg.progression)
                else:
                    curve = create_plotable_progression_both_br(cfr_alg.progression)
            else:
                if qr:
                    if algorithm == "cfr" or algorithm == "cfrbr":
                        raise AttributeError("cfr and cfrbr do not have qr curves")
                    curve = create_plotable_progression_both_qr(cfr_alg.progression)
        else:
            if br:
                if qr:
                    if algorithm == "cfr" or algorithm == "cfrbr":
                        raise AttributeError("cfr and cfrbr do not have qr curves")
                    curve = create_plotable_progression_current_br_qr(cfr_alg.progression)
                else:
                    curve = create_plotable_progression_current_br(cfr_alg.progression)
            else:
                if qr:
                    if algorithm == "cfr" or algorithm == "cfrbr":
                        raise AttributeError("cfr and cfrbr do not have qr curves")
                    curve = create_plotable_progression_current_qr(cfr_alg.progression)
    else:
        if average:
            if br:
                if qr:
                    if algorithm == "cfr" or algorithm == "cfrbr":
                        raise AttributeError("cfr and cfrbr do not have qr curves")
                    curve = create_plotable_progression_avg_br_qr(cfr_alg.progression)
                else:
                    curve = create_plotable_progression_avg_br(cfr_alg.progression)
            else:
                if qr:
                    if algorithm == "cfr" or algorithm == "cfrbr":
                        raise AttributeError("cfr and cfrbr do not have qr curves")
                    curve = create_plotable_progression_avg_qr(cfr_alg.progression)
    if curve is None:
        raise AttributeError("The requested curve set is empty; set at least one of "
                             "current/average to True and at least one of br/qr to True")
    # cfr_alg.print_strategy(cfr_alg.average_strategy, 1, compact=True, decimal_points=8)
    name, axis_labels = data_for_graph_of_convergence_curve(algorithm, fname, rationality)
    best_ne = BestNE(fname)
    best_ne.solve(rationality=rationality)
    hlines = None
    if lines:
        hlines = [(cfr_alg.best_response(0, best_ne.strategy_in_cfr_format())[0],
                   "Nash equilibrium value", "b", ":")]
        if qr:
            if algorithm == "cfrqrcfv" or algorithm == "rqrcfv":
                hlines.append(
                    (cfr_alg.quantal_response(0, best_ne.strategy_in_cfr_format(), rationality=rationality)[0],
                     "Value of QR to nash equilibrium strategy", "r", ":"))
            else:
                hlines.append(
                    (cfr_alg.normalized_quantal_response(0, best_ne.strategy_in_cfr_format(),
                                                         rationality=rationality)[0],
                     "Value of QR to nash equilibrium strategy", "r", ":"))
        if plot_qse:
            seq_qse = SequenceQSE(fname)
            seq_qse.solve(rationality=rationality)
            hlines.append(
                (cfr_alg.best_response(0, seq_qse.strategy_in_cfr_format())[0],
                 "Value of BR to QSE strategy", "m", ":"))
            if algorithm == "cfrqrcfv" or algorithm == "rqrcfv":
                hlines.append(
                    (cfr_alg.quantal_response(0, seq_qse.strategy_in_cfr_format(), rationality=rationality)[0],
                     "Value of QSE", "y", ":"))
            else:
                hlines.append(
                    (cfr_alg.normalized_quantal_response(0, seq_qse.strategy_in_cfr_format(),
                                                         rationality=rationality)[0],
                     "Value of QSE", "y", ":"))
    plot_convergence_curves(curve, axis_labels, name, logscale_y=logscale_y, logscale_x=logscale_x, hlines=hlines)
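
# A hypothetical invocation (the game path and parameters are illustrative only):
# plot_convergence_curves_with_nash_values("games/example.gam", "rqr", iterations=500,
#                                          rationality=1, logscale_x=True)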

# Computes exploitation and exploitability curves for the specified algorithm.
def plot_exploitation_and_exploitability(fname, algorithm, iterations=1000, cfr_player=1, rationality=1, current=True,
                                         average=True, logscale_y=False, logscale_x=False,
                                         exploitability_on_axis=False, zeroline=False):
    if algorithm == "cfrqrcfv":
        cfr_alg = CFRQRCFV(fname, cfr_player=cfr_player, rationality=rationality)