This post implements the backpropagation algorithm in NumPy, which helps me further understand how it works.

import pandas as pd 
import numpy as np
from pdb import set_trace
from sklearn import datasets

Design the network structure

  • Each layer contains the weights/bias and an activation function
structures = [  
    {"input_dim": 2, "output_dim": 25, "activation": "relu"},  
    {"input_dim": 25, "output_dim": 50, "activation": "relu"},  
    {"input_dim": 50, "output_dim": 50, "activation": "relu"},  
    {"input_dim": 50, "output_dim": 25, "activation": "relu"},  
    {"input_dim": 25, "output_dim": 1, "activation": "sigmoid"},  
]

Initialize the parameters

  • The weights can be random numbers, and the biases are preferably small positive values so that the ReLU units are active at the beginning.
def init_layers(structures, seed=1105):
    np.random.seed(seed)  # make the initialization reproducible
    params = {}
    for i, structure in enumerate(structures):
        # small random weights, scaled down to keep the initial activations small
        params["W_{}".format(i)] = np.random.randn(structure["input_dim"], structure["output_dim"]) / 10
        # small positive biases (0.01 .. 0.09) so the ReLU units start out active
        params["b_{}".format(i)] = np.random.randint(1, 10, (1, structure["output_dim"])) / 100
    return params
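As a quick, illustrative sanity check (the variable name check exists only for this snippet), the first layer's shapes and the bias range come out as intended:

check = init_layers(structures)
print(check["W_0"].shape, check["b_0"].shape)                  # (2, 25) (1, 25)
print(check["b_0"].min() >= 0.01, check["b_0"].max() <= 0.09)  # True True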

The forward and backward activation functions

  • During backpropagation, it is apparent that we need the pre-activation output from the feed-forward pass. So we save the output both before and after activation in each layer for use in backpropagation later.
def relu(U):
    # element-wise max(0, U); returns a new array instead of modifying U in place
    return np.maximum(0, U)

def sigmoid(U):
    return 1 / (1 + np.exp(-U))

def relu_backward(du, U):
    # gradient of ReLU: pass the upstream gradient through where U > 0, zero it elsewhere
    du = du.copy()
    du[U < 0] = 0
    return du

def sigmoid_backward(du, U):
    # gradient of sigmoid: sigma(U) * (1 - sigma(U)) times the upstream gradient
    sig = sigmoid(U)
    return du * sig * (1 - sig)
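As a small spot check (illustrative only), sigmoid_backward with an upstream gradient of 1 should match a finite-difference estimate of the sigmoid's slope:

u = np.array([[0.5]])
eps = 1e-6
numeric = (sigmoid(u + eps) - sigmoid(u - eps)) / (2 * eps)
print(sigmoid_backward(np.ones_like(u), u), numeric)  # both approximately 0.2350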

So, single_layer_feedforward returns two values: the activated output and the pre-activation output. The activated output is fed as input into the next layer, while the pre-activation output is saved for backpropagation - the reason is that we need the partial derivative of the activation function with respect to its input.

def single_layer_feedforward(A, W, b, activation_func):
    U = A @ W + b                   # pre-activation output
    return activation_func(U), U    # (activated output, pre-activation output)
  • During the feed-forward process, we start with the features (X) and go through each layer until the final output.
def feedforward(X, structures, params):
    U_curr = X
    for i, structure in enumerate(structures):
        W_curr = params["W_" + str(i)]
        b_curr = params["b_" + str(i)]
        params["U_input_" + str(i)] = U_curr
        if structure["activation"] == "relu":
            activation_func = relu
        elif structure["activation"] == "sigmoid":
            activation_func = sigmoid
        else:
            raise ValueError("unsupported activation: " + structure["activation"])
        U_next, U_curr = single_layer_feedforward(U_curr, W_curr, b_curr, activation_func)
        params["U_post_activation_" + str(i)] = U_next
        params["U_prior_activation_" + str(i)] = U_curr
        U_curr = U_next
    return U_curr
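A quick illustrative run on a few random inputs (demo_params and demo_out are throwaway names for this snippet) confirms the output shape and that the per-layer intermediates are cached in params:

demo_params = init_layers(structures)
demo_out = feedforward(np.random.randn(4, 2), structures, demo_params)
print(demo_out.shape)                             # (4, 1)
print(demo_params["U_prior_activation_0"].shape)  # (4, 25)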

Loss function

Here we use the negative log-loss (binary cross-entropy) as the total cost to minimize.
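Written out for m training samples, with the natural log (which is what the gradient in the backward pass assumes), the cost implemented below is:

$$ J(\hat{y}, y) = -\frac{1}{m}\sum_{i=1}^{m}\Big[\, y_i \log \hat{y}_i + (1 - y_i)\log(1 - \hat{y}_i) \,\Big] $$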

def negativelogloss(output, y):
    # mean binary cross-entropy; natural log, to match the gradient used in the backward pass
    return np.squeeze(-np.sum(y * np.log(output) + (1 - y) * np.log(1 - output)) / len(y))

def get_accuracy(y_true, y_predicted):
    # threshold the sigmoid outputs at 0.5 to get class labels
    predicted_class = y_predicted.copy()
    predicted_class[predicted_class > 0.5] = 1
    predicted_class[predicted_class <= 0.5] = 0
    return accuracy_score(y_true, predicted_class)

Backpropagation process

  • During backpropagation, we pass the partial derivatives backwards through the network using the chain rule.

  • In the single-layer backward step, we feed the accumulated derivative from the layer above into the current layer; there it is multiplied by the layer's own partial derivatives to produce the accumulated derivative for the next (earlier) layer (see the formulas below). During this step we also save the gradients of the weights and bias (averaged over the batch) in a table for the update later.
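A sketch of the standard chain-rule result for one layer, writing the pre-activation as U = A W + b and the activation output as Z = g(U), with dZ the accumulated derivative coming from the layer above:

$$ dU = dZ \odot g'(U), \qquad \frac{\partial J}{\partial W} = \frac{1}{m}\, A^\top dU, \qquad \frac{\partial J}{\partial b} = \frac{1}{m}\sum_{i=1}^{m} dU_i, \qquad dA = dU\, W^\top $$

These are exactly the quantities computed in single_layer_backward below.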

def single_layer_backward(dz, U_input, U_prior_activation, W, activation_func):
    m = len(dz)                                   # number of samples in the batch
    dz = activation_func(dz, U_prior_activation)  # dJ/dU: push the gradient through the activation
    gradient_W = (U_input.T @ dz) / m             # average gradient of the weights
    gradient_b = np.mean(dz, axis=0)              # average gradient of the bias
    dz = dz @ W.T                                 # dJ/dA: gradient w.r.t. this layer's input
    return dz, gradient_W, gradient_b
  • The whole backpropagation pass starts from the partial derivative of the loss function with respect to y_hat (the output of the feed-forward pass); its formula is given right after this list.

  • In each layer, we accumulate the derivatives and calculate the gradients of W and b.

  • The accumulated derivatives are passed on to the next layer.

  • The gradients of W and b in each layer are stored in the grads_table so we can update the parameters later.
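The starting derivative, used to initialize dz_prev in backward below, follows from differentiating the cross-entropy cost above with respect to the network output (the 1/m averaging is applied later, inside single_layer_backward):

$$ \frac{\partial J}{\partial \hat{y}} = -\left(\frac{y}{\hat{y}} - \frac{1 - y}{1 - \hat{y}}\right) $$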

grads_table = {}
def backward(output, y, structures, params):
    # derivative of the cross-entropy loss with respect to the network output
    dz_prev = -(np.divide(y, output) - np.divide(1 - y, 1 - output))
    i = len(structures) - 1
    while i >= 0:
        W_curr = params["W_" + str(i)]
        b_curr = params["b_" + str(i)]
        U_input_curr = params["U_input_" + str(i)]
        U_prior_activation = params["U_prior_activation_" + str(i)]
        if structures[i]["activation"] == "relu":
            activation_func = relu_backward
        elif structures[i]["activation"] == "sigmoid":
            activation_func = sigmoid_backward
        else:
            raise ValueError("unsupported activation: " + structures[i]["activation"])
        dz_prev, gradient_W, gradient_b = single_layer_backward(dz_prev, U_input_curr, U_prior_activation, W_curr, activation_func)
        grads_table["gradient_W_" + str(i)] = gradient_W
        grads_table["gradient_b_" + str(i)] = gradient_b
        i -= 1
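A handy way to convince yourself the backward pass is correct is a finite-difference gradient check. The helper below is only a sketch for this post (numerical_grad_check is not part of the training code): it nudges one weight up and down and compares the resulting slope of the loss with the analytic gradient stored in grads_table.

def numerical_grad_check(X, y, structures, params, layer=0, index=(0, 0), eps=1e-5):
    key = "W_" + str(layer)
    original = params[key][index]
    params[key][index] = original + eps
    loss_plus = negativelogloss(feedforward(X, structures, params), y)
    params[key][index] = original - eps
    loss_minus = negativelogloss(feedforward(X, structures, params), y)
    params[key][index] = original  # restore the weight
    return (loss_plus - loss_minus) / (2 * eps)

After one feedforward and backward pass on some data X, y (for example the moons data used in the next section), numerical_grad_check(X, y, structures, params) should agree with grads_table["gradient_W_0"][0, 0] to several decimal places.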

Update the parameters

  • Go through each layer to update the parameters using grads_table
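The update implemented below is plain full-batch gradient descent with learning rate lr (no momentum or adaptive scaling):

$$ W^{(i)} \leftarrow W^{(i)} - \text{lr}\cdot\frac{\partial J}{\partial W^{(i)}}, \qquad b^{(i)} \leftarrow b^{(i)} - \text{lr}\cdot\frac{\partial J}{\partial b^{(i)}} $$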
def update_weights(depth, params, grads_table, lr):
    for i in range(depth):
        params["W_" + str(i)] = params["W_" + str(i)] - grads_table["gradient_W_" + str(i)] * lr
        params["b_" + str(i)] = params["b_" + str(i)] - grads_table["gradient_b_" + str(i)] * lr
    return params

In practice

import os
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

X, y = make_moons(n_samples = 1000, noise=0.2, random_state=100)
y = y.reshape(-1, 1)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

params = init_layers(structures)
grads_table = {}
depth = len(structures)
for epoch in range(10000):
    output = feedforward(X_train, structures, params)
#     print("logloss: {}; accuracy: {}".format(negativelogloss(output, y_train), get_accuracy(y_train, output)))
    backward(output, y_train, structures, params)
    params = update_weights(depth, params, grads_table, 0.01)
    
y_hat = feedforward(X_test, structures, params)
print("logloss: {}; accuracy: {}".format(negativelogloss(y_hat, y_test), get_accuracy(y_test, y_hat)))