# tiny-cnn **Repository Path**: fresh-ai-tech/tiny-cnn ## Basic Information - **Project Name**: tiny-cnn - **Description**: 基于Numpy从零实现CNN网络框架,复刻LeNet-5, 在MNIST数据集上完成训练和推理测试。 - **Primary Language**: Python - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-05-08 - **Last Updated**: 2026-05-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # tiny-cnn #### 介绍 基于Numpy从零实现CNN网络框架,复刻LeNet-5, 在MNIST数据集上完成训练和推理测试。 #### 软件架构 软件架构说明 
看到这个标题,朋友们的第一反应可能是“疯了吧?”,“我没看错吧?”,“Numpy也不是深度学习框架呀!”……
您没看错,这篇文章就是要从“零”实现一个可以工作的LeNet-5网络结构,初衷也很简单,就是要把卷积网络背后的一些工作原理、设计细节拆开来展示给大家,让深度学习背后的逻辑不再神秘。
其实,说实话,这篇文章并没有太多新颖的内容,核心原理我们在之前的同一列文章《从数学原理到算法实现系列(3):“卷积神经网络”——从单点数据到一目十行》已经介绍过了,本文给出来的更多的是代码实现参考。
本文中实现的简单CNN代码框架,沿袭了我们之前实现的MLP风格。我们将网络中的基础构件儿做了独立的封装,内部实现了forward,backward计算流程以及局部梯度的缓存设计。这也是在自动微分机制出现之前,大部分神经网络设计所采用的设计方案的一种真实写照。让我们在“手动推导原理”到“代码实现”的过程中感受深度学习先驱者们曾经走过的艰辛历程。

其中,Block接口的实现如下:
from abc import ABC, abstractmethod
class Block(ABC):
def __call__(self, x):
return self.forward(x)
@abstractmethod
def forward(self, x):
pass
@abstractmethod
def backward(self, diff_out):
pass
在正是介绍网络中各个层的实现之前,我还是单独把矩阵运算的梯度这部分内容单独拿出来供大家回顾一下。因为矩阵梯度这个内容属于《矩阵论》课程里的知识,而对于大部分工科生在本科甚至研究生课程体系里面是没有这部分内容的。我们所熟知的是标量的微积分运算以及《线性代数》提供给我们的矩阵运算的相关背景,但对于微积分和矩阵的交叉地带,是我们无数人知识的盲区。而它又是在处理深入学习优化任务时无法回避的关键技术点。虽然我们可以看着数学公式,把它按部就班地实现出来,但我依然想在已有知识体系的基础上搞懂为什么是这样。
在这个小小的问题上,确实曾花了不少力气去“追根溯源”!
我们从下面这个极简问题入手,去看下矩阵梯度计算的来龙去脉。
假定是我们模型的输入,, 是我们模型的权重和偏置参数, 是模型基于当前参数和输入对于输出的估计,其中是可微的激活函数。对应的真实输出是, 是我们选定的单值损失函数,如MSE等。
我们首先把的计算结果展开:
为了方便讨论,我们接下来把线性部分的运算结果记为, 即。
此时,我们就可以根据展开的计算结果和链式求导法则,分别去看下损失对模型参数和问题输入的梯度了。
同理, 我们可以写出损失对于其他权重分量的偏导数:
让我们把以上结果整理成矩阵的形式:
同理,我们可以得到损失对于模型输入的梯度
以及损失对于偏置参数的梯度,
(其中为单位矩阵),
这里为了方便起见,我们可以把视为 。
至此,心中的困惑了然于胸。
都说“万丈高楼平地起”,但“平地起”也得一层一层往上盖。接下来请大家跟随我一起看看这个卷积层是如何一点一点成为它最终的形态的。
对于卷积操作,大家并不陌生(如果不熟悉的话,可以回看下同系列的第3篇文章)。我认为在实现卷积操作的过程中,最关键的问题是如何把卷积操作转换为矩阵运算。在这里,我们通过对卷积核和卷积核覆盖的图像块儿(patch)分别转换成行向量和列向量,然后就可以向量内积(矩阵乘法)实现图像的卷积操作了。
对于单张图像,我们模型的输入规模为:, 经过卷积操作后,把输入转换成的特征图。 输出特征图的大小由输入尺寸、卷积核大小(采用方形卷积核), 步长以及填充宽度共同决定,满足如下关系:
这个关系其实非常容易理解, 请看下面的示意图:

