import itertools
import logging
import os
import time
from pathlib import Path

import attr
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from torch import optim

from src.config import NeuralNetworkConfig, RootConfig
from src.data.loader import np_matrix_from_scored_csv

logger = logging.getLogger(__name__)

# Use the device from the environment if given, otherwise prefer CUDA when available.
DEVICE = torch.device(os.environ.get('device',
                                     'cuda' if torch.cuda.is_available() else 'cpu'))


@attr.s
class FormattedData:
    """Unique actions together with their occurrence probabilities and labels."""
    unique_x: np.ndarray = attr.ib()
    probs_x: np.ndarray = attr.ib()
    y: np.ndarray = attr.ib()


@attr.s
class BenignData:
    """Unique benign actions together with their occurrence counts and labels."""
    unique_x: np.ndarray = attr.ib()
    counts: np.ndarray = attr.ib()
    y: np.ndarray = attr.ib()


class OrderCounter:
    """Provides sequential ids for created networks."""
    order = 0

    @staticmethod
    def next():
        OrderCounter.order += 1
        return OrderCounter.order


class SoftClip(nn.Module):
    """
    Soft-clipping activation function, which smoothly clamps its input to [0, 1]:

        softclip_p(x) = (1 / p) * log((1 + exp(p * x)) / (1 + exp(p * (x - 1))))

    https://arxiv.org/pdf/1810.11509.pdf
    """

    def __init__(self, p=50.0):
        super().__init__()
        self.p = p

    def forward(self, x):
        # Equivalent to (1 / p) * (log(1 + exp(p * x)) - log(1 + exp(p * (x - 1)))),
        # written with softplus to avoid the exp overflow (and the resulting NaNs)
        # of the naive formulation for large p * |x|.
        return (nn.functional.softplus(x, beta=self.p)
                - nn.functional.softplus(x - 1.0, beta=self.p))
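
# A minimal sanity-check sketch for SoftClip (illustrative only, not part of the
# original module): with a large p the activation approximates clamping to [0, 1],
# staying roughly linear inside the interval and saturating outside it.
def _softclip_sanity_check():
    """Hypothetical helper; safe to delete."""
    activation = SoftClip(p=50.0)
    x = torch.tensor([-2.0, 0.0, 0.5, 1.0, 3.0])
    # Expected output, approximately: [0.00, 0.01, 0.50, 0.99, 1.00]
    print(activation(x))
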
class NeuralNetwork:
    def __init__(self, input_features=2,
                 nn_conf: NeuralNetworkConfig = NeuralNetworkConfig()):
        self.model = nn.Sequential(
            nn.Linear(input_features, 20),
            nn.ReLU(),
            nn.Linear(20, 15),
            nn.ReLU(),
            nn.Linear(15, 20),
            nn.ReLU(),
            nn.Linear(20, 1),
            nn.Tanh(),
            SoftClip(50)
            # nn.Sigmoid()
        ).to(DEVICE)
        self._set_weights()
        self.conf = nn_conf
        self.id = OrderCounter.next()

        # Variables used for loss function
        self.attacker_actions: FormattedData = None
        self.benign_data: BenignData = None

        # Variables from last training epoch measuring quality
        self.final_loss = None
        self.final_fp_cost = None

        # -------------- --------------------
        # self.max_constant = constant
        # self.cur_value = .0
        # self.step = .05
        # self.edge_epoch = (self.conf.epochs * 0.2)
        # self.incr_each = self.edge_epoch / (self.max_constant / .05)

    # def get_cur_coefficient(self, epoch) -> float:
    #     return self.max_constant
    #     if epoch > self.edge_epoch:
    #         return self.max_constant
    #     return .01
    #     if self.cur_value < self.max_constant and epoch % self.incr_each == 0:
    #         self.cur_value += self.step
    #     return self.cur_value
    # -------------- --------------------

    def __str__(self):
        return f'Neural network id:{self.id}, final loss: {self.final_loss}'

    def set_data(self, benign_data: BenignData, attack: FormattedData):
        self.attacker_actions = attack
        self.benign_data = benign_data

    def _prepare_data(self):
        defender = self.benign_data
        attacker = self.attacker_actions

        x = np.concatenate((defender.unique_x, attacker.unique_x), axis=0)
        y = np.concatenate((defender.y, attacker.y), axis=0)
        probs = np.concatenate(
            (defender.counts / np.sum(defender.counts), attacker.probs_x), axis=0)

        # Labels for a training batch: `batch_size` benign samples followed by all
        # attacker actions (matches the layout produced by get_train_batch).
        self.train_y = torch.cat((torch.zeros(self.conf.batch_size).float(),
                                  torch.tensor(attacker.y).float())).to(DEVICE)
        self.all_x = torch.tensor(x).float().to(DEVICE)
        self.all_y = torch.tensor(y).float().to(DEVICE)
        self.all_probs = torch.tensor(probs).float().to(DEVICE)

    def _set_weights(self):
        def init_weights(m):
            if isinstance(m, nn.Linear):
                torch.nn.init.xavier_uniform_(m.weight)
                m.bias.data.fill_(.0)

        self.model.apply(init_weights)

    def train(self):
        self._prepare_data()
        self._train()

    def get_train_batch(self):
        # Sample `batch_size` unique benign actions and append all attacker actions.
        batch_idxs = np.random.choice(len(self.benign_data.unique_x),
                                      self.conf.batch_size)
        current_batch_samples = np.sum(self.benign_data.counts[batch_idxs])
        batch_x_np = np.concatenate(
            (self.benign_data.unique_x[batch_idxs], self.attacker_actions.unique_x),
            axis=0)
        batch_probs_np = np.concatenate(
            (self.benign_data.counts[batch_idxs] / current_batch_samples,
             self.attacker_actions.probs_x),
            axis=0)
        batch_x = torch.tensor(batch_x_np).float().to(DEVICE)
        batch_probs = torch.tensor(batch_probs_np).float().to(DEVICE)
        return batch_x, batch_probs

    def _train(self):
        learning_rate = self.conf.learning_rate
        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)

        for e in range(self.conf.epochs):
            batch_x, batch_probs = self.get_train_batch()

            # Forward pass: compute predicted latencies by passing x to the model
            train_latencies = self.latency_predict(batch_x, with_grad=True)

            # Compute loss
            loss, _ = self.conf.loss_function(batch_x, train_latencies,
                                              self.train_y, batch_probs)

            # Log the loss function value every 50 epochs
            if e % 50 == 0:
                logger.debug(f'Epoch: {e}/{self.conf.epochs},\t'
                             f'TrainLoss: {loss}')

            # Before the backward pass, use the optimizer object to zero all of
            # the gradients for the variables it will update
            optimizer.zero_grad()

            # Backward pass: compute gradient of the loss with respect to model
            # parameters
            loss.backward()

            # Calling the step function on an Optimizer makes an update to its
            # parameters
            optimizer.step()

        # Evaluate the final network on all data to measure its quality.
        with torch.no_grad():
            latencies = self.latency_predict(self.all_x)
            loss, fp_part = self.conf.loss_function(self.all_x, latencies,
                                                    self.all_y, self.all_probs)
            logger.debug(f'Final loss of this nn: {loss}\tfp_part is: {fp_part}')

            self.final_loss = loss.item()
            self.final_fp_cost = fp_part.item()

    def plot_tmp(self):  # todo remove me
        if self.id == 1:
            return
        try:
            actions = self.actions
        except AttributeError:
            self.plotted = []
            self.plotting = plt.subplots()
            one_axis = np.linspace(0, 1, 101)  # [0.00, 0.01, 0.02, ..., 0.99, 1.00]
            generator = itertools.product(*itertools.repeat(one_axis, 2))
            actions = torch.tensor(np.array(list(generator))).float().to(DEVICE)
            self.actions = actions
        finally:
            # Remove all artists drawn in the previous iteration
            for item in self.plotted:
                item.remove()
            self.plotted = []

            res = self.latency_predict(actions).cpu().numpy() \
                .reshape((101, 101), order='F')
            self.plotted.append(self.plotting[1].imshow(res, cmap='Reds',
                                                        vmin=0, vmax=1,
                                                        origin='lower'))
            self.plotting[0].canvas.draw()
            plt.pause(0.000001)
            time.sleep(1)

    def _raw_predict(self, tensor: torch.Tensor):
        pred = self.model(tensor)
        return pred.flatten().float()

    def latency_predict(self, x: torch.Tensor, with_grad=False):
        if with_grad:
            raw_prediction = self._raw_predict(x)
        else:
            with torch.no_grad():
                raw_prediction = self._raw_predict(x)
        return raw_prediction

    def predict_single_latency(self, input, return_tensor=False):
        if isinstance(input, (list, tuple, np.ndarray)):
            input = torch.tensor(input).float().to(DEVICE)
        if return_tensor:
            return self.latency_predict(input)[0]
        return self.latency_predict(input)[0].item()
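
# `NeuralNetworkConfig.loss_function` is defined elsewhere (in src.config); the
# sketch below is only an assumption about its shape, inferred from the call sites
# in `_train`: it takes (x, predicted_latencies, y, probs) and returns a
# (loss, fp_part) tuple, where fp_part is the false-positive cost stored in
# `final_fp_cost`.
def example_loss_function(x, latencies, y, probs):
    """Hypothetical loss with the interface expected by NeuralNetwork._train."""
    # Benign actions (y == 0) are penalised for high predicted latency (the
    # false-positive cost); attacker actions (y == 1) for low predicted latency.
    # Every unique action is weighted by its probability; x is unused here but
    # kept to match the call sites.
    fp_part = torch.sum(probs * (1.0 - y) * latencies)
    attacker_part = torch.sum(probs * y * (1.0 - latencies))
    return fp_part + attacker_part, fp_part
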
def setup_logger(debug: bool):
    log_format = ('%(asctime)-15s\t%(name)s:%(levelname)s\t'
                  '%(module)s:%(funcName)s:%(lineno)s\t%(message)s')
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(level=level, format=log_format)


if __name__ == '__main__':
    setup_logger(True)

    benign_x, _ = np_matrix_from_scored_csv(Path('all_benign_scored.csv'), 0, 500)
    malicious_x, _ = np_matrix_from_scored_csv(Path('scored_malicious.csv'), 1, 400)

    benign_unique_x, benign_counts = np.unique(benign_x, axis=0, return_counts=True)
    benign_y = np.zeros(len(benign_unique_x))
    benign_data = BenignData(benign_unique_x, benign_counts, benign_y)

    malicious_unique_x, malicious_counts = np.unique(malicious_x, axis=0,
                                                     return_counts=True)
    # Normalise over all malicious samples so the probabilities form a distribution.
    probs_malicious = malicious_counts / len(malicious_x)
    malicious_y = np.ones(len(malicious_unique_x))
    malicious_data = FormattedData(malicious_unique_x, probs_malicious, malicious_y)

    conf = RootConfig()
    network = NeuralNetwork(2, conf.model_conf.defender_conf.nn_conf)
    network.set_data(benign_data, malicious_data)
    network.train()
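
    # A small usage sketch after training (assumes the CSV inputs above exist):
    # the trained network can score a single two-feature action directly.
    logger.info(str(network))
    sample_action = [0.5, 0.5]  # hypothetical point in the unit square of features
    logger.info(f'Predicted latency for {sample_action}: '
                f'{network.predict_single_latency(sample_action)}')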