Neural Networks
OK, now we are ready to start working with NNs. How NNs work, and how to build deep learning frameworks with PyTorch, is explained here by Andrej Karpathy. We will be using PyTorch, and later JAX, to train our neural network RL agents.
What is PyTorch? PyTorch is an optimized tensor library for deep learning using GPUs and CPUs.
For more details check:
Training pipeline
Basic PyTorch neural network pipeline
1) training/test data <- Prepare the training and test data
2) model <- Create the PyTorch neural network model
3) loss_fn <- Create the loss function, which measures the prediction error and from which the gradients are computed
4) optimizer <- Create the optimizer, which uses the gradients to update the neural network weights
5) training loop: repeat several times until a stop criterion is met
for data, target in train_loader:
optimizer.zero_grad() # set current gradients as zero
output = model(data) # make predictions using input training data and NN pytorch model
loss = loss_fn(output, target) # measure loss using predictions and training output data
loss.backward() # calculate gradients for NN weight updates
optimizer.step() # do NN weights update
First example
We will solve the spiral binary classification task using PyTorch.
Imports
import numpy as np
import pandas as pd
from numpy import pi
import random
import matplotlib.pyplot as plt
from multiprocessing import freeze_support
import torch
from torch import nn
from random import shuffle
import wandb
Generate data for the spiral binary classification task
def generate_spiral_data(n_points=400, path="result.csv", show=True):
    """Generate a noisy two-arm spiral dataset and write it to a CSV file.

    Each arm holds ``n_points`` samples. Arm A is labelled 0; arm B is arm A
    mirrored through the origin and is labelled 1. Standard-normal noise is
    added to every coordinate, the rows are shuffled, and the result is
    saved with the header ``x,y,label``.

    Randomness comes from NumPy's global generator, so call
    ``np.random.seed`` beforehand for reproducible output.

    Args:
        n_points: Number of samples per class (the file holds twice as many
            rows).
        path: Destination of the CSV file.
        show: When true, display a scatter plot of both arms.
    """
    theta = np.sqrt(np.random.rand(n_points)) * 2 * pi

    # Radius grows linearly with the angle; negating it mirrors the arm.
    r_a = 2 * theta + pi
    data_a = np.array([np.cos(theta) * r_a, np.sin(theta) * r_a]).T
    x_a = data_a + np.random.randn(n_points, 2)

    r_b = -2 * theta - pi
    data_b = np.array([np.cos(theta) * r_b, np.sin(theta) * r_b]).T
    x_b = data_b + np.random.randn(n_points, 2)

    # Append the class label as a third column, then stack and shuffle rows.
    res_a = np.append(x_a, np.zeros((n_points, 1)), axis=1)
    res_b = np.append(x_b, np.ones((n_points, 1)), axis=1)
    res = np.append(res_a, res_b, axis=0)
    np.random.shuffle(res)

    np.savetxt(path, res, delimiter=",", header="x,y,label", comments="",
               fmt='%.5f')

    if show:
        plt.scatter(x_a[:, 0], x_a[:, 1])
        plt.scatter(x_b[:, 0], x_b[:, 1])
        plt.show()

def split_data_to_train_and_test(data, train_size=600, show=True):
    """Shuffle a labelled point set and split it into train/test tensors.

    Args:
        data: DataFrame with the columns ``x``, ``y`` and ``label``.
        train_size: Number of shuffled rows that go to the training set; the
            remaining rows form the test set.
        show: When true, display scatter plots of the training and test
            points.

    Returns:
        A tuple ``(train_x, train_y, test_x, test_y)`` of ``float32``
        tensors. Feature tensors have two columns (``x``, ``y``); label
        tensors are one-dimensional.
    """
    # Shuffle positions, not index labels: the arrays produced by
    # ``to_numpy`` are positional, so a non-default DataFrame index must not
    # leak into the lookup.
    order = list(range(len(data)))
    shuffle(order)
    features = data[["x", "y"]].to_numpy()[order]
    labels = data["label"].to_numpy()[order]

    train_features, train_labels = features[:train_size], labels[:train_size]
    test_features, test_labels = features[train_size:], labels[train_size:]
    print(train_features.shape, train_labels.shape)

    if show:
        plt.scatter(train_features[:, 0], train_features[:, 1])
        plt.show()
        plt.scatter(test_features[:, 0], test_features[:, 1])
        plt.show()

    train_x = torch.from_numpy(train_features).type(torch.float32)
    train_y = torch.from_numpy(train_labels).type(torch.float32)
    test_x = torch.from_numpy(test_features).type(torch.float32)
    test_y = torch.from_numpy(test_labels).type(torch.float32)
    return train_x, train_y, test_x, test_y