那我们卷积核以及偏置参数的维度应该是什么样子呢? 偏置比较容易确定,一个输出通道对应一个参数;同样,对于卷积核来说,每个输出通道也都需要一组独立的参数,而每组参数又得和输入通道数对应。 因此,卷积核的维度是。
由于卷积操作是按固定步长进行的,因此,对于输出特征图中坐标位置为的像素,其在输出图像中卷积操作的起始位置为:
因此,我们很容易根据输出像素的位置提取出参与当前卷积操作的输入图像块儿:。
为了能够达成通过矩阵运算实现卷积操作的目的, 我们把卷积核与图像块儿做一个维度调整:
此时,我们通过矩阵(张量)乘法运算,得到的输出维度为:。对于这个结果,我们在输入通道上进行求和,再加上偏置,就可以得到目标输出位置的值了。
同步地,我们按照第二部分的矩阵梯度公式,记录参数的局部梯度。
关键代码实现如下:
class Conv2d(Block):
def __init__(self, inChannels=1, outChannels=1, kernelSize=3, stride=1, padding=0):
self.inChannels = inChannels
self.outChannels = outChannels
self.kernelSize = kernelSize
self.stride = stride
self.padding = padding
fan_in = inChannels * kernelSize * kernelSize
self.weights = np.random.normal(0, np.sqrt(2/fan_in),
size=(outChannels, inChannels, kernelSize, kernelSize)
).astype(np.float32)
self.bias = np.zeros((outChannels, 1, 1)).astype(np.float32)
self.__diff_kernel = None
self.__diff_bias = None
self.__diff_x = None
self.__pad_img_shape = None
def forward(self, x):
pad = self.__padding(x)
self.__pad_img_shape = pad.shape
out_shape = self.__feat_shape(pad)
outputImage = np.zeros((self.outChannels, out_shape[-2], out_shape[-1]), dtype=x.dtype)
kernel = self.weights.reshape(self.outChannels, self.inChannels, -1, self.kernelSize * self.kernelSize)
# 记录当前步骤内误差对应变量的梯度,用于反向传播
if self.__diff_kernel is None:
self.__diff_kernel = np.zeros((self.inChannels, self.kernelSize * self.kernelSize,
self.outChannels, out_shape[-2], out_shape[-1]), dtype=np.float32)
self.__diff_x = np.zeros((self.inChannels, pad.shape[-2] * pad.shape[-1],
self.outChannels, out_shape[-2], out_shape[-1]),
dtype=np.float32)
self.__diff_bias = np.zeros((self.outChannels, out_shape[-2], out_shape[-1]),
dtype=np.float32)
for row_idx in range(out_shape[-2]):
for col_idx in range(out_shape[-1]):
i, j = self.__out_idx_to_in_idx([row_idx, col_idx])
in_patch = pad[:, i: i + self.kernelSize, j: j + self.kernelSize].reshape(self.inChannels, -1, 1)
out_patch = (kernel @ in_patch).sum(axis=1) + self.bias
self.__diff_kernel[:, :, :, row_idx, col_idx] += in_patch
self.__diff_x[:,
i * pad.shape[1] + j: i * pad.shape[1] + j + self.kernelSize * self.kernelSize,
:,
row_idx, col_idx] += kernel.reshape((self.outChannels, self.inChannels, -1)).transpose((-2, -1, -3))
self.__diff_bias[:, row_idx, col_idx] += 1
outputImage[:, row_idx, col_idx] = out_patch.reshape((self.outChannels,))
# diff = self.__diff_kernel * outputImage
return outputImage
def backward(self, d_out):
self.__diff_kernel = (self.__diff_kernel * d_out).sum(axis=(-2, -1)).transpose((-1, -3, -2)).reshape(
self.weights.shape)
diff_x = (self.__diff_x * d_out).sum(axis=(-3, -2, -1)).reshape(self.__pad_img_shape)
self.__diff_bias = (self.__diff_bias * d_out).sum(axis=(-2, -1))
if self.padding == 0:
return diff_x
return diff_x[:, self.padding: -self.padding, self.padding: -self.padding]
对于批次输入的场景,我们模型的输入数据尺寸变成了, 输出数据的尺寸变成了 。其中是我们批次数据的大小,比如每次处理32张图像,就是32。
在3.1代码的基础上实现批处理能力比较简单,只需要在forward过程中循环处理每张图像就可以了, 关键代码如下:
class Conv2d(Block):
def __init__(self, inChannels=1, outChannels=1, kernelSize=3, stride=1, padding=0):
self.inChannels = inChannels
self.outChannels = outChannels
self.kernelSize = kernelSize
self.stride = stride
self.padding = padding
fan_in = inChannels * kernelSize * kernelSize
self.weights = np.random.normal(0, np.sqrt(2/fan_in),
size=(outChannels, inChannels, kernelSize, kernelSize)
).astype(np.float32)
self.bias = np.zeros((outChannels, 1, 1)).astype(np.float32)
# 缓存内部梯度
self.__diff_kernel = None
self.__diff_bias = None
self.__diff_x = None
# 缓存全局梯度
self.__nabla_weights = None
self.__nabla_bias = None
self.__pad_img_shape = None
def forward(self, x):
pad = self.__padding(x)
self.__pad_img_shape = pad.shape
out_shape = self.__feat_shape(pad)
outputImage = np.zeros((x.shape[0], self.outChannels, out_shape[-2], out_shape[-1]), dtype=x.dtype)
kernel = self.weights.reshape(self.outChannels, self.inChannels, -1, self.kernelSize * self.kernelSize)
# 记录当前步骤内误差对应变量的梯度,用于反向传播
if self.__diff_kernel is None:
self.__diff_kernel = np.zeros((self.inChannels, self.kernelSize * self.kernelSize,
self.outChannels, out_shape[-2], out_shape[-1]), dtype=np.float32)
self.__diff_x = np.zeros((self.inChannels, pad.shape[-2] * pad.shape[-1],
self.outChannels, out_shape[-2], out_shape[-1]),
dtype=np.float32)
self.__diff_bias = np.zeros((self.outChannels, out_shape[-2], out_shape[-1]),
dtype=np.float32)
for image_idx in range(x.shape[0]):
for row_idx in range(out_shape[-2]):
for col_idx in range(out_shape[-1]):
i, j = self.__out_idx_to_in_idx([row_idx, col_idx])
in_patch = pad[image_idx, :, i: i + self.kernelSize, j: j + self.kernelSize].reshape(self.inChannels, -1, 1)
out_patch = (kernel @ in_patch).sum(axis=1) + self.bias
self.__diff_kernel[:, :, :, row_idx, col_idx] += in_patch
self.__diff_x[:,
i * pad.shape[-1] + j: i * pad.shape[-1] + j + self.kernelSize * self.kernelSize,
:,
row_idx, col_idx] += kernel.reshape((self.outChannels, self.inChannels, -1)).transpose((-2, -1, -3))
self.__diff_bias[:, row_idx, col_idx] += 1
outputImage[image_idx, :, row_idx, col_idx] = out_patch.reshape((self.outChannels,))
return outputImage
def backward(self, d_out):
d_out = Conv2d._clip_gradient(d_out)
# for image_idx in range(d_out.shape[0]):
# __diff_kernel : [C_in x K_size x C_out x H_out x W_out]
# d_out: [B_size x C_out x H_out x W_out] --> [B_size x 1 x 1 x C_out x H_out_W_out]
# __diff_kernel * d_out: [B_size x C_in x K_size x C_out x H_out x W_out] -->
# [C_out x C_in x K_size x B_size x H_out x W_out]-->
# [C_out x C_in x H_k x W_k]
batch_size, out_channels, h_out, w_out = d_out.shape
self.__nabla_weights = (self.__diff_kernel * d_out.reshape(batch_size, 1, 1, out_channels, h_out, w_out)
).transpose((-3, -5, -4, -6, -2, -1)).sum(axis=(-3, -2, -1)).reshape(
self.weights.shape)
# the same as \nabla_kernel(d_out),
# calculating \nabla_input(d_out)
# __diff_x * d_out: [B_size x C_in x In_size x C_out x H_out x W_out] -->
# [B_size x C_in x In_size]--> [B_size x C_in x H_in x W_in]
diff_x = (self.__diff_x * d_out.reshape(batch_size, 1, 1, out_channels, h_out, w_out)
).sum(axis=(-3, -2, -1)).reshape(self.__pad_img_shape)
# __diff_bias * d_out: [B_size x C_out x H_out x W_out] -->
# [C_out x B_size x H_out x W_out] --> [C_out x 1 x 1]
self.__nabla_bias = ((self.__diff_bias * d_out).transpose((-3, -4, -2, -1))
.sum(axis=(-3, -2, -1))).reshape(self.bias.shape)
return diff_x[:, :, self.padding: -self.padding, self.padding: -self.padding] if self.padding != 0 else diff_x
在这样批次处理能力的基础上,batch size为32时,实现了1.2s/it的训练迭代速度。 什么概念呢?在MNIST数据的训练集上跑完一个epoch大概需要36分钟左右,用“龟速”来形容它毫不过分!
上一小节中的实现之所以在执行速度上这样拉胯是因为forward过程中的循环嵌套。每个批次的数据要执行次矩阵运算。这样的循环迭代在Python, Matlab等语言中是“伤不起”的。
要解决这个问题的思路也很简单,那就是在计算之前把输入数据的维度彻底展开,在一次矩阵运算中完成所有循环步骤中的计算过程,这样就能充分运用矩阵计算的并行处理能力。这个策略在Python,Matlab之类的编程语言中屡试不爽(当然前提要有足够的内存来支持展开操作)。
下面我们给出在计算展开中的维度变换示意图:

