# -*- coding: utf-8 -*-
"""
Neural network
"""
# Imports
from typing import Dict, List, Literal

import numpy


class NeuralNetwork:
    """
    A fully connected feed-forward neural network with batch normalization.
    """

    HIDDEN_ACTIVATES = ["relu"]
    OUTPUT_ACTIVATES = ["linear", "softmax"]

    def __init__(
        self,
        structure: List[int],
        hidden_activate: Literal["relu"] = "relu",
        output_activate: Literal["linear", "softmax"] = "linear",
        momentum: float = 0.9,
        epsilon: float = 1e-9,
    ):
        """
        Initialize the network.
        :param structure: network structure, e.g. [2, 10, 1] describes a
            2-layer network with 2 input neurons, 10 hidden neurons and
            1 output neuron
        :param hidden_activate: hidden-layer activation function, defaults to relu
        :param output_activate: output-layer activation function, defaults to linear
        :param momentum: momentum factor, defaults to 0.9
        :param epsilon: small constant for numerical stability, defaults to 1e-9
        """
        print("Initializing neural network...", end="")
        if not (
            isinstance(structure, list)
            and len(structure) >= 3
            and all(isinstance(x, int) and x >= 1 for x in structure)
        ):
            raise RuntimeError(
                "The network structure must be a list of length >= 3 whose "
                "elements are integers >= 1"
            )
        # Store the network structure
        self.structure = structure
        if hidden_activate not in self.HIDDEN_ACTIVATES:
            raise RuntimeError(
                f"Hidden-layer activation function {hidden_activate} is not supported"
            )
        self.hidden_activate = hidden_activate
        if output_activate not in self.OUTPUT_ACTIVATES:
            raise RuntimeError(
                f"Output-layer activation function {output_activate} is not supported"
            )
        self.output_activate = output_activate
        # Layer numbering: layer 0 is the input layer, layer L is the output
        # layer and layers l = 1, 2, ..., L-1 are hidden layers, where L is
        # self.layer_counts; the depth of the network is L + 1
        self.layer_counts = len(structure) - 1
        self.parameters = {}
        # Initialize the network parameters
        for layer_index in range(1, self.layer_counts + 1):
            # Neuron counts of the previous and the current layer
            previous_layer_neuron_counts, current_layer_neuron_counts = (
                self.structure[layer_index - 1],
                self.structure[layer_index],
            )
            self.parameters[layer_index] = {
                # Weights, shape [current-layer neurons, previous-layer neurons],
                # matching weighted_input = weight @ input + bias. Hidden layers
                # use He initialization; the output layer uses standard Xavier
                # initialization when its activation is linear and the improved
                # (averaged fan-in/fan-out) Xavier initialization otherwise
                "weight": numpy.random.randn(
                    current_layer_neuron_counts, previous_layer_neuron_counts
                )
                * (
                    numpy.sqrt(2 / previous_layer_neuron_counts)
                    if layer_index < self.layer_counts
                    else (
                        numpy.sqrt(1 / previous_layer_neuron_counts)
                        if self.output_activate == "linear"
                        else numpy.sqrt(
                            2
                            / (
                                previous_layer_neuron_counts
                                + current_layer_neuron_counts
                            )
                        )
                    )
                ),
                "bias": numpy.zeros((current_layer_neuron_counts, 1)),  # bias
                "moving_average": numpy.zeros(
                    (current_layer_neuron_counts, 1)
                ),  # batch-normalization moving mean
                "moving_variance": numpy.ones(
                    (current_layer_neuron_counts, 1)
                ),  # batch-normalization moving variance
                "gamma": numpy.ones(
                    (current_layer_neuron_counts, 1)
                ),  # batch-normalization scale factor
                "beta": numpy.zeros(
                    (current_layer_neuron_counts, 1)
                ),  # batch-normalization shift factor
                "activate": (
                    self.hidden_activate
                    if layer_index < self.layer_counts
                    else self.output_activate
                ),  # activation function
            }
        self.momentum = momentum
        # Training-mode flag: True during training, False for inference
        self.training = None
        self.epsilon = epsilon
        print("done")

    def train(
        self,
        X: numpy.ndarray,
        y_true: numpy.ndarray,
        target_loss: float = 1e-3,
        epochs: int = 200,
        learning_rate: float = 0.001,
    ) -> None:
        """
        Train the network.
        :param X: input of the input layer, shape [input features, samples]
        :param y_true: true output of the output layer, shape [output features, samples]
        :param target_loss: target loss
        :param epochs: maximum number of training epochs
        :param learning_rate: learning rate
        :return: None
        """
        print(
            f"Starting training: target loss {target_loss}, epochs {epochs}, "
            f"learning rate {learning_rate}..."
        )
        if not (
            isinstance(X, numpy.ndarray)
            and isinstance(y_true, numpy.ndarray)
            and X.shape[1] == y_true.shape[1]
            and X.shape[0] == self.structure[0]
            and y_true.shape[0] == self.structure[-1]
        ):
            raise RuntimeError(
                "X and y_true must be arrays of shapes [input features, samples] "
                "and [output features, samples] with the same number of samples; "
                f"expected {self.structure[0]} input features and "
                f"{self.structure[-1]} output features"
            )
        # Switch on training mode
        self.training = True
        # Normalize the input of the input layer
        X = self._normalize(input=X)
        epoch = 0
        while True:
            # Forward propagation
            y_predict = self._forward_propagate(X=X)
            # Compute the loss
            loss = self._calculate_loss(y_true=y_true, y_predict=y_predict)
            if loss <= target_loss:
                print(
                    f"Epoch {epoch:6d}: loss reached the target loss "
                    f"{target_loss:9.3f}, training finished"
                )
                break
            if epoch >= epochs:
                print(
                    f"Maximum number of epochs reached, loss {loss:9.3f}, "
                    f"training finished"
                )
                break
            # Backward propagation
            self._backward_propagate(X=X, y_true=y_true, y_predict=y_predict)
            # Update the network parameters
            self._update_parameters(learning_rate=learning_rate)
            if epoch % 100 == 0:
                print(f"Epoch {epoch:6d}: loss {loss:9.3f}, continuing training...")
            epoch += 1
        # Compare true and predicted values on a few random samples
        print(f"{'sample':<10} {'y_true':<15} {'y_predict':<15} {'abs error':<15}")
        for idx in numpy.random.choice(X.shape[1], size=10, replace=False):
            y_true_val = y_true[0, idx]
            y_pred_val = y_predict[0, idx]
            error = abs(y_true_val - y_pred_val)
            print(f"{idx:<10} {y_true_val:<15.4f} {y_pred_val:<15.4f} {error:<15.4f}")

    def _normalize(
        self,
        input: numpy.ndarray,
    ) -> numpy.ndarray:
        """
        Normalize the input feature-wise (zero mean, unit variance per row).
        :param input: input array
        :return: normalized input, same shape as the input
        """
        return (input - numpy.mean(input, axis=1, keepdims=True)) / numpy.sqrt(
            numpy.var(input, ddof=0, axis=1, keepdims=True) + self.epsilon
        )
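    # For example, per feature row the method computes
    #     x_hat = (x - mean(x)) / sqrt(var(x) + epsilon),
    # so _normalize applied to [[1, 2, 3]] yields approximately
    # [[-1.2247, 0.0, 1.2247]] (zero mean, unit variance across the samples).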

    def _forward_propagate(self, X: numpy.ndarray) -> numpy.ndarray:
        """
        Forward propagation.
        :param X: input of the input layer, shape [input features, samples]
        :return: predicted output of the output layer, shape [output features, samples]
        """
        activation = X  # the input of the input layer is the output of layer 0
        for layer_index in range(1, self.layer_counts + 1):
            self.parameters[layer_index].update(
                {
                    # the previous layer's output is the current layer's input
                    "x": (x := activation),
                    # weighted input, shape [current-layer neurons, samples]
                    "weighted_input": (
                        weighted_input := numpy.dot(
                            self.parameters[layer_index]["weight"], x
                        )
                        + self.parameters[layer_index]["bias"]
                    ),
                    # batch normalization of the weighted input
                    **(
                        output := self._batch_normalize(
                            input=weighted_input,
                            moving_average=self.parameters[layer_index][
                                "moving_average"
                            ],
                            moving_variance=self.parameters[layer_index][
                                "moving_variance"
                            ],
                            gamma=self.parameters[layer_index]["gamma"],
                            beta=self.parameters[layer_index]["beta"],
                        )
                    ),
                    # layer output
                    "activation": (
                        activation := self._activate(
                            activate=self.parameters[layer_index]["activate"],
                            input=output["normalization"],
                        )
                    ),
                }
            )
        # the output of layer L (the output layer) is the prediction
        y_predict = activation
        return y_predict

    def _batch_normalize(
        self,
        input: numpy.ndarray,
        moving_average: numpy.ndarray,
        moving_variance: numpy.ndarray,
        gamma: numpy.ndarray,
        beta: numpy.ndarray,
    ) -> Dict[str, numpy.ndarray]:
        """
        Batch normalization.
        :param input: input array
        :param moving_average: moving mean, shape [input rows, 1]
        :param moving_variance: moving variance, shape [input rows, 1]
        :param gamma: scale factor, shape [input rows, 1]
        :param beta: shift factor, shape [input rows, 1]
        :return: dict of batch-normalization statistics and the normalized input
        """
        return {
            # row-wise mean over all columns, shape [input rows, 1]
            "average": (
                average := (
                    numpy.mean(input, axis=1, keepdims=True)
                    if self.training
                    else moving_average
                )
            ),
            # row-wise variance over all columns, shape [input rows, 1]
            "variance": (
                variance := (
                    numpy.var(input, ddof=0, axis=1, keepdims=True)
                    if self.training
                    else moving_variance
                )
            ),
            # updated moving mean
            "moving_average": (
                self.momentum * moving_average + (1 - self.momentum) * average
                if self.training
                else moving_average
            ),
            # updated moving variance
            "moving_variance": (
                self.momentum * moving_variance + (1 - self.momentum) * variance
                if self.training
                else moving_variance
            ),
            # row-wise standard deviation over all columns, shape [input rows, 1]
            "standard_deviation": (
                standard_deviation := numpy.sqrt(variance + self.epsilon)
            ),
            # normalized input, same shape as the input
            "normalization": (
                (input - average) / standard_deviation * gamma + beta
            ),
        }
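    # In training mode the statistics come from the current batch and the moving
    # statistics are updated as exponential moving averages,
    #     moving = momentum * moving + (1 - momentum) * batch,
    # while in inference mode the stored moving statistics are used instead:
    #     normalization = gamma * (input - mean) / sqrt(variance + epsilon) + beta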

    def _activate(
        self,
        activate: Literal["relu", "linear", "softmax"],
        input: numpy.ndarray,
    ) -> numpy.ndarray:
        """
        Apply the given activation function to the input.
        :param activate: activation function
        :param input: input, shape [current-layer neurons, samples]
        :return: activated input, shape [current-layer neurons, samples]
        """
        match activate:
            case "relu":
                return numpy.maximum(0, input)
            case "linear":
                return input
            case "softmax":
                # exponentials of the weighted input; subtract the per-sample
                # maximum over all neurons to avoid overflow in the exponential
                e_weighted_input = numpy.exp(
                    input - numpy.max(input, axis=0, keepdims=True)
                )
                return e_weighted_input / numpy.sum(
                    e_weighted_input, axis=0, keepdims=True
                )
            case _:
                raise RuntimeError(f"Activation function {activate} is not supported")
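    # For example, softmax([1, 2, 3]) ≈ [0.0900, 0.2447, 0.6652]; subtracting the
    # per-sample maximum before exponentiating does not change the result because
    # the common factor exp(-max) cancels between numerator and denominator.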

    def _calculate_loss(
        self,
        y_true: numpy.ndarray,
        y_predict: numpy.ndarray,
    ) -> numpy.floating:
        """
        Compute the loss. Mean squared error is used when the output-layer
        activation is linear, cross-entropy otherwise.
        :param y_true: true output of the output layer, shape [output features, samples]
        :param y_predict: predicted output of the output layer, shape [output features, samples]
        :return: loss value
        """
        return (
            0.5 * numpy.mean(numpy.square(y_true - y_predict))
            if self.parameters[self.layer_counts]["activate"] == "linear"
            else -1
            * numpy.mean(
                numpy.sum(
                    y_true
                    * numpy.log(numpy.clip(y_predict, self.epsilon, 1 - self.epsilon)),
                    axis=0,
                )
            )
        )
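    # The two losses above, written out:
    #     MSE = 0.5 * mean of (y_true - y_predict)^2 over all entries
    #     CE  = mean over samples of -sum_k y_true_k * log(y_predict_k)
    # Predictions are clipped to [epsilon, 1 - epsilon] so the logarithm is finite.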

    def _backward_propagate(
        self,
        X: numpy.ndarray,
        y_true: numpy.ndarray,
        y_predict: numpy.ndarray,
    ) -> None:
        """
        Backward propagation.
        :param X: input of the input layer, shape [input features, samples]
        :param y_true: true output of the output layer, shape [output features, samples]
        :param y_predict: predicted output of the output layer, shape [output features, samples]
        :return: None
        """
        sample_counts = X.shape[1]  # number of samples
        # Gradient of the loss with respect to the batch-normalized weighted
        # input of the output layer; mean squared error and cross-entropy have
        # the same gradient with respect to the prediction
        self.parameters[self.layer_counts]["delta_normalization"] = (
            y_predict - y_true
        ) / sample_counts
        for layer_index in range(self.layer_counts, 0, -1):
            parameters = self.parameters[layer_index]
            # standardized weighted input (z - mean) / sigma, rebuilt from the
            # statistics cached during the forward pass
            standardized_input = (
                parameters["weighted_input"] - parameters["average"]
            ) / parameters["standard_deviation"]
            parameters.update(
                {
                    # gradient of the batch-normalization scale factor
                    "delta_gamma": numpy.sum(
                        parameters["delta_normalization"] * standardized_input,
                        axis=1,
                        keepdims=True,
                    ),
                    # gradient of the batch-normalization shift factor
                    "delta_beta": numpy.sum(
                        parameters["delta_normalization"],
                        axis=1,
                        keepdims=True,
                    ),
                    # gradient of the weighted input
                    "delta_weighted_input": (
                        delta_weighted_input := (
                            sample_counts
                            * parameters["gamma"]
                            * parameters["delta_normalization"]
                            - numpy.sum(
                                parameters["gamma"]
                                * parameters["delta_normalization"],
                                axis=1,
                                keepdims=True,
                            )
                            - standardized_input
                            * numpy.sum(
                                parameters["gamma"]
                                * parameters["delta_normalization"]
                                * standardized_input,
                                axis=1,
                                keepdims=True,
                            )
                        )
                        * (1.0 / sample_counts)
                        / parameters["standard_deviation"]
                    ),
                    # gradient of the weights
                    "delta_weight": numpy.dot(
                        delta_weighted_input,
                        (
                            X
                            if layer_index == 1
                            else self.parameters[layer_index - 1]["activation"]
                        ).T,
                    ),
                    # gradient of the bias
                    "delta_bias": numpy.sum(
                        delta_weighted_input,
                        axis=1,
                        keepdims=True,
                    ),
                }
            )
            if layer_index > 1:
                # propagate the gradient to the previous layer through the
                # weights and the ReLU derivative (1 where the batch-normalized
                # input was positive, else 0)
                self.parameters[layer_index - 1]["delta_normalization"] = numpy.dot(
                    parameters["weight"].T,
                    parameters["delta_weighted_input"],
                ) * (self.parameters[layer_index - 1]["normalization"] > 0).astype(
                    numpy.float32
                )
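    # The delta_weighted_input expression above is the standard batch-normalization
    # backward formula: with x_hat = (z - mean) / sigma and g = gamma * dL/dy,
    #     dL/dz = (1 / (m * sigma)) * (m * g - sum(g) - x_hat * sum(g * x_hat)),
    # where the sums run over the m samples of the batch.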

    def _update_parameters(self, learning_rate: float) -> None:
        """
        Update the network parameters by plain gradient descent.
        :param learning_rate: learning rate
        :return: None
        """
        for layer_index in range(1, self.layer_counts + 1):
            self.parameters[layer_index].update(
                {
                    "weight": self.parameters[layer_index]["weight"]
                    - self.parameters[layer_index]["delta_weight"]
                    * learning_rate,  # weights
                    "bias": self.parameters[layer_index]["bias"]
                    - self.parameters[layer_index]["delta_bias"]
                    * learning_rate,  # bias
                    "gamma": self.parameters[layer_index]["gamma"]
                    - self.parameters[layer_index]["delta_gamma"]
                    * learning_rate,  # batch-normalization scale factor
                    "beta": self.parameters[layer_index]["beta"]
                    - self.parameters[layer_index]["delta_beta"]
                    * learning_rate,  # batch-normalization shift factor
                }
            )


# Test code
if __name__ == "__main__":
    # Generate test data (regression task)
    numpy.random.seed(42)  # fix the random seed for reproducibility
    X = numpy.random.randn(2, 100) * 5
    # True function: y = 2*x1^2 + 3*x2 + 1 (plus noise)
    y_true = 2 * X[0:1, :] ** 2 + 3 * X[1:2, :] + 1 + numpy.random.randn(1, 100) * 0.1
    # Create the neural network: 2 inputs, hidden layers of 200, 100 and 50
    # neurons, 1 output
    neural_network = NeuralNetwork(
        structure=[2, 200, 100, 50, 1],
    )
    # Train
    neural_network.train(
        X=X, y_true=y_true, target_loss=0.001, epochs=10000, learning_rate=0.001
    )
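
    # A minimal inference sketch (an added illustration; the original file stops
    # after training): switch off training mode so batch normalization uses its
    # moving statistics, then reuse the internal helpers for a forward pass.
    neural_network.training = False
    X_test = numpy.random.randn(2, 5) * 5  # hypothetical held-out inputs
    y_test_predict = neural_network._forward_propagate(
        X=neural_network._normalize(input=X_test)
    )
    print("Predictions on 5 held-out samples:", y_test_predict)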