Python/神经网络/main.py

501 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
神经网络(多层感知机)
"""
# 导入模块
from typing import List, Literal
import numpy
import pickle
class NeuralNetwork:
    """
    Neural network (multi-layer perceptron) with manual forward/backward
    propagation, trained by full-batch gradient descent.
    """
    # Activation functions supported for hidden layers
    HIDDEN_ACTIVATES = ["relu"]
    # Activation functions supported for the output layer
    OUTPUT_ACTIVATES = ["linear", "softmax"]
def __init__(
self,
structure: List[int],
hidden_activate: Literal["relu"] = "relu",
output_activate: Literal["linear", "softmax"] = "linear",
seed: int = 62,
epsilon: float = 1e-9,
):
"""
初始化
:param structure: 神经网络结构,例如[2, 10, 1]表示2层神经网络具体为输入层2个神经元、隐含层10个神经元、输出层1个神经元
:param hidden_activate: 隐含层的激活函数默认为relu
:param output_activate: 输出层的激活函数默认为linear
:param seed: 随机种子
:param epsilon: 极小值默认为1e-9
"""
print("正在初始化神经网络...", end="")
if not (
len(structure) >= 3
and all(x >= 1 if isinstance(x, int) else False for x in structure)
if isinstance(structure, list)
else False
):
raise RuntimeError("神经网络结构应为列表长度大于等于3且元素均为正整数")
# 初始化神经网络结构
self.structure = structure
# 神经网络层数定义第0层为输入层第l层为隐含层l=1,2,...,L-1第L层为输出层L为神经网络层数深度为L+1
self.layer_counts = len(structure) - 1
if hidden_activate not in self.HIDDEN_ACTIVATES:
raise RuntimeError(f"该隐含层激活函数 {hidden_activate} 暂不支持")
self.hidden_activate = hidden_activate
if output_activate not in self.OUTPUT_ACTIVATES:
raise RuntimeError(f"该输出层激活函数 {output_activate} 暂不支持")
self.output_activate = output_activate
# 初始化神经网络参数
self.parameters = {}
# 初始化模式(包括训练模式和推理模式)
self.training = None
# 初始化随机种子
self.seed = seed
# 初始化极小值
self.epsilon = epsilon
print("已完成")
def train(
    self,
    X: numpy.ndarray,
    y_true: numpy.ndarray,
    target_loss: float = 1e-3,
    epochs: int = 200,
    learning_rate: float = 0.001,
) -> None:
    """
    Train the network with full-batch gradient descent.

    :param X: inputs, shape [input_neurons, sample_count]
    :param y_true: ground-truth outputs, shape [output_neurons, sample_count]
    :param target_loss: stop early once the loss falls to this value
    :param epochs: maximum number of training epochs
    :param learning_rate: gradient-descent step size
    :raises RuntimeError: when the input/output arrays are missing or misshaped
    :return: None
    """
    print(
        f"开始训练神经网络:目标损失为 {target_loss},学习轮数为 {epochs},学习率为 {learning_rate}..."
    )
    # Validate: both must be ndarrays with equal sample counts and row
    # counts matching the input/output layer sizes. The shape checks are
    # only evaluated when the isinstance condition holds, so non-arrays
    # safely fall through to False instead of raising AttributeError.
    if not (
        X.shape[1] == y_true.shape[1]
        and X.shape[0] == self.structure[0]
        and y_true.shape[0] == self.structure[-1]
        if isinstance(X, numpy.ndarray) and isinstance(y_true, numpy.ndarray)
        else False
    ):
        raise RuntimeError(
            "输入和真实输出应为数组,其中输入维度应为[输入神经元数, 样本数],真实输出维度应为[输出神经元数, 样本数],样本数应需相同"
        )
    # Training mode: _normalize will recompute and store input statistics.
    self.training = True
    # (Re)initialize per-layer weights, biases and activation names.
    self._init_parameters()
    # The normalized input serves as layer 0's activation for the forward pass.
    self.parameters[0].update({"activation": self._normalize(input=X)})
    epoch = 1
    while True:
        # Forward pass: cache each layer's weighted input and activation.
        self._forward_propagate()
        # Loss from the cached output-layer values.
        loss = self._calculate_loss(y_true=y_true)
        if loss <= target_loss:
            print(
                f"{epoch:6d} 轮损失已达到目标损失 {target_loss:9.3f},训练结束"
            )
            break
        # NOTE(review): strict `>` means the loop performs `epochs` weight
        # updates and one extra forward pass before stopping.
        if epoch > epochs:
            print(f"已达到最大学习轮数,损失为 {loss:9.3f},训练结束")
            break
        # Backward pass: compute gradients for every layer.
        self._backward_propagate(y_true=y_true)
        # Gradient-descent step on weights and biases.
        self._update_parameters(learning_rate=learning_rate)
        if epoch % 100 == 0:
            print(f"{epoch:6d} 轮损失为 {loss:9.3f},继续训练...")
        epoch += 1
    # Persist trained parameters to parameters.pkl for later reason() calls.
    self._save_parameters()
def _init_parameters(self) -> None:
"""
初始化神经网络参数
:return: 无
"""
self.parameters = {0: {}}
# 初始化神经网络参数
for layer_index in range(1, self.layer_counts + 1):
self.parameters[layer_index] = {
"activate": (
activate := (
self.output_activate
if layer_index == self.layer_counts
else self.hidden_activate
)
), # 激活函数
"weight": self._init_weight(
activate=activate,
previous_layer_neuron_counts=self.structure[layer_index - 1],
current_layer_neuron_counts=(
current_layer_neuron_counts := self.structure[layer_index]
),
), # 权重,维度为[当前层神经元数,上一层神经元数],适配加权输入=权重*输入+平移
"bias": self._init_bias(
current_layer_neuron_counts=current_layer_neuron_counts
), # 平移
}
def _init_weight(
self,
activate: str,
previous_layer_neuron_counts: int,
current_layer_neuron_counts: int,
) -> numpy.ndarray: # pyright: ignore[reportReturnType]
"""
初始化权重
:param activate: 激活函数
:param previous_layer_neuron_counts: 上一层神经元数
:param current_layer_neuron_counts: 当前层神经元数
:return: 初始化后的权重,维度为[当前层神经元数,上一层神经元数]
"""
# 设置随机种子
numpy.random.seed(self.seed)
# 基于正态分布生成权重
weight = numpy.random.randn(
current_layer_neuron_counts, previous_layer_neuron_counts
)
match activate:
case "relu":
return weight * numpy.sqrt(
2 / previous_layer_neuron_counts
) # 使用He初始化权重方法
case "linear":
return weight * numpy.sqrt(
2 / previous_layer_neuron_counts
) # 使用He初始化权重方法
case "softmax":
return weight * numpy.sqrt(
2 / (previous_layer_neuron_counts + current_layer_neuron_counts)
) # 使用Xavier初始化权重方法
def _init_bias(
self,
current_layer_neuron_counts: int,
) -> numpy.ndarray:
"""
初始化平移
:param current_layer_neuron_counts: 当前层神经元数
:return: 初始化后的平移,维度为[当前层神经元数, 1]
"""
return numpy.zeros((current_layer_neuron_counts, 1))
def _normalize(
self,
input: numpy.ndarray,
) -> numpy.ndarray:
"""
归一化
:param input: 输入,维度为[输入神经元数, 样本数]
:return: 归一化后的输入
"""
# 若为训练模式则更新各输入神经元所有样本的平均值和方差,维度为[输入神经元数, 1]
if self.training:
self.parameters[0].update(
{
"mean": numpy.mean(input, axis=1, keepdims=True),
"variance": numpy.var(input, ddof=0, axis=1, keepdims=True),
}
)
return (input - self.parameters[0]["mean"]) / numpy.sqrt(
self.parameters[0]["variance"] + self.epsilon
)
def _forward_propagate(self) -> None:
"""
前向传播
:return: 输出层的预测输出,维度为[输出神经元数, 样本数]
"""
for layer_index in range(1, self.layer_counts + 1):
self.parameters[layer_index].update(
{
"weighted_input": (
weighted_input := numpy.dot(
self.parameters[layer_index]["weight"],
self.parameters[layer_index - 1][
"activation"
], # 将上一层的输出作为当前层的输入
)
+ self.parameters[layer_index]["bias"]
), # 加权输入,维度为[当前层神经元数,样本数],将上一层的输出作为当前层的输入
"activation": self._activate(
activate=self.parameters[layer_index]["activate"],
input=weighted_input,
), # 输出
}
)
def _activate(
self,
activate: str,
input: numpy.ndarray,
) -> numpy.ndarray: # pyright: ignore[reportReturnType]
"""
激活
:param activate: 激活函数
:param input: 输入
:return: 经过激活函数计算后的输入,维度与输入相同
"""
match activate:
case "relu":
return numpy.maximum(0, input)
case "linear":
return input
case "softmax":
# 加权输入的指数项
e_weighted_input = numpy.exp(
input - numpy.max(input, axis=0, keepdims=True)
) # 减去各样本所有神经元最大值以避免指数溢出
return e_weighted_input / numpy.sum(
e_weighted_input, axis=0, keepdims=True
)
def _calculate_loss(
self,
y_true: numpy.ndarray,
) -> numpy.floating: # pyright: ignore[reportReturnType]
"""
计算损失
:param y_true: 真实输出,维度为[输出神经元数, 样本数]
:return: 损失
"""
match self.parameters[self.layer_counts]["activate"]:
case "linear":
return 0.5 * numpy.mean(
numpy.square(
y_true - self.parameters[self.layer_counts]["activation"]
)
) # 若输出层的激活函数为linear则损失函数使用0.5*均方误差
case "softmax":
return numpy.mean(
(
numpy.max(
self.parameters[self.layer_counts]["weighted_input"],
axis=0,
keepdims=True,
)
+ numpy.log(
numpy.sum(
numpy.exp(
self.parameters[self.layer_counts]["weighted_input"]
- numpy.max(
self.parameters[self.layer_counts][
"weighted_input"
],
axis=0,
keepdims=True,
)
),
axis=0,
keepdims=True,
)
)
).squeeze()
- numpy.sum(
y_true * self.parameters[self.layer_counts]["weighted_input"],
axis=0,
)
) # 若输出层的激活函数为softmax则损失函数使用交叉熵
def _backward_propagate(
self,
y_true: numpy.ndarray,
) -> None:
"""
后向传播
:param y_true: 真实输输出,维度为[输出神经元数, 样本数]
:return: 无
"""
for layer_index in range(self.layer_counts, 0, -1):
self.parameters[layer_index].update(
{
"delta_activation": (
delta_activation := (
(self.parameters[layer_index]["activation"] - y_true)
/ y_true.shape[1]
# 若为输出层则直接计算输出的梯度,若为隐含层则基于下一层的加权输入的梯度推出当前层的输出梯度
if layer_index == self.layer_counts
else numpy.dot(
self.parameters[layer_index + 1]["weight"].T,
self.parameters[layer_index + 1][
"delta_weighted_input"
],
)
)
),
"delta_weighted_input": (
delta_weighted_input := delta_activation
* self._activate_derivative(
activate=self.parameters[layer_index]["activate"],
input=self.parameters[layer_index]["weighted_input"],
)
), # 加权输入的梯度
"delta_weight": numpy.dot(
delta_weighted_input,
(self.parameters[layer_index - 1]["activation"]).T,
), # 权重的梯度
"delta_bias": numpy.mean(
delta_weighted_input,
axis=1,
keepdims=True,
), # 偏置的梯度
}
)
def _activate_derivative(
self,
activate: Literal["relu", "linear", "softmax"],
input: numpy.ndarray,
) -> numpy.ndarray:
"""
根据激活函数计算输入的导数
:param activate: 激活函数
:param input: 输入
:return: 经过激活函数计算后的输入,维度与输入相同
"""
match activate:
case "relu":
return numpy.where(input > 0, 1, 0)
case "linear":
return numpy.ones_like(input)
case "softmax":
activation = self._activate(
activate=activate,
input=input,
)
return activation * (1 - activation)
def _update_parameters(self, learning_rate: float) -> None:
"""
更新神经网络参数
:param learning_rate: 学习率
:return: 无
"""
for layer_index in range(1, self.layer_counts + 1):
self.parameters[layer_index].update(
{
"weight": self.parameters[layer_index]["weight"]
- self.parameters[layer_index]["delta_weight"]
* learning_rate, # 权重
"bias": self.parameters[layer_index]["bias"]
- self.parameters[layer_index]["delta_bias"]
* learning_rate, # 平移
}
)
def _save_parameters(self) -> None:
"""
保存神经网络参数
:return: 无
"""
with open("parameters.pkl", "wb") as file:
pickle.dump(
obj={
layer_index: {
key: value
for key, value in layer_parameters.items()
if layer_index == 0
and key in ["mean", "variance"]
or layer_index != 0
and key in ["weight", "bias", "activate"]
}
for layer_index, layer_parameters in self.parameters.items()
},
file=file,
protocol=pickle.HIGHEST_PROTOCOL,
)
def reason(self, X: numpy.ndarray) -> numpy.ndarray:
"""
推理
:param X: 输入,维度为[输入神经元数, 样本数]
:return: 预测输出,维度为[输出神经元数, 样本数]
"""
print(f"基于已训练神经网络进行推理...")
if not (
X.shape[0] == self.structure[0] if isinstance(X, numpy.ndarray) else False
):
raise RuntimeError("输入应为数组,输入维度应为[输入神经元数, 样本数]")
# 默认为推理模式
self.training = False
# 加载神经网络参数
self._load_parameters()
# 归一化输入
self.parameters[0].update({"activation": self._normalize(input=X)})
# 前向传播
self._forward_propagate()
return self.parameters[self.layer_counts]["activation"]
def _load_parameters(self) -> None:
"""
加载神经网络参数
:return: 无
"""
with open("parameters.pkl", "rb") as file:
self.parameters = pickle.load(file=file)
# 校验神经网络参数
for layer_index in range(1, self.layer_counts + 1):
if not (
self.parameters[layer_index]["weight"].shape
== (self.structure[layer_index], self.structure[layer_index - 1])
and self.parameters[layer_index]["bias"].shape
== (self.structure[layer_index], 1)
and (
self.parameters[layer_index]["activate"] in self.output_activate
if layer_index == self.layer_counts
else self.parameters[layer_index]["activate"]
in self.hidden_activate
)
if isinstance(self.parameters[layer_index]["weight"], numpy.ndarray)
and isinstance(self.parameters[layer_index]["bias"], numpy.ndarray)
else False
):
raise RuntimeError(
"神经网络参数中权重和偏置的维度与神经网络结构不匹配、或激活函数不匹配"
)
# Demo / smoke test
if __name__ == "__main__":
    X = numpy.random.randn(2, 1000)
    # True function: y = 2*x1^2 + 3*x2 + 1 (note the squared first input;
    # the original comment wrongly said 2*x1 + 3*x2 + 1)
    y_true = 2 * X[0:1, :] ** 2 + 3 * X[1:2, :] + 1
    # Build the network: 2 inputs, hidden layers of 64 and 32 neurons, 1 output
    # (the original comment claimed a single 10-neuron hidden layer)
    neural_network = NeuralNetwork(
        structure=[2, 64, 32, 1],
    )
    # BUG FIX: training was commented out, so reason() crashed with
    # FileNotFoundError unless a parameters.pkl from a previous run existed.
    # Train here (this also writes parameters.pkl for reason() below).
    neural_network.train(X=X, y_true=y_true, target_loss=0.01, epochs=2000, learning_rate=0.05)
    # BUG FIX: the first print was mislabeled "推理结果" even though it shows
    # the ground-truth values.
    print(f"真实值:{y_true[:, 0:5]}")
    print(f"推理结果:{neural_network.reason(X=X)[:, 0:5]}")