-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexploitability_two_player_kuhn_poker
182 lines (146 loc) · 7.13 KB
/
exploitability_two_player_kuhn_poker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from typing import List, Dict
import itertools
import random
import numpy as np
import sys
Actions = ['B', 'C'] # bet/call vs check/fold
class InformationSet():
def __init__(self):
self.cumulative_regrets = np.zeros(shape=len(Actions))
self.strategy_sum = np.zeros(shape=len(Actions))
self.num_actions = len(Actions)
def normalize(self, strategy: np.array) -> np.array:
"""Normalize a strategy. If there are no positive regrets,
use a uniform random strategy"""
if sum(strategy) > 0:
strategy /= sum(strategy)
else:
strategy = np.array([1.0 / self.num_actions] * self.num_actions)
return strategy
def get_strategy(self, reach_probability: float) -> np.array:
"""Return regret-matching strategy"""
strategy = np.maximum(0, self.cumulative_regrets)
strategy = self.normalize(strategy)
self.strategy_sum += reach_probability * strategy
return strategy
def get_average_strategy(self) -> np.array:
return self.normalize(self.strategy_sum.copy())
class KuhnPoker():
@staticmethod
def is_terminal(history: str) -> bool:
return history in ['BC', 'BB', 'CC', 'CBB', 'CBC']
@staticmethod
def get_payoff(history: str, cards: List[str]) -> int:
"""get payoff for 'active' player in terminal history"""
if history in ['BC', 'CBC']:
return +1
else: # CC or BB or CBB
payoff = 2 if 'B' in history else 1
active_player = len(history) % 2
player_card = cards[active_player]
opponent_card = cards[(active_player + 1) % 2]
if player_card == 'K' or opponent_card == 'J':
return payoff
else:
return -payoff
class KuhnCFRTrainer():
def __init__(self):
self.infoset_map: Dict[str, InformationSet] = {}
def get_information_set(self, history: str) -> InformationSet:
"""add if needed and return"""
if history not in self.infoset_map:
self.infoset_map[history] = InformationSet()
return self.infoset_map[history]
def cfr(self, cards: List[str], history: str, reach_probabilities: np.array, active_player: int):
if KuhnPoker.is_terminal(history):
return KuhnPoker.get_payoff(history, cards)
my_card = cards[active_player]
info_set = self.get_information_set(my_card + history)
strategy = info_set.get_strategy(reach_probabilities[active_player])
opponent = (active_player + 1) % 2
counterfactual_values = np.zeros(len(Actions))
for ix, action in enumerate(Actions):
action_probability = strategy[ix]
# compute new reach probabilities after this action
new_reach_probabilities = reach_probabilities.copy()
new_reach_probabilities[active_player] *= action_probability
# recursively call cfr method, next player to act is the opponent
counterfactual_values[ix] = -self.cfr(cards, history + action, new_reach_probabilities, opponent)
# Value of the current game state is just counterfactual values weighted by action probabilities
node_value = counterfactual_values.dot(strategy)
for ix, action in enumerate(Actions):
info_set.cumulative_regrets[ix] += reach_probabilities[opponent] * (counterfactual_values[ix] - node_value)
return node_value
def train(self, num_iterations: int) -> int:
util = 0
kuhn_cards = ['J', 'Q', 'K']
for _ in range(num_iterations):
cards = random.sample(kuhn_cards, 2)
history = ''
reach_probabilities = np.ones(2)
util += self.cfr(cards, history, reach_probabilities, 0)
return util
def calc_best_response(node_map, br_strat_map, br_player, cards, history, active_player, prob):
"""
after chance node, so only decision nodes and terminal nodes left in game tree
"""
if KuhnPoker.is_terminal(history):
return -KuhnPoker.get_payoff(history, cards)
key = cards[active_player] + history
next_player = (active_player + 1) % 2
if active_player == br_player:
vals = [calc_best_response(node_map, br_strat_map, br_player, cards, history + action,
next_player, prob) for action in Actions]
best_response_value = max(vals)
if key not in br_strat_map:
br_strat_map[key] = np.array([0.0, 0.0])
br_strat_map[key] = br_strat_map[key] + prob * np.array(vals, dtype=np.float64)
return -best_response_value
else:
strategy = node_map[key].get_average_strategy()
action_values = [calc_best_response(node_map, br_strat_map, br_player, cards,
history + action, next_player, prob * strategy[ix])
for ix, action in enumerate(Actions)]
return -np.dot(strategy, action_values)
def calc_ev(p1_strat, p2_strat, cards, history, active_player):
if KuhnPoker.is_terminal(history):
return -KuhnPoker.get_payoff(history, cards)
my_card = cards[active_player]
next_player = (active_player + 1) % 2
if active_player == 0:
strat = p1_strat[my_card + history]
else:
strat = p2_strat[my_card + history]
return -np.dot(strat, [calc_ev(p1_strat, p2_strat, cards, history + a, next_player) for a in Actions])
def compute_exploitability(infoset_map):
kuhn_cards = ['J', 'Q', 'K']
exploitability = 0
br_strat_map = {}
for cards in itertools.permutations(kuhn_cards):
calc_best_response(infoset_map, br_strat_map, 0, cards, '', 0, 1.0)
calc_best_response(infoset_map, br_strat_map, 1, cards, '', 0, 1.0)
for k,v in br_strat_map.items():
v[:] = np.where(v == np.max(v), 1, 0)
cfr_strategy = {k: v.get_average_strategy() for k,v in infoset_map.items()}
for cards in itertools.permutations(kuhn_cards):
ev_1 = calc_ev(cfr_strategy, br_strat_map, cards, '', 0)
ev_2 = calc_ev(br_strat_map, cfr_strategy, cards, '', 0)
exploitability += 1 / 6 * (ev_1 - ev_2)
return exploitability, br_strat_map
if __name__ == "__main__":
if len(sys.argv) < 2:
num_iterations = 1000000
else:
num_iterations = int(sys.argv[1])
np.set_printoptions(precision=2, floatmode='fixed', suppress=True)
cfr_trainer = KuhnCFRTrainer() # Initialises self.infoset_map = {}
util = cfr_trainer.train(num_iterations)
print(f"\nRunning Kuhn Poker chance sampling CFR for {num_iterations} iterations")
print(f"\nExpected average game value (for player 1): {(-1./18):.3f}")
print(f"Computed average game value : {(util / num_iterations):.3f}\n")
print("We expect the bet frequency for a Jack to be between 0 and 1/3")
print("The bet frequency of a King should be three times the one for a Jack\n")
print(f"History Bet Pass")
for name, info_set in sorted(cfr_trainer.infoset_map.items(), key=lambda s: len(s[0])):
print(f"{name:3}: {info_set.get_average_strategy()}")
print(f"Exploitability of final strategy is: {compute_exploitability(cfr_trainer.infoset_map)[0]:.3f}")