commit 7425a5187183224605a64a60e75b1d89380a1387
Author: Lorenzo Torres
Date:   Thu Oct 16 09:03:15 2025 +0200

    first commit

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2abd421
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+venv/
+weights/
+biases/
+dataset/
+.DS_Store
diff --git a/common.py b/common.py
new file mode 100644
index 0000000..c362db7
--- /dev/null
+++ b/common.py
@@ -0,0 +1,46 @@
+import mlx.core as mx
+import numpy as np
+import struct
+
+# Data loading functions. To download the data, run the data.sh script.
+def load_images(filename):
+    with open(filename, 'rb') as f:
+        magic, num_images, rows, cols = struct.unpack('>IIII', f.read(16))
+        data = np.frombuffer(f.read(), dtype=np.uint8)
+        images = data.reshape(num_images, rows, cols)
+        array = np.array(images / 255.0)
+        return np.reshape(array, (array.shape[0], array.shape[1] * array.shape[2]))
+
+def load_labels(filename):
+    with open(filename, 'rb') as f:
+        magic, num_labels = struct.unpack('>II', f.read(8))
+        labels = np.frombuffer(f.read(), dtype=np.uint8)
+        return labels
+
+# Softmax distribution function. Given a vector,
+# the function returns another vector where the
+# sum of all elements equals 1.0. In this case the
+# softmax is stabilized, to prevent getting an
+# overflow, by subtracting the maximum value of
+# the vector from each one of its elements. This
+# makes every value non-positive without changing
+# the end result, since each element is getting
+# shifted by the same amount.
+def softmax(x):
+    stable_x = x - mx.max(x, axis=1, keepdims=True)
+    e = mx.exp(stable_x)
+    return e / mx.sum(e, axis=1, keepdims=True)
+
+# ReLU activation function. If x > 0 returns
+# x, otherwise returns 0.0. This is needed
+# to make the neural network non-linear.
+def relu(x):
+    return mx.maximum(0.0, x)
+
+# Linear prediction function. Given input, weights
+# and bias, it computes the prediction. It should be
+# activated using ReLU before passing it to the
+# next layer, or distributed with softmax when
+# using it in the output layer.
+def linear(x, W, b):
+    return x @ W + b
diff --git a/data.sh b/data.sh
new file mode 100755
index 0000000..e1016ae
--- /dev/null
+++ b/data.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+curl -L -o mnist-dataset.zip https://www.kaggle.com/api/v1/datasets/download/hojjatk/mnist-dataset
+mkdir dataset
+pushd dataset
+unzip ../mnist-dataset.zip
+rm -rf ../mnist-dataset.zip
+popd
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..97373e8
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+mlx
+numpy
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..1af3fb6
--- /dev/null
+++ b/test.py
@@ -0,0 +1,52 @@
+import mlx.core as mx
+import numpy as np
+# Functions shared by both the training and the testing scripts.
+from common import *
+
+# Hyperparameters.
+
+HIDDEN_SIZE = 256
+HIDDEN_LAYERS = 4
+
+# Testing images. It's a numpy array of size 10000x784.
+testing = load_images("dataset/t10k-images-idx3-ubyte/t10k-images-idx3-ubyte")
+labels = load_labels("dataset/t10k-labels-idx1-ubyte/t10k-labels-idx1-ubyte")
+
+# Load parameters from training results.
+W1 = mx.array(np.loadtxt("weights/weights1.txt", dtype=np.float32))
+b1 = mx.array(np.loadtxt("biases/biases1.txt", dtype=np.float32))
+
+W_hidden = []
+b_hidden = []
+for i in range(HIDDEN_LAYERS):
+    W_hidden.append(mx.array(np.loadtxt(f"weights/weights_hidden{i}.txt", dtype=np.float32)))
+    b_hidden.append(mx.array(np.loadtxt(f"biases/biases_hidden{i}.txt", dtype=np.float32)))
+
+W_out = mx.array(np.loadtxt("weights/weights3.txt", dtype=np.float32))
+b_out = mx.array(np.loadtxt("biases/biases3.txt", dtype=np.float32))
+
+images = mx.array(testing)
+labels = mx.array(labels)
+
+z1 = linear(images, W1, b1)
+# Apply activation.
+a1 = relu(z1)
+
+# Hidden layers regression and activation.
+z_hidden = [linear(a1, W_hidden[0], b_hidden[0])]
+a_hidden = [relu(z_hidden[0])]
+for l in range(1, HIDDEN_LAYERS):
+    z_hidden.append(linear(a_hidden[-1], W_hidden[l], b_hidden[l]))
+    a_hidden.append(relu(z_hidden[l]))
+
+# Output layer regression.
+z_out = linear(a_hidden[-1], W_out, b_out)
+# Apply activation.
+y_hat = softmax(z_out)
+# Transform the predictions to a vector of class indices containing the numerical label (0-9).
+predictions = mx.argmax(y_hat, axis=1)
+# Check how many predictions were correct.
+correct = mx.sum(predictions == labels)
+
+accuracy = (correct / labels.shape[0]) * 100
+print(f"Accuracy: {accuracy.item():.2f}%")
diff --git a/train.py b/train.py
new file mode 100644
index 0000000..1bc0f6d
--- /dev/null
+++ b/train.py
@@ -0,0 +1,195 @@
+import mlx.core as mx
+import numpy as np
+import math
+# Functions shared by both the training and the testing scripts.
+from common import *
+
+# Hyperparameters
+
+HIDDEN_SIZE = 256
+LEARNING_RATE = 1e-3
+BETA1 = 0.9
+BETA2 = 0.999
+EPSILON = 1e-8
+EPOCHS = 10
+BATCH_SIZE = 60
+HIDDEN_LAYERS = 4
+
+# Training images. It's a numpy array of size 60000x784.
+training = load_images("dataset/train-images-idx3-ubyte/train-images-idx3-ubyte")
+labels = load_labels("dataset/train-labels-idx1-ubyte/train-labels-idx1-ubyte")
+
+# The derivative of ReLU. For x > 0 ReLU is the
+# identity, so the derivative is 1.0; otherwise
+# ReLU returns a constant (0.0) and thus the
+# derivative is 0.
+def drelu(x):
+    return (x > 0.0).astype(mx.float32)
+
+# Cross-entropy loss function. Given y (the
+# actual value) and y_hat (the predicted value),
+# the function returns a scalar telling how
+# wrong the prediction is. This function is
+# very convenient since its derivative, when
+# softmax is used to distribute the predictions,
+# is just y_hat - y.
+def cross_entropy(y, y_hat):
+    y_hat = mx.clip(y_hat, 1e-9, 1 - 1e-9)
+    return -mx.mean(mx.sum(y * mx.log(y_hat), axis=1))
+
+# Input layer weights and biases
+W1 = mx.random.normal((28*28, HIDDEN_SIZE)) * math.sqrt(2 / (28*28))
+b1 = mx.zeros((HIDDEN_SIZE,))
+
+# Output layer weights and biases
+W_out = mx.random.normal((HIDDEN_SIZE, 10)) * math.sqrt(2 / HIDDEN_SIZE)
+b_out = mx.zeros((10,))
+
+# Hidden layers weights and biases
+W_hidden = [mx.random.normal((HIDDEN_SIZE, HIDDEN_SIZE)) * math.sqrt(2 / HIDDEN_SIZE) for _ in range(HIDDEN_LAYERS)]
+b_hidden = [mx.zeros((HIDDEN_SIZE,)) for _ in range(HIDDEN_LAYERS)]
+
+# These are the moment estimates needed for Adam optimization.
+# For specifics on the implementation of the algorithm,
+# refer to https://arxiv.org/abs/1412.6980
+
+# Input weights moment
+mW1 = mx.zeros_like(W1)
+vW1 = mx.zeros_like(W1)
+# Input biases moment
+mb1 = mx.zeros_like(b1)
+vb1 = mx.zeros_like(b1)
+# Hidden layers weights moment
+mW_hidden = [mx.zeros_like(W_hidden[0]) for _ in range(HIDDEN_LAYERS)]
+vW_hidden = [mx.zeros_like(W_hidden[0]) for _ in range(HIDDEN_LAYERS)]
+# Hidden layers biases moment
+mb_hidden = [mx.zeros_like(b_hidden[0]) for _ in range(HIDDEN_LAYERS)]
+vb_hidden = [mx.zeros_like(b_hidden[0]) for _ in range(HIDDEN_LAYERS)]
+# Output weights moment
+mW_out = mx.zeros_like(W_out)
+vW_out = mx.zeros_like(W_out)
+# Output biases moment
+mb_out = mx.zeros_like(b_out)
+vb_out = mx.zeros_like(b_out)
+
+t = 0
+
+total_steps = EPOCHS * (training.shape[0] / BATCH_SIZE)
+steps = 0
+
+# Training pass: repeat the training process
+# for EPOCHS iterations.
+for epoch in range(EPOCHS):
+    for i in range(0, training.shape[0], BATCH_SIZE):
+        label_batch = mx.zeros((BATCH_SIZE, 10))
+        batch = mx.array(training[i:i+BATCH_SIZE])
+        for j, lbl in enumerate(labels[i:i+BATCH_SIZE]):
+            label_batch[j, int(lbl)] = 1.0
+        # Forward pass. The forward pass is what does the actual prediction, using
+        # the current parameters, for a given input batch X.
+
+        # Input layer regression
+        z1 = linear(batch, W1, b1)
+        # Apply activation
+        a1 = relu(z1)
+
+        # Hidden layers regression and activation
+        z_hidden = [linear(a1, W_hidden[0], b_hidden[0])]
+        a_hidden = [relu(z_hidden[0])]
+        for l in range(1, HIDDEN_LAYERS):
+            z_hidden.append(linear(a_hidden[-1], W_hidden[l], b_hidden[l]))
+            a_hidden.append(relu(z_hidden[l]))
+
+
+        # Output layer regression
+        z_out = linear(a_hidden[-1], W_out, b_out)
+        # Apply softmax
+        y_hat = softmax(z_out)
+
+        # Backpropagation. This is where the model "learns"
+        # by computing how wrong it is and then proceeding
+        # to adjust the parameters.
+
+        # Calculate the loss using cross-entropy
+        loss = cross_entropy(label_batch, y_hat)
+
+        # Calculate the gradient of the loss function w.r.t. the output
+        # pre-activation: with softmax it is just (y_hat - y) / batch size.
+        d_loss = (y_hat - label_batch) / batch.shape[0]
+        # Calculate the gradient of the loss function w.r.t.
+        # the weights of the output layer.
+        dW_out = a_hidden[-1].T @ d_loss
+        # Calculate the gradient of the loss function w.r.t.
+        # the biases of the output layer.
+        db_out = mx.sum(d_loss, axis=0)
+
+        # Calculate the gradient of the loss function w.r.t.
+        # the weights and biases of all the hidden layers.
+
+
+        # Gradients of hidden layers
+        dW_hidden = [None] * HIDDEN_LAYERS
+        db_hidden = [None] * HIDDEN_LAYERS
+        # Gradient of previous layer
+        d_hidden = d_loss
+        for l in reversed(range(HIDDEN_LAYERS)):
+            WT = W_out.T if l == HIDDEN_LAYERS - 1 else W_hidden[l + 1].T
+            d_hidden = (d_hidden @ WT) * drelu(z_hidden[l])
+            a_prev = a1 if l == 0 else a_hidden[l - 1]
+            dW_hidden[l] = a_prev.T @ d_hidden
+            db_hidden[l] = mx.sum(d_hidden, axis=0)
+
+        # Backpropagate from the first hidden layer back to the input layer
+        d1 = (d_hidden @ W_hidden[0].T) * drelu(z1)
+        # Compute gradients for the input layer weights and biases
+        dW1 = batch.T @ d1
+        # Calculate the gradient of the loss function w.r.t.
+        # the biases of the input layer.
+        db1 = mx.sum(d1, axis=0)
+
+        # Adam optimization.
+
+        t += 1
+
+        # Weights optimization with bias correction (input, hidden and output layers).
+        for theta, gradient, m, v in zip(
+            [W1, *W_hidden, W_out],
+            [dW1, *dW_hidden, dW_out],
+            [mW1, *mW_hidden, mW_out],
+            [vW1, *vW_hidden, vW_out],
+        ):
+            m[:] = BETA1 * m + (1 - BETA1) * gradient
+            v[:] = BETA2 * v + (1 - BETA2) * (gradient ** 2)
+            m_hat = m / (1 - (BETA1 ** t))
+            v_hat = v / (1 - (BETA2 ** t))
+            theta[:] -= LEARNING_RATE * m_hat / (mx.sqrt(v_hat) + EPSILON)
+
+        # Biases optimization with bias correction (input, hidden and output layers).
+        for theta, gradient, m, v in zip(
+            [b1, *b_hidden, b_out],
+            [db1, *db_hidden, db_out],
+            [mb1, *mb_hidden, mb_out],
+            [vb1, *vb_hidden, vb_out],
+        ):
+            m[:] = BETA1 * m + (1 - BETA1) * gradient
+            v[:] = BETA2 * v + (1 - BETA2) * (gradient ** 2)
+            m_hat = m / (1 - (BETA1 ** t))
+            v_hat = v / (1 - (BETA2 ** t))
+            theta[:] -= LEARNING_RATE * m_hat / (mx.sqrt(v_hat) + EPSILON)
+        steps += 1
+        if steps % 100 == 0:
+            mx.eval(m)
+            mx.eval(v)
+        if steps % 500 == 0:
+            print(f"\rtraining: {(100 * (steps / total_steps)):.2f}%, loss: {loss.item():.4f}", end="")
+
+    LEARNING_RATE *= 0.99
+
+print("\nTraining complete.")
+np.savetxt("weights/weights1.txt", np.array(W1))
+np.savetxt("biases/biases1.txt", np.array(b1))
+for i in range(HIDDEN_LAYERS):
+    np.savetxt(f"weights/weights_hidden{i}.txt", np.array(W_hidden[i]))
+    np.savetxt(f"biases/biases_hidden{i}.txt", np.array(b_hidden[i]))
+np.savetxt("weights/weights3.txt", np.array(W_out))
+np.savetxt("biases/biases3.txt", np.array(b_out))
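
A quick way to see why the stabilization described in the softmax comment of common.py matters: the naive formula overflows float32 for large logits, while the shifted version stays finite and still sums to 1.0 per row. A minimal sketch, assuming mlx is installed and common.py is importable:

import mlx.core as mx
from common import softmax

# Logits large enough to overflow exp() in float32.
x = mx.array([[1000.0, 1001.0, 1002.0],
              [-2.0, 0.0, 3.0]])

# Naive softmax: exp(1000.0) overflows to inf, so the first row turns into nan.
naive = mx.exp(x) / mx.sum(mx.exp(x), axis=1, keepdims=True)
# Stabilized softmax from common.py: shifting by the row maximum keeps exp() in range.
stable = softmax(x)

print(naive)                   # first row is nan
print(stable)                  # finite probabilities
print(mx.sum(stable, axis=1))  # every row sums to 1.0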
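
The one-hot batch that train.py builds with a Python loop and per-element writes can also be produced with a single gather from an identity matrix. This is only an illustrative alternative, not something applied to the commit; the random labels below are made-up stand-ins for labels[i:i+BATCH_SIZE]:

import numpy as np
import mlx.core as mx

BATCH_SIZE = 60
# Made-up stand-in for labels[i:i+BATCH_SIZE].
lbls = np.random.randint(0, 10, size=BATCH_SIZE).astype(np.uint8)

# Row k of the 10x10 identity matrix is the one-hot vector for class k,
# so gathering rows by label index builds the whole one-hot batch at once.
label_batch = mx.take(mx.eye(10), mx.array(lbls.astype(np.int32)), axis=0)

print(label_batch.shape)               # (60, 10)
print(mx.argmax(label_batch, axis=1))  # recovers the original labels

Either construction produces the same label_batch as the loop in train.py.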
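
The cross_entropy comment and the d_loss line in train.py rest on the identity that the gradient of cross-entropy composed with softmax is y_hat - y (divided by the batch size when the loss is averaged). A minimal sketch that checks the hand-derived gradient against mlx automatic differentiation, with made-up sizes and labels:

import mlx.core as mx
from common import softmax

# Same definition as in train.py, repeated here so the check is self-contained.
def cross_entropy(y, y_hat):
    y_hat = mx.clip(y_hat, 1e-9, 1 - 1e-9)
    return -mx.mean(mx.sum(y * mx.log(y_hat), axis=1))

batch_size, classes = 4, 10
z = mx.random.normal((batch_size, classes))                   # fake pre-activations
y = mx.take(mx.eye(classes), mx.array([3, 1, 7, 0]), axis=0)  # fake one-hot labels

# Hand-derived gradient, exactly what train.py uses for d_loss.
analytic = (softmax(z) - y) / batch_size
# The same gradient computed by mlx automatic differentiation.
automatic = mx.grad(lambda logits: cross_entropy(y, softmax(logits)))(z)

print(mx.max(mx.abs(analytic - automatic)))  # effectively zero: agreement up to float32 rounding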
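
Both update loops at the end of train.py apply the same Adam rule; only the parameter, gradient and moment lists differ. The sketch below isolates that rule in a hypothetical adam_step helper (not part of the repository) and drives it on a toy quadratic so the bias-corrected update is easy to inspect:

import mlx.core as mx

# Hypothetical helper mirroring the update applied to each parameter in train.py.
def adam_step(theta, grad, m, v, t, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
    # First and second moment estimates of the gradient.
    m = beta1 * m + (1 - beta1) * grad
    v = beta2 * v + (1 - beta2) * (grad ** 2)
    # Bias correction: for small t the moments are still biased towards zero.
    m_hat = m / (1 - beta1 ** t)
    v_hat = v / (1 - beta2 ** t)
    # Parameter update, identical in form to the loops in train.py.
    theta = theta - lr * m_hat / (mx.sqrt(v_hat) + eps)
    return theta, m, v

# Toy usage: minimize f(x) = x^2, whose gradient is 2x, starting from x = 5.
x = mx.array([5.0])
m = mx.zeros_like(x)
v = mx.zeros_like(x)
for t in range(1, 1001):
    x, m, v = adam_step(x, 2 * x, m, v, t, lr=0.05)
print(x)  # close to 0

The correction matters most for small t, when m and v are still dominated by their zero initialization; dividing by (1 - BETA1 ** t) and (1 - BETA2 ** t) rescales them into unbiased estimates.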