其实在目前绝大多数的CNN实现中,都采用了这种策略,人们专门给它起了一个名字“Image2col”,即把图像转换成列向量来处理。
关键代码实现如下:
class Conv2d(Block):
def __init__(self, inChannels=1, outChannels=1, kernelSize=3, stride=1, padding=0):
self.inChannels = inChannels
self.outChannels = outChannels
self.kernelSize = kernelSize
self.stride = stride
self.padding = padding
fan_in = inChannels * kernelSize * kernelSize
self.weights = np.random.normal(0, np.sqrt(2/fan_in),
size=(outChannels, inChannels * kernelSize * kernelSize)
).astype(np.float32)
self.bias = np.zeros((outChannels, 1)).astype(np.float32)
self.__output_shape = None
# 缓存内部梯度
self.__diff_kernel = None
# self.__diff_bias = None
# self.__diff_x = None
# 缓存全局梯度
self.__nabla_weights = None
self.__nabla_bias = None
self.__pad_img_shape = None
def __image2col(self, x):
ho, wo = self.__output_shape
ksize = self.weights.shape[-1]
bsize = x.shape[0]
n_step = ho * wo
col_image = np.zeros((bsize, ksize, n_step), dtype=x.dtype)
for col_out in range(wo):
for row_out in range(ho):
col_idx = row_out * wo + col_out
i, j = self.__out_idx_to_in_idx([row_out, col_out])
in_patch = x[:, :, i: i + self.kernelSize, j: j + self.kernelSize].reshape((bsize, -1))
col_image[:, :, col_idx] = in_patch
return col_image
def __row2image(self, col_image):
ho, wo = self.__output_shape
bsize = col_image.shape[0]
return col_image.reshape((bsize, self.outChannels, ho, wo))
def forward(self, x):
pad = self.__padding(x)
self.__pad_img_shape = pad.shape
# # print("type: ", pad.shape, pad.dtype)
self.__output_shape = self.__feat_shape(pad)
col_image = self.__image2col(pad)
self.__diff_kernel = col_image.transpose((-3, -1, -2))
out_prod = self.weights @ col_image + self.bias
return self.__row2image(out_prod)
def __delta_feature2row(self, delta_feature):
ho, wo = self.__output_shape
batch_size = delta_feature.shape[0]
return delta_feature.reshape((batch_size, self.outChannels, ho * wo))
def __delta_col2image(self, delta_col):
# hin, win = self.__pad_img_shape
delta_pad = np.zeros(self.__pad_img_shape)
ho, wo = self.__output_shape
# ksize = self.weights.shape[-1]
bsize = delta_col.shape[0]
# n_step = ho * wo
for col_out in range(wo):
for row_out in range(ho):
col_idx = row_out * wo + col_out
i, j = self.__out_idx_to_in_idx([row_out, col_out])
delta_pad[:, :, i: i + self.kernelSize, j: j + self.kernelSize] += delta_col[:, :, col_idx].reshape(
(bsize, self.inChannels, self.kernelSize, self.kernelSize)
)# .transpose((0, -1, -3, -2))
return delta_pad
def backward(self, d_out):
# pass
delta_feature = self.__delta_feature2row(d_out)
# nabla kernel: delta_feature @ self.__delta_kernel -> [Batch, C_out, H_k * W_k * C_in]
self.__nabla_weights = (delta_feature @ self.__diff_kernel).sum(axis=0).reshape(self.weights.shape)
# nabla bias:
self.__nabla_bias = delta_feature.transpose((-2, -3, -1)).sum(axis=(-2, -1)).reshape(self.bias.shape)
# delta_input: self.weights.T @ delta_feature --> [Batch, H_k * W_k * C_in, N_step]
delta_col = self.weights.T @ delta_feature
delta_x = self.__delta_col2image(delta_col)
if self.padding == 0:
return delta_x
else: return delta_x[:, :, self.padding:-self.padding, self.padding:-self.padding]
注意, 在这个实现里,我们权重直接采用的二维矩阵来存储,计算过程中不需要再对权重矩阵做维度调整了。
在这个版本中,同样batch size为32的情况下,实现了8-12it/s的训练速度,即在MNIST数据的训练集上完成一个epoch的迭代大概需要2-3分钟。
在卷积神经网络中,卷积操作是重头戏,但不是全部。我们知道,除此之外还有池化层、Dropout层、全连接层以及激活函数、Flatten等操作。接下来我们看下复刻LeNet-5需要的其他组件。
为了完成LeNet-5的结构,我们实现了最大池化层。
池化层和卷积操作类似,同样也可以指定池化核的大小,步长以及边缘填充的大小。
与卷积层不同的是,池化层没有可学习的参数。拿最大池化操作为例,它将池化核覆盖范围内的最大的元素保留,作为特征图目标位置的数值,其他领域内的元素都被丢弃,进而达到压缩特征尺寸(下采样)的目的。
因为我们当前的实现版本并没有自动微分机制,因此,对于池化层来讲,我们需要记录哪些位置的元素被传递到下一层了,在反向传播过程中,也仅有这部分元素的误差允许通过pooling层继续往前传递。
代码实现如下:
class MaxPool2d(Block):
def __init__(self, ksize, stride, padding=0):
super(MaxPool2d).__init__()
self.ksize = ksize
self.stride = stride
self.padding = padding
self.__mask = None
def __padding(self, x):
if self.padding == 0:
return x
else:
in_channels = x.shape[-3] if x.ndim == 3 else 1
paddedImage = np.zeros((in_channels, x.shape[-2] + self.padding * 2, x.shape[-1] + self.padding * 2),
dtype=x.dtype)
paddedImage[:, self.padding:-self.padding, self.padding:-self.padding] = x
return paddedImage
def __feat_shape(self, paddedImage):
return ((paddedImage.shape[-2] - self.ksize) // self.stride + 1,
(paddedImage.shape[-1] - self.ksize) // self.stride + 1
)
def __out_idx_to_in_idx(self, outIndex):
i, j = outIndex
return self.stride * i, self.stride * j
def forward(self, x):
padded = self.__padding(x)
out_shape = self.__feat_shape(padded)
out = np.zeros((x.shape[0], x.shape[1], out_shape[-2], out_shape[-1]), dtype=x.dtype)
self.__mask = np.zeros_like(padded, dtype=x.dtype)
for image_idx in range(x.shape[0]):
for row_idx in range(out_shape[-2]):
for col_idx in range(out_shape[-1]):
i, j = self.__out_idx_to_in_idx([row_idx, col_idx])
# print("padded: ", padded.shape, i, j)
patch = padded[image_idx, :, i:i + self.ksize, j:j + self.ksize]
# print("patch: ", patch.shape)
max_val = np.max(
patch.reshape((-1, patch.shape[-1] * patch.shape[-2]))
, axis=-1
)# .reshape((-1, 1, 1))
# print(max_val.shape)
# print("idx: ", (patch == max_val.reshape((-1, 1, 1))).astype(int))
out[image_idx, :, row_idx, col_idx] = max_val
self.__mask[image_idx, :, i: i + self.ksize, j: j + self.ksize] = patch == max_val.reshape((-1, 1, 1))
return out
def backward(self, d_out):
diff_x = np.zeros_like(self.__mask, dtype=d_out.dtype)
for image_idx in range(diff_x.shape[0]):
for row_idx in range(d_out.shape[-2]):
for col_idx in range(d_out.shape[-1]):
i, j = self.__out_idx_to_in_idx([row_idx, col_idx])
diff_x[image_idx, :, i: i + self.ksize, j: j + self.ksize] = d_out[image_idx, :, row_idx, col_idx].reshape((-1, 1, 1))
if self.padding == 0:
return diff_x * self.__mask
else:
return (diff_x * self.__mask)[:, :, self.padding: -self.padding, self.padding: -self.padding]
Flatten层的作用是,在卷积层提取到图像的深层特征之后,进入到全连接层学习实际的目标任务(如分类)之前,把所有的特征图展开成行(列)向量,以适配全连接的输入。
我们除了把输入特征图转换到全连接层需要的输入维度外,还需要记录该层的输入尺寸,以便在反向传播过程中,能将下游传递过来的误差正确还原到上一层(卷积或者池化层)的输出维度。
代码实现如下:
class Flatten(Block):
def __init__(self, start_dim=1, end_dim=-1):
self.input_shape = None
self.output_shape = None
self.start_dim = start_dim
self.end_dim = end_dim
def forward(self, x):
self.input_shape = x.shape
self.end_dim = self.end_dim if self.end_dim < 0 else self.end_dim - len(self.input_shape)
self.start_dim = self.start_dim if self.start_dim >= 0 else len(self.input_shape) + self.start_dim
end_dim = len(self.input_shape) + self.end_dim
output_shape = tuple(self.input_shape[i] for i in range(self.start_dim)) + (-1,) + tuple(
(self.input_shape[i] for i in range(end_dim + 1, len(self.input_shape)))
)
return x.reshape(output_shape)
def backward(self, y):
return y.reshape(self.input_shape)
我们的全连接层设计的输入维度为:, 即:每一行为一条数据。
全连接层的线性映射模型为:
在我们理解清楚了第二部分讲的矩阵梯度的计算过程之后,全连接层的实现简直就“水到渠成”了。
关键代码如下:
class Linear(Block):
"""
Linear layer: Y = XW + bias
"""
def __init__(self, in_dim, out_dim):
super().__init__()
self.in_dim = in_dim
self.out_dim = out_dim
self.weights = np.random.normal(0, np.sqrt(2 / self.in_dim), size=(in_dim, out_dim)).astype(float)
self.bias = np.zeros((1, out_dim), dtype=float)
self.__diff_w = None
self.__diff_b = None # np.eye(out_dim, dtype=float)
def forward(self, x):
assert self.in_dim == x.shape[-1]
self.__diff_w = x.T
return x @ self.weights + self.bias
def backward(self, diff_out):
diff_out = Linear._clip_gradient(diff_out) #np.clip(diff_out, -200, 200)
assert self.out_dim == diff_out.shape[-1]
# print(self.__diff_w.shape, diff_out.shape)
self.__diff_w = self.__diff_w @ diff_out
self.__diff_b = (diff_out @ np.eye(self.out_dim, dtype=float)).sum(axis=-2, keepdims=True)
# print("diff: ", self.__diff_w.shape, self.__diff_b.shape)
return diff_out @ self.weights.T
现代深度学习框架一般不会将激活函数作为单独的层,即便它提供了类似层的实现(如 torch.nn.ReLU),也是无状态的,因此你可以创建一个ReLu实例,把它作为多个网络层的激活函数。
和最大池化层类似,由于我们的实现中没有带自动微分的Tensor类型,因此我们这里的ReLu也需要作为一个特定的网络层存在,且需要记录哪些位置的元素通过了激活函数传递到下一层。 只有这样,我们才能确保误差进行了正确的传递。
代码实现如下:
class ReLU(Block):
def __init__(self):
super().__init__()
self.__mask = None
def forward(self, x):
self.__mask = (x > 0).astype(x.dtype)
return x * self.__mask
def backward(self, diff_out):
assert self.__mask is not None
return diff_out * self.__mask
为了方便地创建不同深度,不同结构的模型,我们创建了网络层的序列容器Sequential类。
它通过Python关键字语法允许我们将任意数量的层对象实例作为构造函数的参数,且支持对整个模型层面的前向和反向传播操作,支持获取模型参数的操作。
代码实现如下:
class Sequential(Block):
def __init__(self, *layers):
super(Sequential, self).__init__()
self.layers = layers
def forward(self, x):
# print("===================== forward =====================")
for layer in self.layers:
# print(f"{layer} input: ", x.shape)
x = layer(x)
# print("output: ", x.shape)
return x
def backward(self, diff_out):
# print("===================== backward =====================")
for layer in self.layers[::-1]:
# print(f"{layer} input: ", diff_out.shape)
diff_out = layer.backward(diff_out)
# print("output: ", diff_out.shape)
def get_weights_layer(self):
weights_layers = []
for layer in self.layers:
if isinstance(layer, Conv2d) or isinstance(layer, Linear):
weights_layers.append(layer)
return weights_layers
很多刚接触深度学习的新人,知道MNIST数据集,知道这是一个进入深度学习领域的Hello world级别的问题,可是并没有见到过,数据集中的手写数字图像是什么样子的!因为torchvision提供给我们的是ubyte的二进制文件格式。
这里我们首先提供了一个基于torchvision的MNIST数据下载器,在数据下载完成后,我们以数据实例的序号和类别标签构造文件名,还原成图像文件。代码如下:
import os
from torchvision import datasets
if __name__ == "__main__":
train_dataset = datasets.MNIST(root="./data", train=True, download=True)
test_dataset = datasets.MNIST(root="./data", train=False)
# 创建目录
os.makedirs("images/train", exist_ok=True)
for idx, (x, y) in enumerate(train_dataset):
x.save(f"images//train//idx-{idx}-label-{y}.png")
os.makedirs("images/test", exist_ok=True)
for idx, (x, y) in enumerate(test_dataset):
x.save(f"images//test//idx-{idx}-label-{y}.png")
print("=============== download finished ===============")
保持下来的图像如下图所示:

