Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
7
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
Martin Řepa
bachelor-thesis
Commits
4a8a1f24
Commit
4a8a1f24
authored
Jan 31, 2019
by
Martin Řepa
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Wrap code so it's more user-friendly
parent
bcf23a4b
Changes
11
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
525 additions
and
141 deletions
+525
-141
src/config.py
src/config.py
+78
-0
src/data/features.py
src/data/features.py
+9
-1
src/data/loader.py
src/data/loader.py
+9
-1
src/data/synthesizer.py
src/data/synthesizer.py
+116
-0
src/game.py
src/game.py
+48
-30
src/game_solver.py
src/game_solver.py
+113
-98
src/main.py
src/main.py
+10
-0
src/neural_networks/model.png
src/neural_networks/model.png
+0
-0
src/neural_networks/network.py
src/neural_networks/network.py
+11
-11
src/utility.py
src/utility.py
+28
-0
src/visual/plotter.py
src/visual/plotter.py
+103
-0
No files found.
src/config.py
0 → 100644
View file @
4a8a1f24
from
pathlib
import
Path
from
typing
import
Callable
import
attr
from
utility
import
base_utility
@
attr
.
s
class
NeuralNetworkConfig
:
# Number of epochs in a neural network training phase
epochs
:
int
=
attr
.
ib
(
default
=
40
)
# String with loss_function definition.
# List of available functions: https://keras.io/losses/
loss_function
:
str
=
attr
.
ib
(
default
=
'binary_crossentropy'
)
# String with optimizer definition used to compile neural network model
# List of available optimizers: https://keras.io/optimizers/
optimizer
:
str
=
attr
.
ib
(
default
=
'adam'
)
# Weight of false positive result for training phase
# The bigger it is, less false positives will neural network produce
# Setting it to 1 makes all results neutral
fp_weight
:
int
=
attr
.
ib
(
default
=
5
)
@
attr
.
s
class
TrainingNnConfig
:
# Path to .csv file with scored data which will be used as benign data
# in neural network training phase
benign_data_file_path
:
Path
=
attr
.
ib
(
default
=
Path
(
'data/scored/all_benign_scored.csv'
))
# Number of benign records to be used
benign_data_count
:
int
=
attr
.
ib
(
default
=
1000
)
# Specifying number of fake malicious DNS records created each
# iteration of double oracle algorithm from attacker's actions used in
# neural network training phase
malicious_data_count
:
int
=
attr
.
ib
(
default
=
100
)
@
attr
.
s
class
BaseConfig
:
# Sets logger to debug level
debug
:
bool
=
attr
.
ib
(
default
=
True
)
# Determine whether to plot final results
# Do not use if features_count > 2!
plot_result
:
bool
=
attr
.
ib
(
default
=
True
)
# Number of features
features_count
:
int
=
attr
.
ib
(
default
=
2
)
# 2 neural networks are considered the same if difference of game value for
# them and each attacker's action is less than epsion
epsilon
:
float
=
attr
.
ib
(
default
=
0.05
)
# Number of false positives allowed in defender's mixture strategy.
# Sum(probability of each action times its fp_rate) must be less than this
# number. Fp_rate of the action is total number of malicious prediction for
# given benign data set
false_positives_allowed
:
int
=
attr
.
ib
(
default
=
10
)
# Function to calculate utility given the actions
# f: List[float], NeuralNetwork -> float
utility_function
:
Callable
=
attr
.
ib
(
default
=
base_utility
)
@
attr
.
s
class
RootConfig
:
base_conf
:
BaseConfig
=
attr
.
ib
(
default
=
BaseConfig
())
nn_conf
:
NeuralNetworkConfig
=
attr
.
ib
(
default
=
NeuralNetworkConfig
())
nn_train_conf
:
TrainingNnConfig
=
attr
.
ib
(
default
=
TrainingNnConfig
())
if
__name__
==
"__main__"
:
pass
src/data/features.py
View file @
4a8a1f24
...
...
@@ -6,9 +6,10 @@ import attr
@
attr
.
s
class
Feature
:
name
=
attr
.
ib
(
factory
=
str
)
func
=
attr
.
ib
(
factory
=
Callable
)
func
=
attr
.
ib
(
factory
=
Callable
)
# Function f: str -> int
# Calculate occurrence of the unusual letters in a string
def
uncommon_letters_score
(
word
:
str
)
->
int
:
unusual
=
(
'q'
,
'x'
,
'z'
,
'f'
)
score
=
0
...
...
@@ -18,10 +19,12 @@ def uncommon_letters_score(word: str) -> int:
return
score
# Calculate domain normalised uncommon_letters_score
def
normalised_letters_score
(
word
:
str
)
->
float
:
return
uncommon_letters_score
(
word
)
/
255
# Calculate entropy given the string
def
entropy
(
word
:
str
)
->
float
:
e
=
0.0
length
=
len
(
word
)
...
...
@@ -34,14 +37,18 @@ def entropy(word: str) -> float:
return
e
# Calculate normalised domain entropy. Max possible entropy is 5.2 (longest
# domain can have up to 255 chars)
def
norm_entropy
(
word
:
str
)
->
float
:
return
entropy
(
word
)
/
5.2
# Calculate normalised domain length
def
norm_len
(
word
:
str
)
->
float
:
return
len
(
word
)
/
255
# Create first line to .csv file with features
def
initial_line
(
features
:
List
[
Feature
],
debug
:
bool
=
False
)
->
str
:
line
=
'query'
if
debug
else
''
for
feature
in
features
:
...
...
@@ -52,6 +59,7 @@ def initial_line(features: List[Feature], debug: bool = False) -> str:
return
line
# Creates a line to .csv file with scored query based on given features
def
score_query
(
features
:
List
[
Feature
],
query
:
str
,
debug
:
bool
=
False
)
->
str
:
line
=
query
if
debug
else
''
for
feature
in
features
:
...
...
src/data/loader.py
View file @
4a8a1f24
...
...
@@ -4,7 +4,15 @@ import numpy as np
import
pandas
def
np_arrays_from_scored_csv
(
file
:
Path
,
label
:
int
,
count_max
:
int
=
None
,
shuffle
=
False
):
def
np_arrays_from_scored_csv
(
file
:
Path
,
label
:
int
,
count_max
:
int
=
None
,
shuffle
=
False
):
"""
Returns 2 x N array
Zero index contains array with data in a given .csv file.
First index contains array with label (2nd arg) for each line in .csv file
See usage in main
"""
content
=
pandas
.
read_csv
(
file
)
batch
=
[]
labels
=
[]
...
...
src/data/synthesizer.py
0 → 100644
View file @
4a8a1f24
from
pathlib
import
Path
from
typing
import
Tuple
,
List
,
Callable
import
matplotlib.pyplot
as
plt
import
random
import
attr
import
numpy
as
np
@
attr
.
s
class
Cluster
:
center
:
List
[
float
]
=
attr
.
ib
()
radius
:
float
=
attr
.
ib
()
num_of_points
:
int
=
attr
.
ib
()
class
Synthesizer
:
"""
Synthesizer generates synthetic data which might be used for testing.
:arg features_num specifies dimension of a point space
"""
def
__init__
(
self
,
features_num
:
int
):
self
.
_features_num
:
int
=
features_num
self
.
_clusters
:
List
[
Cluster
]
=
[]
self
.
points
:
List
[
List
[
float
]]
=
[]
self
.
_generated
=
False
def
add_cluster
(
self
,
cluster
:
Cluster
):
"""
Add points to a space given the cluster
:param cluster: definition of a cluster
"""
if
cluster
.
radius
<
0
or
cluster
.
radius
>
1
:
raise
ValueError
(
'Radius must be in range [0;1]'
)
if
len
(
cluster
.
center
)
!=
self
.
_features_num
:
raise
ValueError
(
f
'Center must be element of a space R^'
f
'
{
self
.
_features_num
}
'
)
self
.
_clusters
.
append
(
cluster
)
def
add_cluster_around_2Dfunc
(
self
,
func
:
Callable
,
radius
:
float
):
"""
Add some points around a function.
Possible only for space R^2
"""
x_axis
=
np
.
linspace
(
0
,
1
,
40
)
tmp
=
int
(
radius
*
100
)
for
x
in
x_axis
:
y
=
func
(
x
)
if
y
<
0
or
y
>
1
:
continue
x
+=
random
.
randint
(
-
tmp
,
tmp
)
/
100
y
+=
random
.
randint
(
-
tmp
,
tmp
)
/
100
x
=
min
(
max
(
x
,
0
),
1
)
y
=
min
(
max
(
y
,
0
),
1
)
self
.
points
.
append
([
x
,
y
])
def
generate
(
self
):
self
.
_generated
=
True
for
cluster
in
self
.
_clusters
:
tmp
=
int
(
cluster
.
radius
*
100
)
for
iter
in
range
(
cluster
.
num_of_points
):
point
=
[]
for
i
in
range
(
self
.
_features_num
):
coord
=
random
.
randint
(
-
tmp
,
tmp
)
/
100
coord
=
min
(
max
(
cluster
.
center
[
i
]
+
coord
,
0
),
1
)
point
.
append
(
coord
)
self
.
points
.
append
(
point
)
def
plot2D
(
self
):
"""
Show generated data in a 2D chart
Possible only for space R^2
Data needs to be generated at first with 'generate' function
"""
if
not
self
.
_generated
:
raise
RuntimeError
(
'Trying to plot while points are not generated'
)
if
self
.
_features_num
!=
2
:
raise
ValueError
(
f
'Trying to plot in 2D, but there are '
f
'
{
self
.
_features_num
}
features'
)
plt
.
xlim
(
0
,
1
)
plt
.
ylim
(
0
,
1
)
for
point
in
self
.
points
:
plt
.
scatter
(
point
[
0
],
point
[
1
],
c
=
'red'
)
plt
.
show
()
def
save_to_file
(
self
,
path
:
Path
):
"""
Save generated data to a file.
Data needs to be generated at first with 'generate' function
"""
if
not
self
.
_generated
:
raise
RuntimeError
(
'Trying to save while points are not generated'
)
with
open
(
path
,
'w'
,
encoding
=
'utf-8'
)
as
file
:
initial_line
=
','
.
join
([
f
'initial
{
i
}
'
for
i
in
range
(
0
,
self
.
_features_num
)])
file
.
write
(
f
'
{
initial_line
}
\n
'
)
for
point
in
self
.
points
:
line
=
','
.
join
([
str
(
c
)
for
c
in
point
])
file
.
write
(
f
'
{
line
}
\n
'
)
if
__name__
==
"__main__"
:
synt
=
Synthesizer
(
2
)
synt
.
add_cluster_around_2Dfunc
(
lambda
x
:
0.2
,
0.05
)
synt
.
add_cluster_around_2Dfunc
(
lambda
x
:
0.8
,
0.05
)
synt
.
add_cluster
(
Cluster
([
0.2
,
0.2
],
0.15
,
200
))
synt
.
add_cluster
(
Cluster
([
0.2
,
0.8
],
0.15
,
200
))
synt
.
add_cluster
(
Cluster
([
0.8
,
0.2
],
0.15
,
200
))
synt
.
add_cluster
(
Cluster
([
0.8
,
0.8
],
0.15
,
200
))
synt
.
generate
()
synt
.
plot2D
()
synt
.
save_to_file
(
Path
(
'scored/test.csv'
))
src/game.py
View file @
4a8a1f24
import
functools
import
itertools
import
operator
from
typing
import
List
import
logging
import
numpy
as
np
from
src.game_solver
import
GameSolver
from
src.neural_networks.network
import
NeuralNetwork
from
src.config
import
RootConfig
from
src.game_solver
import
GameSolver
,
Result
from
src.visual.plotter
import
Plotter
FEATURES_NO
=
2
SUCCESS_ATTACK_CONST
=
1
logger
=
logging
.
getLogger
(
__name__
)
def
create_attacker_actions
(
features_no
:
int
):
one_axis
=
np
.
linspace
(
0
,
1
,
101
)
# [0.00, 0.01, 0.02, ..., 0.99, 1.00]
return
list
(
itertools
.
product
(
one_axis
,
*
itertools
.
repeat
(
one_axis
,
features_no
-
1
)))
def
setup_loger
(
conf
:
RootConfig
):
log_format
=
(
'%(asctime)-15s
\t
%(name)s:%(levelname)s
\t
'
'%(module)s:%(funcName)s:%(lineno)s
\t
%(message)s'
)
level
=
logging
.
DEBUG
if
conf
.
base_conf
.
debug
else
logging
.
INFO
logging
.
basicConfig
(
level
=
level
,
format
=
log_format
)
def
utility
(
attacker_features
:
List
[
float
],
defender_network
:
NeuralNetwork
):
pred
=
defender_network
.
predict
(
attacker_features
)
return
functools
.
reduce
(
operator
.
mul
,
attacker_features
,
1
)
*
(
1
-
pred
)
*
SUCCESS_ATTACK_CONST
class
Game
:
def
__init__
(
self
,
conf
:
RootConfig
=
RootConfig
()):
setup_loger
(
conf
)
self
.
_conf
=
conf
self
.
result
:
Result
=
None
def
_create_attacker_actions
(
self
):
one_axis
=
np
.
linspace
(
0
,
1
,
101
)
# [0.00, 0.01, 0.02, ..., 0.99, 1.00]
axes
=
self
.
_conf
.
base_conf
.
features_count
-
1
return
list
(
itertools
.
product
(
one_axis
,
*
itertools
.
repeat
(
one_axis
,
axes
)))
def
solve_game
():
print
(
'Creating
actions'
)
actions_attacker
=
create_attacker_actions
(
FEATURES_NO
)
def
solve_game
(
self
):
logger
.
debug
(
'Creating attacker
\'
s
actions'
)
actions_attacker
=
self
.
_
create_attacker_actions
()
gs
=
GameSolver
()
val
,
ordered_actions_p1
,
prob_p1
,
ordered_actions_p2
,
prob_p2
=
gs
.
double_oracle
(
actions_attacker
,
utility
,
log
=
True
)
logger
.
info
(
"Starting game solver"
)
gs
=
GameSolver
(
self
.
_conf
)
self
.
result
:
Result
=
gs
.
double_oracle
(
actions_attacker
)
print
(
'
\n\n
-------------------------------------------------'
)
print
(
f
'Game has ended with value:
{
val
}
'
)
print
(
'Attacker: action x probability'
)
for
a
,
p
in
zip
(
ordered_actions_p1
,
prob_p1
):
print
(
f
'
{
a
}
x
{
p
}
'
)
print
(
'
\n
Defender: action x probability'
)
for
nn
,
p
in
zip
(
ordered_actions_p2
,
prob_p2
):
print
(
f
'
{
nn
}
x
{
p
}
'
)
print
(
'-------------------------------------------------'
)
self
.
write_summary
()
self
.
plot_result
()
def
write_summary
(
self
):
print
(
'
\n\n
-------------------------------------------------'
)
logger
.
info
(
f
'Game has ended with value:
{
self
.
result
.
value
}
'
)
logger
.
info
(
'Attacker: action x probability'
)
for
a
,
p
in
zip
(
self
.
result
.
ordered_actions_p1
,
self
.
result
.
probs_p1
):
logger
.
info
(
f
'
{
a
}
x
{
p
}
'
)
def
main
():
solve_game
()
print
(
"
\n
"
)
logger
.
info
(
'Defender: action x probability'
)
for
nn
,
p
in
zip
(
self
.
result
.
ordered_actions_p2
,
self
.
result
.
probs_p2
):
logger
.
info
(
f
'
{
nn
}
x
{
p
}
'
)
print
(
'-------------------------------------------------'
)
def
plot_result
(
self
):
if
self
.
_conf
.
base_conf
.
plot_result
:
logger
.
debug
(
"Plotting result..."
)
p
=
Plotter
(
self
.
result
.
ordered_actions_p1
,
self
.
result
.
probs_p1
,
self
.
result
.
ordered_actions_p2
,
self
.
result
.
probs_p2
)
p
.
plot_result
()
if
__name__
==
"__main__"
:
main
()
Game
().
solve_game
()
src/game_solver.py
View file @
4a8a1f24
import
logging
import
operator
from
itertools
import
count
from
pathlib
import
Path
from
typing
import
List
,
Callable
from
typing
import
List
import
attr
import
pulp
from
config
import
RootConfig
from
src.data.loader
import
np_arrays_from_scored_csv
from
src.neural_networks.network
import
NeuralNetwork
FP_CONSTANT
=
10
BENIGN_DATA_N0
=
1000
MALICIOUS_DATA_N0
=
100
EPSILON
=
0.05
logger
=
logging
.
getLogger
(
__name__
)
@
attr
.
s
class
Result
:
value
:
int
=
attr
.
ib
()
ordered_actions_p1
:
List
=
attr
.
ib
()
probs_p1
:
List
=
attr
.
ib
()
ordered_actions_p2
:
List
=
attr
.
ib
()
probs_p2
:
List
=
attr
.
ib
()
class
GameSolver
:
def
__init__
(
self
):
self
.
benign_data
=
np_arrays_from_scored_csv
(
Path
(
'data/scored/all_benign_scored.csv'
),
0
,
BENIGN_DATA_N0
)
def
__init__
(
self
,
conf
:
RootConfig
):
self
.
conf
=
conf
self
.
utility
=
conf
.
base_conf
.
utility_function
# Test data to calculate how many false positives the neural networks do
# self.benign_test_data = np_arrays_from_scored_csv(Path('data/scored/all_benign_scored.csv'), 0, 100, True)
train
=
conf
.
nn_train_conf
self
.
benign_data
=
np_arrays_from_scored_csv
(
train
.
benign_data_file_path
,
0
,
train
.
benign_data_count
)
def
_get_trained_nn
(
self
,
attacker_features_x
:
List
[
List
[
float
]])
->
NeuralNetwork
:
# Initialize the model
network
=
NeuralNetwork
()
network
=
NeuralNetwork
(
self
.
conf
.
base_conf
.
features_count
,
self
.
conf
.
nn_conf
)
network
.
train
(
attacker_features_x
,
self
.
benign_data
)
network
.
calc_n0_false_positives
(
self
.
benign_data
[
0
])
return
network
def
double_oracle
(
self
,
actions_p1
,
utility
,
log
=
False
):
def
double_oracle
(
self
,
actions_p1
:
List
)
->
Result
:
# Get initial actions as the first ones
used_actions_p1
=
set
(
actions_p1
[:
1
])
used_actions_p2
=
{
self
.
_get_trained_nn
([[]])}
for
i
in
count
():
if
log
:
print
(
f
'
\n\n
Iteration:
{
i
}
\n
'
)
logger
.
debug
(
f
'Iteration:
{
i
}
\n
'
)
ordered_used_actions_p1
=
list
(
used_actions_p1
)
ordered_used_actions_p2
=
list
(
used_actions_p2
)
value
,
probs_p1
,
probs_p2
=
solve_zero_sum_game_pulp
(
ordered_used_actions_p1
,
ordered_used_actions_p2
,
utility
,
log
)
br_p1
=
self
.
best_response_p1
(
actions_p1
,
ordered_used_actions_p2
,
probs_p2
,
utility
)
# Solve current game with linear programming
value
,
probs_p1
,
probs_p2
=
self
.
solve_zero_sum_game_pulp
(
ordered_used_actions_p1
,
ordered_used_actions_p2
)
# Find best responses for each player given the mixture strategies
br_p1
=
self
.
best_response_p1
(
actions_p1
,
ordered_used_actions_p2
,
probs_p2
)
br_p2
=
self
.
best_response_p2
(
ordered_used_actions_p1
,
probs_p1
)
if
br_p1
in
used_actions_p1
and
self
.
is_nn_similiar
(
br_p2
,
ordered_used_actions_p2
,
EPSILON
,
used_actions_p1
,
utility
):
return
value
,
ordered_used_actions_p1
,
probs_p1
,
ordered_used_actions_p2
,
probs_p2
# If there is no new action in best responses, algorithm ends
if
br_p1
in
used_actions_p1
and
self
.
is_nn_similar
(
br_p2
,
ordered_used_actions_p2
,
used_actions_p1
):
return
Result
(
value
,
ordered_used_actions_p1
,
probs_p1
,
ordered_used_actions_p2
,
probs_p2
)
# Otherwise add new actions to lists and continue
used_actions_p1
.
add
(
br_p1
)
used_actions_p2
.
add
(
br_p2
)
def
is_nn_similiar
(
self
,
new_nn
:
NeuralNetwork
,
old_nns
:
List
,
epsilon
:
float
,
used_actions_p1
,
utility
):
# Compares utilities of a new neural network with utilities of the old neural networks and checks if the new one
# is similar to some old (ie. difference of its utilities is for every p1 action is lower than epsilon)
print
(
'Let
\'
s compare new neural networks with the others:'
)
utilities_of_new_nn
=
[
utility
(
a1
,
new_nn
)
for
a1
in
used_actions_p1
]
used_actions_p2
.
add
(
br_p2
)
# TODO get rid of duplicates
def
is_nn_similar
(
self
,
new_nn
:
NeuralNetwork
,
old_nns
:
List
,
actions_p1
):
"""
Compares utilities of a new neural network with utilities of the old
neural networks and checks if the new one is similar to some old
(difference of its utilities for every p1 action is lower than epsilon)
"""
logger
.
debug
(
'Let
\'
s compare new neural network with the others:'
)
utilities_of_new_nn
=
[
self
.
utility
(
a1
,
new_nn
)
for
a1
in
actions_p1
]
for
old_nn
in
old_nns
:
as_good
=
True
for
new_utility
,
a1
in
zip
(
utilities_of_new_nn
,
used_actions_p1
):
old_utility
=
utility
(
a1
,
old_nn
)
print
(
f
'old utility:
{
old_utility
}
, new utility:
{
new_utility
}
, difference:
{
abs
(
old_utility
-
new_utility
)
}
'
)
if
abs
(
old_utility
-
new_utility
)
>
epsilon
:
as_good
=
False
print
(
'########################################################'
)
for
new_utility
,
a1
in
zip
(
utilities_of_new_nn
,
actions_p1
):
old_utility
=
self
.
utility
(
a1
,
old_nn
)
logger
.
debug
(
f
'old utility:
{
old_utility
}
, '
f
'new utility:
{
new_utility
}
, '
f
'difference:
{
abs
(
old_utility
-
new_utility
)
}
'
)
if
abs
(
old_utility
-
new_utility
)
>
self
.
conf
.
base_conf
.
epsilon
:
as_good
=
False
break
if
as_good
:
print
(
"
J
ep, this
is
already
there
."
)
logger
.
debug
(
"
Y
ep, this
neural network
already
exists
."
)
return
True
logger
.
debug
(
'Nope, this is new neural network'
)
return
False
def
best_response_p1
(
self
,
actions_p1
,
used_actions_p2
,
probs_p2
,
utility
):
return
max
(
actions_p1
,
key
=
lambda
a1
:
sum
(
map
(
operator
.
mul
,
map
(
lambda
a2
:
utility
(
a1
,
a2
),
used_actions_p2
),
probs_p2
)))
def
best_response_p1
(
self
,
actions_p1
,
used_actions_p2
,
probs_p2
):
return
max
(
actions_p1
,
key
=
lambda
a1
:
sum
(
map
(
operator
.
mul
,
map
(
lambda
a2
:
self
.
utility
(
a1
,
a2
),
used_actions_p2
),
probs_p2
)))
def
best_response_p2
(
self
,
used_actions_p1
,
probs_p1
):
malicious_features
=
[]
for
ai
,
pi
in
zip
(
used_actions_p1
,
probs_p1
):
count
=
int
(
MALICIOUS_DATA_N0
*
pi
)
for
i
in
range
(
count
):
count
er
=
int
(
self
.
conf
.
nn_train_conf
.
malicious_data_count
*
pi
)
for
i
in
range
(
count
er
):
malicious_features
.
append
(
ai
)
print
(
'Let
\'
s train new NN with this malicious data:'
)
print
(
f
'
{
malicious_features
}
\n
'
)
logger
.
debug
(
'Let
\'
s train new NN with this malicious data:'
)
logger
.
debug
(
f
'
{
malicious_features
}
\n
'
)
return
self
.
_get_trained_nn
(
malicious_features
)
def
solve_zero_sum_game_pulp
(
self
,
actions_p1
:
List
[
List
[
float
]],
actions_p2
:
List
[
NeuralNetwork
]):
logger
.
debug
(
'Going to solve current state with LP'
)
logger
.
debug
(
f
'Attacker
\'
s actions by now:
{
actions_p1
}
'
)
logger
.
debug
(
f
'Deffender
\'
s actions by now:
{
actions_p2
}
'
)
# Create LP problem
m
=
pulp
.
LpProblem
(
"Zero sum game"
,
pulp
.
LpMinimize
)
# Minimizing value "v"
v
=
pulp
.
LpVariable
(
"v"
)
m
+=
v
# Player two probability vector
probs_p_two
=
[
pulp
.
LpVariable
(
"np"
+
str
(
i
),
0
,
1
)
for
i
in
range
(
len
(
actions_p2
))]
m
+=
pulp
.
lpSum
(
probs_p_two
)
==
1
# Probabilities sum to 1
def
solve_zero_sum_game_pulp
(
actions_p1
:
List
[
List
[
float
]],
actions_p2
:
List
[
NeuralNetwork
],
utility
:
Callable
,
log
:
bool
):
if
log
:
print
(
'Going to solve with LP'
)
print
(
f
'Attacker
\'
s actions by now:
{
actions_p1
}
'
)
print
(
f
'Deffender
\'
s actions by now:
{
actions_p2
}
'
)
# Create LP problem
m
=
pulp
.
LpProblem
(
"Zero sum game"
,
pulp
.
LpMinimize
)
# Minimizing value "v"
v
=
pulp
.
LpVariable
(
"v"
)
m
+=
v
# Player two probability vector
probs_p_two
=
[
pulp
.
LpVariable
(
"np"
+
str
(
i
),
0
,
1
)
for
i
in
range
(
len
(
actions_p2
))]
m
+=
pulp
.
lpSum
(
probs_p_two
)
==
1
# Probabilities sum to 1
suma
=
[]
i
=
0
for
a2
in
actions_p2
:
suma
.
append
(
probs_p_two
[
i
]
*
a2
.
get_false_positive_rate
())
i
+=
1
fp_constraint
=
pulp
.
lpSum
(
suma
)
<=
FP_CONSTANT
m
+=
fp_constraint
constraints
=
[]
for
a1
in
actions_p1
:
suma
=
[]
j
=
0
i
=
0
for
a2
in
actions_p2
:
suma
.
append
(
probs_p_two
[
j
]
*
utility
(
a1
,
a2
))
j
+=
1
constraints
.
append
(
pulp
.
lpSum
(
suma
)
<=
v
)
for
c
in
constraints
:
m
+=
c
m
.
solve
()
if
log
:
print
(
f
'LP solved'
)
print
(
f
'Value of the game:
{
v
.
varValue
}
'
)
print
(
f
'Number of false positives in this game:
{
fp_constraint
}
'
)
print
(
f
'Found solution:
{
pulp
.
LpStatus
[
m
.
status
]
}
'
)
print
(
f
'Attacker
\'
probabilities:'
)
print
(
f
'
{
list
(
str
(
abs
(
c
.
pi
))
+
" "
for
c
in
constraints
)
}
'
)
print
(
f
'Deffender
\'
s probabilities:'
)
print
(
f
'
{
list
(
str
(
prob
.
varValue
)
+
" "
for
prob
in
probs_p_two
)
}
'
)
return
v
.
varValue
,
[
abs
(
c
.
pi
)
for
c
in
constraints
],
[
prob
.
varValue
for
prob
in
probs_p_two
]
suma
.
append
(
probs_p_two
[
i
]
*
a2
.
get_false_positive_rate
())
i
+=
1
fp_constraint
=
pulp
.
lpSum
(
suma
)
<=
self
.
conf
.
base_conf
.
false_positives_allowed
m
+=
fp_constraint
constraints
=
[]
for
a1
in
actions_p1
:
suma
=
[]
j
=
0
for
a2
in
actions_p2
:
suma
.
append
(
probs_p_two
[
j
]
*
self
.
utility
(
a1
,
a2
))
j
+=
1
constraints
.
append
(
pulp
.
lpSum
(
suma
)
<=
v
)
for
c
in
constraints
:
m
+=
c
m
.
solve
()
logger
.
debug
(
f
'LP solved'
)
logger
.
debug
(
f
'Value of the game:
{
v
.
varValue
}
'
)
logger
.
debug
(
f
'Number of false positives in this game:
{
fp_constraint
}
'
)
logger
.
debug
(
f
'Found solution:
{
pulp
.
LpStatus
[
m
.
status
]
}
'
)
logger
.
debug
(
f
'Attacker
\'
probabilities:'
)
logger
.
debug
(
f
'
{
list
(
str
(
abs
(
c
.
pi
))
+
" "
for
c
in
constraints
)
}
'
)