# -*- coding: utf-8 -*-
"""
神经网络
"""
# 导入模块
from typing import List, Literal
import numpy
class NeuralNetwork:
"""
神经网络
"""
HIDDEN_ACTIVATES = ["relu"]
OUTPUT_ACTIVATES = ["linear", "softmax"]
def __init__(
self,
structure: List[int],
hidden_activate: Literal["relu"] = "relu",
output_activate: Literal["linear", "softmax"] = "linear",
epsilon: float = 1e-9,
):
"""
初始化
:param structure: 神经网络结构,例如[2, 10, 1]表示2层神经网络具体为输入层2个神经元、隐含层10个神经元、输出层1个神经元
:param hidden_activate: 隐含层的激活函数默认为relu
:param output_activate: 输出层的激活函数默认为linear
:param epsilon: 极小值默认为1e-9
"""
print("正在初始化神经网络...", end="")
        if not (
            isinstance(structure, list)
            and len(structure) >= 3
            and all(isinstance(x, int) and x >= 1 for x in structure)
        ):
            raise RuntimeError(
                "structure must be a list of length >= 3 whose elements are integers >= 1"
            )
        # Store the network structure
self.structure = structure
        if hidden_activate not in self.HIDDEN_ACTIVATES:
            raise RuntimeError(f"unsupported hidden-layer activation function: {hidden_activate}")
        self.hidden_activate = hidden_activate
        if output_activate not in self.OUTPUT_ACTIVATES:
            raise RuntimeError(f"unsupported output-layer activation function: {output_activate}")
        self.output_activate = output_activate
        # Layer numbering: layer 0 is the input layer, layer L is the output layer, and
        # layers l = 1, 2, ..., L-1 are hidden layers, where L = layer_counts; the depth is L + 1
self.layer_counts = len(structure) - 1
self.parameters = {0: {}}
        # Initialize the parameters of every layer
        for layer_index in range(1, self.layer_counts + 1):
            # Neuron counts of the previous and the current layer
previous_layer_neuron_counts, current_layer_neuron_counts = (
self.structure[layer_index - 1],
self.structure[layer_index],
)
            self.parameters[layer_index] = {
                # Weights, shape [current-layer neurons, previous-layer neurons], matching
                # weighted_input = weight @ input + bias. Hidden layers use He initialization;
                # the output layer uses standard Xavier initialization if its activation is
                # linear, and normalized Xavier initialization otherwise.
                "weight": numpy.random.randn(
                    current_layer_neuron_counts, previous_layer_neuron_counts
                )
                * (
                    numpy.sqrt(2 / previous_layer_neuron_counts)
                    if layer_index < self.layer_counts
                    else (
                        numpy.sqrt(1 / previous_layer_neuron_counts)
                        if self.output_activate == "linear"
                        else numpy.sqrt(
                            2
                            / (
                                previous_layer_neuron_counts
                                + current_layer_neuron_counts
                            )
                        )
                    )
                ),
                "bias": numpy.zeros((current_layer_neuron_counts, 1)),  # biases
                "activate": (
                    self.hidden_activate
                    if layer_index < self.layer_counts
                    else self.output_activate
                ),  # this layer's activation function
            }
self.epsilon = epsilon
print("已完成")
def train(
self,
X: numpy.ndarray,
y_true: numpy.ndarray,
target_loss: float = 1e-3,
epochs: int = 200,
learning_rate: float = 0.001,
) -> None:
"""
训练神经网络
:param X: 输入,维度为[输入神经元数, 样本数]
:param y_true: 真实输出,维度为[输出神经元数, 样本数]
:param target_loss: 目标损失
:param epochs: 学习轮数
:param learning_rate: 学习率
:return: 无
"""
print(
f"开始训练:目标损失为 {target_loss},学习轮数为 {epochs},学习率为 {learning_rate}..."
)
        if not (
            isinstance(X, numpy.ndarray)
            and isinstance(y_true, numpy.ndarray)
            and X.shape[1] == y_true.shape[1]
            and X.shape[0] == self.structure[0]
            and y_true.shape[0] == self.structure[-1]
        ):
            raise RuntimeError(
                "X and y_true must be arrays of shapes [input neurons, samples] and "
                "[output neurons, samples] respectively, with the same number of samples"
            )
        # Normalize the inputs and store them as the input layer's output
        self.parameters[0]["activation"] = self._normalize(input=X)
        epoch = 1
        while True:
            # Forward propagation
            self._forward_propagate()
            # Compute the loss
            loss = self._calculate_loss(y_true=y_true)
            if loss <= target_loss:
                print(
                    f"epoch {epoch:6d}: loss reached the target loss {target_loss:9.3f}, training finished"
                )
                break
            if epoch > epochs:
                print(f"maximum number of epochs reached, loss is {loss:9.3f}, training finished")
                break
            # Backward propagation
            self._backward_propagate(y_true=y_true)
            # Update the network parameters
            self._update_parameters(learning_rate=learning_rate)
            if epoch % 100 == 0:
                print(f"epoch {epoch:6d}: loss {loss:9.3f}, continuing...")
            epoch += 1
        # Spot-check 10 random samples: true value vs. prediction (first output neuron)
        print(f"{'sample':<10} {'y_true':<15} {'y_pred':<15} {'abs error':<15}")
        for idx in numpy.random.choice(X.shape[1], size=10, replace=False):
y_true_val = y_true[0, idx]
y_pred_val = self.parameters[self.layer_counts]["activation"][0, idx]
error = abs(y_true_val - y_pred_val)
print(f"{idx:<10} {y_true_val:<15.4f} {y_pred_val:<15.4f} {error:<15.4f}")
def _normalize(
self,
input: numpy.ndarray,
) -> numpy.ndarray:
"""
归一化
:param input: 输入
:return: 归一化后的输入,维度与输入相同
"""
return (input - numpy.mean(input, axis=1, keepdims=True)) / numpy.sqrt(
numpy.var(input, ddof=0, axis=1, keepdims=True) + self.epsilon
)
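    # The normalization above, written out per feature row x (mu is its mean and
    # sigma^2 its population variance across samples):
    #   x_hat = (x - mu) / sqrt(sigma^2 + epsilon)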
    def _forward_propagate(self) -> None:
        """
        Forward propagation: store each layer's weighted input and activation in
        self.parameters. The output layer's activation is the prediction, shape
        [output neurons, samples].
        :return: None
        """
        for layer_index in range(1, self.layer_counts + 1):
            self.parameters[layer_index].update(
                {
                    # Weighted input, shape [current-layer neurons, samples]; the
                    # previous layer's activation is the current layer's input
                    "weighted_input": (
                        weighted_input := numpy.dot(
                            self.parameters[layer_index]["weight"],
                            self.parameters[layer_index - 1]["activation"],
                        )
                        + self.parameters[layer_index]["bias"]
                    ),
                    # Activation (this layer's output)
                    "activation": self._activate(
                        activate=self.parameters[layer_index]["activate"],
                        input=weighted_input,
                    ),
                }
            )
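    # One forward step per layer l, written out:
    #   z_l = W_l @ a_(l-1) + b_l
    #   a_l = activate(z_l)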
def _activate(
self,
activate: Literal["relu", "linear", "softmax"],
input: numpy.ndarray,
) -> numpy.ndarray:
"""
根据激活函数计算输入
:param activate: 激活函数
:param input: 输入
:return: 经过激活函数计算后的输入,维度与输入相同
"""
match activate:
case "relu":
return numpy.maximum(0, input)
case "linear":
return input
case "softmax":
# 加权输入的指数项
e_weighted_input = numpy.exp(
input - numpy.max(input, axis=0, keepdims=True)
) # 减去各样本所有神经元最大值以避免指数溢出
return e_weighted_input / numpy.sum(
e_weighted_input, axis=0, keepdims=True
)
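    # The softmax above, written out for one sample column z (the max-shift
    # cancels in the quotient, so it does not change the result):
    #   softmax(z)_k = exp(z_k - max(z)) / sum_j exp(z_j - max(z))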
def _calculate_loss(
self,
y_true: numpy.ndarray,
) -> numpy.floating:
"""
计算损失
:param y_true: 真实输出,维度为[输出神经元数, 样本数]
:return: 损失
"""
        # Mean squared error if the output activation is linear, cross-entropy otherwise
        if self.parameters[self.layer_counts]["activate"] == "linear":
            return 0.5 * numpy.mean(
                numpy.square(y_true - self.parameters[self.layer_counts]["activation"])
            )
        return -numpy.mean(
            numpy.sum(
                y_true
                * numpy.log(
                    numpy.clip(
                        self.parameters[self.layer_counts]["activation"],
                        self.epsilon,
                        1 - self.epsilon,
                    )
                ),
                axis=0,
            )
        )
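    # The two losses above, written out (m = number of samples, a_L = output activation):
    #   linear output (MSE):  L = 0.5 * mean((y - a_L)^2), mean taken over every entry
    #   softmax output (CE):  L = -(1/m) * sum_j sum_k y_kj * log(a_L_kj)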
def _backward_propagate(
self,
y_true: numpy.ndarray,
) -> None:
"""
后向传播
:param y_true: 真实输输出,维度为[输出神经元数, 样本数]
:return: 无
"""
        sample_counts = y_true.shape[1]  # number of samples
        # Gradient of the loss with respect to the output layer's weighted input;
        # the expression is the same for the MSE and the cross-entropy loss
        self.parameters[self.layer_counts]["delta_weighted_input"] = (
            self.parameters[self.layer_counts]["activation"] - y_true
        ) / sample_counts
        for layer_index in range(self.layer_counts, 0, -1):
            self.parameters[layer_index].update(
                {
                    # Gradient of the weights; the input to layer 1 is the normalized
                    # input stored as the input layer's activation
                    "delta_weight": numpy.dot(
                        self.parameters[layer_index]["delta_weighted_input"],
                        self.parameters[layer_index - 1]["activation"].T,
                    ),
                    # Gradient of the biases
                    "delta_bias": numpy.sum(
                        self.parameters[layer_index]["delta_weighted_input"],
                        axis=1,
                        keepdims=True,
                    ),
                }
            )
if layer_index != 1:
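                # Propagate the gradient one layer back: chain rule through this
                # layer's weights, then through the previous layer's activation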
self.parameters[layer_index - 1].update(
{
"delta_weighted_input": numpy.dot(
self.parameters[layer_index]["weight"].T,
self.parameters[layer_index]["delta_weighted_input"],
)
* self._activate_derivative(
activate=self.parameters[layer_index - 1]["activate"],
input=self.parameters[layer_index - 1]["weighted_input"],
),
}
)
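    # The backprop recursion above, written out (m = samples, g' = activation derivative):
    #   delta_L = (a_L - y) / m                          output layer, both losses
    #   delta_l = (W_(l+1)^T @ delta_(l+1)) * g'(z_l)    hidden layers
    #   dL/dW_l = delta_l @ a_(l-1)^T
    #   dL/db_l = delta_l summed over the sample axis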
def _activate_derivative(
self,
activate: Literal["relu"],
input: numpy.ndarray,
) -> numpy.ndarray:
"""
根据激活函数计算输入的导数
:param activate: 激活函数
:param input: 输入
:return: 经过激活函数计算后的输入,维度与输入相同
"""
match activate:
case "relu":
return numpy.where(input > 0, 1, 0)
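    # The ReLU derivative above, written out (the subgradient at z == 0 is taken as 0):
    #   d/dz max(0, z) = 1 if z > 0 else 0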
def _update_parameters(self, learning_rate: float) -> None:
"""
更新神经网络参数
:param learning_rate: 学习率
:return: 无
"""
for layer_index in range(1, self.layer_counts + 1):
self.parameters[layer_index].update(
{
"weight": self.parameters[layer_index]["weight"]
- self.parameters[layer_index]["delta_weight"]
                    * learning_rate,  # weights
"bias": self.parameters[layer_index]["bias"]
- self.parameters[layer_index]["delta_bias"]
                    * learning_rate,  # biases
}
)
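    # The update above, written out (eta = learning rate):
    #   W_l <- W_l - eta * dL/dW_l
    #   b_l <- b_l - eta * dL/db_l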
# Test code
if __name__ == "__main__":
    # Generate test data (a regression task)
    numpy.random.seed(42)  # fix the random seed for reproducibility
    X = numpy.random.randn(2, 100)
    # True function: y = 2 * x1^2 + 3 * x2 + 1 (no noise is added)
    y_true = 2 * X[0:1, :] ** 2 + 3 * X[1:2, :] + 1
    # Create the neural network
    neural_network = NeuralNetwork(
        structure=[2, 16, 4, 1],  # 2 inputs, hidden layers of 16 and 4 neurons, 1 output
    )
    # Train it
    neural_network.train(
        X=X, y_true=y_true, target_loss=0.001, epochs=1000, learning_rate=0.001
    )
    # Print the second hidden layer's activation and the output layer's prediction
    print(neural_network.parameters[2]["activation"])
    print(neural_network.parameters[3]["activation"])
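    # A minimal classification sketch using the softmax output (a hypothetical
    # example, not part of the test above; the one-hot labels are random, so the
    # loss will not converge to zero):
    #     classifier = NeuralNetwork(structure=[2, 16, 3], output_activate="softmax")
    #     labels = numpy.eye(3)[numpy.random.randint(0, 3, size=100)].T  # one-hot, shape (3, 100)
    #     classifier.train(X=X, y_true=labels, target_loss=0.05, epochs=500, learning_rate=0.01)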