另外,我们在处理实际问题时,尝尝需要构造、加载自己的数据集,需要处理批量操作,需要随机打乱数据的顺序(防止数据分布与样本顺序强相关而影响训练过程)等,需要编写自己的数据加载器。这里我们给出两种版本的实现,以便大家对于数据加载器有更深入的理解。
基于生成器的版本:
def data_loader(path, batch_size, shuffle=True):
def get_filenames(_path):
_filenames = []
for r, d, files in os.walk(path):
_filenames += [os.path.join(r, filename) for filename in files]
return _filenames
filenames = get_filenames(path)
# print(filenames)
indices = list(range(len(filenames)))
if shuffle:
np.random.shuffle(indices)
x, y = [], []
# print("len indices: ", len(indices))
for idx, index in enumerate(indices):
filename = filenames[index]
# print(filename)
with Image.open(filename) as img:
np_img = np.array(img)
if len(np_img.shape) == 2:
np_img = np_img.reshape((1, np_img.shape[0], np_img.shape[1]))
else:
np_img = np_img.transpose((2, 0, 1))
x.append(np_img)
y.append(int(filename.split(os.path.sep)[-1].split(".")[0].split("-")[-1]))
if len(x) == batch_size:
yield np.array(x), np.array(y)
x.clear()
y.clear()
if len(x) > 0 and len(x) == len(y):
yield np.array(x), np.array(y)
基于迭代器类的实现版本:
def _get_filenames(_path):
_filenames = []
for r, d, files in os.walk(_path):
_filenames += [os.path.join(r, filename) for filename in files]
return _filenames
class MyDataLoader:
def __init__(self, path, batch_size, shuffle=True, limit = -1, transform=lambda x: x):
self.filenames = _get_filenames(path)
self.indices = list(range(len(self.filenames)))
if shuffle:
random.shuffle(self.indices)
if limit > 0:
self.indices = self.indices[:limit]
self.shuffle = shuffle
self.batch_size = batch_size
self.num_batches = len(self.indices) // self.batch_size
if self.num_batches * self.batch_size < len(self.indices):
self.num_batches += 1
self.current_index = 0
self.transform = transform
def __len__(self):
return self.num_batches
def __iter__(self):
self.current_index = 0
if self.shuffle:
random.shuffle(self.indices)
return self
def __next__(self):
if self.current_index >= len(self.indices):
raise StopIteration
x, y = [], []
batch_indices = self.indices[self.current_index:self.current_index + self.batch_size]
self.current_index += len(batch_indices)
# print(self.current_index)
for index in batch_indices:
filename = self.filenames[index]
with Image.open(filename) as img:
np_img = self.transform(np.array(img, dtype=float))
if len(np_img.shape) == 2:
np_img = np_img.reshape((1, np_img.shape[0], np_img.shape[1]))
else:
np_img = np_img.transpose((2, 0, 1))
x.append(np_img)
y.append(int(filename.split(os.path.sep)[-1].split(".")[0].split("-")[-1]))
return np.array(x), np.array(y)
在进行训练之前,我们还需要定义交叉熵损失函数、独热(one-hot)编码器、Softmax映射、实现简单的优化器等。 这些细节我们就不在本文中讨论了,感兴趣的朋友可以参考之前的文章:《Softmax函数与交叉熵损失》 , 《梯度下降算法与自动微分机制》, 《Adam优化器原理》等。
训练模型的代码如下:
import os.path
from conv_layer import Conv2d, MaxPool2d, Flatten
from sequential import Sequential
from linear_layer import Linear
from relu import ReLU
import numpy as np
from criterion import cross_entropy_loss
from optimizer import SGD
from downloader.my_dataloader_v2 import MyDataLoader
from preprocessing import one_hot_encoding
from tqdm import tqdm
import pickle
if __name__ == "__main__":
model = Sequential(
Conv2d(inChannels=1, outChannels=6, kernelSize=5, stride=1, padding=2),
ReLU(),
MaxPool2d(2, stride=2),
Conv2d(6, 16, kernelSize=5,stride=1, padding=0),
ReLU(),
MaxPool2d(2, stride=2),
Flatten(),
Linear(16 * 5 * 5, out_dim=120),
ReLU(),
Linear(120, 84),
ReLU(),
Linear(84, 10),
)
epochs = 50
batch_size = 32
num_classes = 10
dataloader = MyDataLoader(os.path.join("downloader", "images", "train"), batch_size,
transform=lambda img:img/255.0)
optimizer = SGD(model.get_weights_layer(), lr=0.001)
for epoch in range(epochs):
batch_loss = []
batch_acc = []
pbar = tqdm(dataloader, desc="training")
for x, y in pbar:# tqdm(dataloader, desc=f"epoch {epoch}"):
onehot_labels = one_hot_encoding(y, num_classes)
optimizer.zero_grad()
logits = model(x)
loss = cross_entropy_loss(onehot_labels, logits)
acc = (logits.argmax(axis=-1) == y).sum() / len(y)
# print(f"current loss: {loss}")
pbar.set_postfix(epoch=f"{epoch}",
acc=f"{acc:.4f}",
mean_acc=f"{(np.array(batch_acc).mean() if len(batch_acc) > 0 else 0):.4f}",
loss=f"{loss:.4f}",
mean_loss=f"{(np.array(batch_loss).mean() if len(batch_loss) > 0 else 0):.4f}")
# print(f"{logits}, {(logits.argmax(axis=-1) == y).sum() / len(y) },{onehot_labels},{loss:.4f}")
batch_loss.append(loss)
batch_acc.append(acc)
model.backward((logits - onehot_labels)/len(y))
optimizer.step()
with open("model.pickle", "wb") as f:
pickle.dump(model, f)
终于到了激动人心的时刻!我们经历了千辛万苦完成了从零搭建CNN网络框架的工作,经过简单优化后,完成了在MNIST 60000张全量训练集上50个epochs的训练。最终我们的模型能达到怎样的效果呢?
接下来我们通过下面的代码完成在测试集上的推理和准确率统计工作:
import os
from pickle import load
from downloader.my_dataloader_v2 import MyDataLoader
from criterion import softmax
from tqdm import tqdm
if __name__ == "__main__":
with open("model.pickle", "rb") as f:
model = load(f)
print(model)
dataloader = MyDataLoader(os.path.join("downloader", "images", "test"), batch_size=1,
transform=lambda x: x /255.0)
progress = tqdm(dataloader, desc="classification: ")
right_classified_count = 0
processed_samples = 0
for x, y in progress:
prob = softmax(model(x))
right_classified_count += (prob.argmax(axis=-1) == y).sum()
processed_samples += 1
progress.set_postfix(processed=f"{processed_samples}", right_classified=f"{right_classified_count}",
accuracy=f"{(right_classified_count / processed_samples * 100):0.2f}%")
# print(count)
print(f"final result: {right_classified_count} out of {len(dataloader)} are classified correctly")
最终我们在测试集上的效果如下:准确率98.76%, 即10000张图像中,有124张图像存在误判。推理速度为332.07it/s, 即每秒钟大约可以处理330张左右的手写数字识别任务。

本文从零(除了矩阵运算)搭建了一个通用的小型CNN网络框架,并且用它复刻了曾经比较有影响力的LeNet-5卷积神经网络,深入了解了卷积网络的各个细节。
在MNIST手写数字识别任务上,也达到了可以接受的效果。
不过,美中不足的是:目前经历了50轮次的训练,虽然准确率在98-99%左右,但是正确分类实力的置信度仅有23%左右,损失值始终维持在1.46附近。
针对模型的优化求解,仍然有深大的上升空间和需要深入分析的地方。