PyTorch NN model
class NN(nn.Module):
    """Fully connected binary classifier with two hidden ReLU layers.

    The output layer is followed by a sigmoid, so every output value lies in
    the range [0, 1].

    Args:
        n_inputs: Number of input features.
        n_outputs: Number of output units.
        hidden_sizes: Sequence whose first two items are the widths of the
            first and second hidden layers. When omitted, the widths are read
            from the module-level ``config["NN_hidden_layers_size"]``, which
            must then exist before the model is constructed.
    """

    def __init__(self, n_inputs, n_outputs, hidden_sizes=None):
        super().__init__()
        if hidden_sizes is None:
            # Keep existing ``NN(n_inputs, n_outputs)`` callers working.
            hidden_sizes = config["NN_hidden_layers_size"]
        first_width, second_width = hidden_sizes[0], hidden_sizes[1]

        self.linear1 = nn.Linear(n_inputs, first_width)
        self.linear2 = nn.Linear(first_width, second_width)
        self.linear3 = nn.Linear(second_width, n_outputs)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return sigmoid outputs for the batch ``x``."""
        x = self.relu(self.linear1(x))
        x = self.relu(self.linear2(x))
        x = self.sigmoid(self.linear3(x))
        return x
def train(model, criterion, config):
    """Run full-batch training and log progress to Weights & Biases.

    The training tensors ``train_x`` / ``train_y`` and the optimizer
    ``optim`` are read from module-level globals, so they must be defined
    before this function is called.

    Args:
        model: Network to train; its output is flattened to one value per
            training example before the loss is computed.
        criterion: Loss callable invoked as ``criterion(pred, train_y)``.
        config: Mapping that provides the number of passes under ``"epochs"``.
    """
    wandb.watch(model, criterion, log="all", log_freq=10)

    # Every epoch consumes the whole training set in a single batch.
    examples_per_epoch = len(train_x)
    track_wb_step = 0
    for epoch in range(config["epochs"]):
        pred = model(train_x).reshape(-1)
        # Use the criterion that was passed in rather than a global loss.
        loss = criterion(pred, train_y)

        optim.zero_grad()
        loss.backward()
        optim.step()

        acc = (pred.round() == train_y).float().mean()
        print(f"acc: {acc:.3f} loss {loss:.3f}")

        track_wb_step += examples_per_epoch
        # Report to wandb on every fifth epoch only.
        if (epoch + 1) % 5 == 0:
            train_log(loss, track_wb_step, epoch)
def test(model):
    """Evaluate ``model`` on the held-out set and log the result.

    The test tensors ``test_x`` / ``test_y`` are read from module-level
    globals. The accuracy is printed, logged to Weights & Biases under
    ``"test_accuracy"`` as a fraction in [0, 1], and the prediction plot
    is then generated via ``plot_predictions``.

    Args:
        model: Trained network; it is switched to evaluation mode.
    """
    model.eval()
    with torch.no_grad():
        # Flatten to one prediction per example, whatever the test-set size.
        pred = model(test_x).reshape(-1)
        # ``item()`` yields a plain float, so printing and logging do not
        # receive a tensor.
        accuracy = (pred.round() == test_y).float().mean().item()
    print(accuracy)
    # The accuracy is a fraction; scale it so the "%" suffix is truthful.
    print(f"Accuracy of the modal:{accuracy * 100:.2f}%")
    wandb.log({"test_accuracy": accuracy})
    plot_predictions(model)
def train_log(loss, example_ct, epoch):
    """Log the current loss to Weights & Biases and echo it to stdout.

    Args:
        loss: Loss value; converted with ``float`` before logging.
        example_ct: Number of examples seen so far, used as the wandb step.
        epoch: Epoch index stored alongside the loss.
    """
    loss = float(loss)
    wandb.log({"epoch": epoch, "loss": loss}, step=example_ct)
    # Separate "after" from the zero-padded count with a space.
    print("loss after " + str(example_ct).zfill(5) + f" examples {loss:.3f}")
def plot_predictions(model):
    """Plot the model's predicted class over a grid and log it to wandb.

    The model is evaluated on every integer point of the square
    [-15, 15] x [-15, 15]. Points whose rounded prediction is 0 and points
    whose rounded prediction is 1 are drawn as two scatter series, saved to
    ``plane.jpg`` and uploaded as a wandb image.

    Args:
        model: Network mapping a batch of 2-D points to one value each.
    """
    grid = np.array([[gx, gy] for gx in range(-15, 16) for gy in range(-15, 16)])
    t_data = torch.from_numpy(grid).type(torch.float32)

    rounded = model(t_data).round().detach().numpy()

    # Collect grid row indices per predicted class.
    class_zero = [row for row, value in enumerate(rounded) if value == 0]
    class_one = [row for row, value in enumerate(rounded) if value == 1]

    for rows in (class_zero, class_one):
        plt.scatter(t_data[rows, 0], t_data[rows, 1])

    plt.savefig("plane.jpg")
    wandb.log({"example": wandb.Image("plane.jpg")})
if __name__ == "__main__":
    freeze_support()

    # Fix every RNG seed so that runs are repeatable. Only the ``manual_seed``
    # family is used: ``torch.seed()`` / ``torch.cuda.seed()`` reseed with a
    # non-deterministic value, which is the opposite of what is wanted here.
    seed = 16
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    # Generate the dataset, reload it from disk and split it. The resulting
    # tensors are module-level names that train() and test() read directly.
    generate_spiral_data()
    data = pd.read_csv("result.csv", delimiter=",")
    train_x, train_y, test_x, test_y = split_data_to_train_and_test(data)

    # ``config`` must exist before NN is constructed: the model reads its
    # hidden layer sizes from it.
    config = {
        "epochs": 700,
        "NN_hidden_layers_size": [6, 6],
        "learning_rate": 0.01,
    }
    model = NN(2, 1)
    optim = torch.optim.Adam(model.parameters(), lr=config["learning_rate"])
    loss_fn = nn.BCELoss()

    wandb.login()
    with wandb.init(project="spiral", config=config):
        train(model=model, criterion=loss_fn, config=config)
        test(model=model)
        # Export the trained network and attach the file to the wandb run.
        torch.onnx.export(model, test_x, "spiral.onnx")
        wandb.save("spiral.onnx")