From 68f11f911f38d71ae435783c7d1683f48073e4a3 Mon Sep 17 00:00:00 2001
From: liubiren
Date: Wed, 21 Jan 2026 22:02:53 +0800
Subject: [PATCH] 1

---
 神经网络/main.py | 389 ++++++++++++++++++++++++++++-------------------
 1 file changed, 231 insertions(+), 158 deletions(-)

diff --git a/神经网络/main.py b/神经网络/main.py
index f447389..c62e992 100644
--- a/神经网络/main.py
+++ b/神经网络/main.py
@@ -4,7 +4,7 @@
 """
 
 # Import modules
-from typing import List, Literal
+from typing import List, Literal, Optional, Dict
 
 import numpy
 
@@ -21,6 +21,7 @@ class NeuralNetwork:
         structure: List[int],
         hidden_activate: Literal["relu"] = "relu",
         output_activate: Literal["linear", "softmax"] = "linear",
+        momentum: float = 0.9,
         epsilon: float = 1e-9,
     ):
         """
@@ -28,34 +29,41 @@ class NeuralNetwork:
         :param structure: network structure; e.g. [2, 10, 1] is a 2-layer network with 2 input neurons, 10 hidden neurons and 1 output neuron
         :param hidden_activate: activation function of the hidden layers, defaults to relu
         :param output_activate: activation function of the output layer, defaults to linear
+        :param momentum: momentum factor, defaults to 0.9
         :param epsilon: small constant for numerical stability, defaults to 1e-9
         """
         print("Initializing the neural network...", end="")
+        if not (
+            all(x >= 1 if isinstance(x, int) else False for x in structure)
+            if isinstance(structure, list) and len(structure) >= 3
+            else False
+        ):
+            raise RuntimeError(
+                "structure must be a list of length >= 3 whose elements are integers >= 1"
+            )
         # Initialize the network structure
         self.structure = structure
-        # Number of layers
-        self.layer_counts = (
-            len(structure) - 1
-        )  # layer 0 is the input layer, layer L is the output layer (L is the number of layers), layers l = 1, 2, ..., L-1 are hidden layers
         if hidden_activate not in self.HIDDEN_ACTIVATES:
-            raise ValueError(f"hidden-layer activation {hidden_activate} is not supported yet")
+            raise RuntimeError(f"hidden-layer activation {hidden_activate} is not supported yet")
         self.hidden_activate = hidden_activate
-
         if output_activate not in self.OUTPUT_ACTIVATES:
-            raise ValueError(f"output-layer activation {output_activate} is not supported yet")
+            raise RuntimeError(f"output-layer activation {output_activate} is not supported yet")
         self.output_activate = output_activate
 
-        self.paramters = {}
-        # Initialize the network parameters for the hidden and output layers
+        # Number of layers (layer 0 is the input layer, layer L is the output layer (L is the number of layers), layers l = 1, 2, ..., L-1 are hidden layers; the depth is L+1)
+        self.layer_counts = len(structure) - 1
+
+        self.parameters = {}
+        # Initialize the network parameters
         for layer_index in range(1, self.layer_counts + 1):
             # Neuron counts of the previous and the current layer
             previous_layer_neuron_counts, current_layer_neuron_counts = (
                 self.structure[layer_index - 1],
                 self.structure[layer_index],
            )
-            self.paramters[layer_index] = {
+            self.parameters[layer_index] = {
                 "weight": numpy.random.randn(
                     current_layer_neuron_counts, previous_layer_neuron_counts
                 )
@@ -73,14 +81,20 @@ class NeuralNetwork:
                         )
                     )
                 )
-                ),  # weight, shaped [current-layer neurons, previous-layer neurons] for inputs shaped [previous-layer neurons, samples], so that weighted input = weight * input + offset. Hidden layers use He initialization; the output layer uses standard Xavier initialization when its activation is linear and a modified Xavier initialization otherwise
-                "bias": numpy.zeros((current_layer_neuron_counts, 1)),  # offset
+                ),  # weight, shaped [current-layer neurons, previous-layer neurons], so that weighted input = weight * input + shift. Hidden layers use He initialization; the output layer uses standard Xavier initialization when its activation is linear and a modified Xavier initialization otherwise
+                "bias": numpy.zeros((current_layer_neuron_counts, 1)),  # shift
+                "moving_average": numpy.zeros(
+                    (current_layer_neuron_counts, 1)
+                ),  # moving average for batch normalization
+                "moving_variance": numpy.ones(
+                    (current_layer_neuron_counts, 1)
+                ),  # moving variance for batch normalization
                 "gamma": numpy.ones(
                     (current_layer_neuron_counts, 1)
-                ),  # scale factor for batch standardization
+                ),  # scale factor for batch normalization
                 "beta": numpy.zeros(
                     (current_layer_neuron_counts, 1)
-                ),  # shift factor for batch standardization
+                ),  # shift factor for batch normalization
                 "activate": (
                     self.hidden_activate
                     if layer_index < self.layer_counts
@@ -88,10 +102,92 @@ class NeuralNetwork:
                 ),  # activation function
             }
 
+        self.momentum = momentum
+        # Initialize the training-mode flag
+        self.training: Optional[bool] = None
+
         self.epsilon = epsilon
         print("done")
 
+    def train(
+        self,
+        X: numpy.ndarray,
+        y_true: numpy.ndarray,
+        target_loss: float = 1e-3,
+        epochs: int = 200,
+        learning_rate: float = 0.001,
+    ) -> None:
+        """
+        Train the neural network
+        :param X: input to the input layer, with shape [input features, samples]
+        :param y_true: ground-truth output of the output layer, with shape [output features, samples]
+        :param target_loss: target loss
+        :param epochs: number of training epochs
+        :param learning_rate: learning rate
+        :return: None
+        """
+        print(
+            f"Training started: target loss {target_loss}, epochs {epochs}, learning rate {learning_rate}..."
+        )
+        if not (
+            X.shape[1] == y_true.shape[1]
+            and X.shape[0] == self.structure[0]
+            and y_true.shape[0] == self.structure[-1]
+            if isinstance(X, numpy.ndarray) and isinstance(y_true, numpy.ndarray)
+            else False
+        ):
+            raise RuntimeError(
+                f"X and y_true must be arrays shaped [input features, samples] and [output features, samples] with the same sample count; the input feature count must be {self.structure[0]} and the output feature count must be {self.structure[-1]}"
+            )
+        # Enable training mode
+        self.training = True
+        # Normalize the input of the input layer
+        X = self._normalize(input=X)
+
+        epoch = 0
+        while True:
+            # Forward propagation
+            y_predict = self._forward_propagate(X=X)
+            # Compute the loss
+            loss = self._calculate_loss(y_true=y_true, y_predict=y_predict)
+            if loss <= target_loss:
+                print(
+                    f"Epoch {epoch:6d}: loss {loss:9.3f} reached the target loss {target_loss:9.3f}; training finished"
+                )
+                break
+            if epoch >= epochs:
+                print(f"Reached the maximum number of epochs; loss {loss:9.3f}; training finished")
+                break
+
+            # Backward propagation
+            self._backward_propagate(X=X, y_true=y_true, y_predict=y_predict)
+            # Update the network parameters
+            self._update_parameters(learning_rate=learning_rate)
+
+            if epoch % 100 == 0:
+                print(f"Epoch {epoch:6d}: loss {loss:9.3f}; training continues...")
+            epoch += 1
+
+        # Disable training mode so later forward passes use the moving statistics
+        self.training = False
+
+        # Spot-check a few random samples
+        print(f"{'index':<10} {'y_true':<15} {'y_predict':<15} {'error':<15}")
+        for idx in numpy.random.choice(X.shape[1], size=10, replace=False):
+            y_true_val = y_true[0, idx]
+            y_pred_val = y_predict[0, idx]
+            error = abs(y_true_val - y_pred_val)
+            print(f"{idx:<10} {y_true_val:<15.4f} {y_pred_val:<15.4f} {error:<15.4f}")
+
+    def _normalize(
+        self,
+        input: numpy.ndarray,
+    ) -> numpy.ndarray:
+        """
+        Normalization
+        :param input: input
+        :return: the normalized input, same shape as the input
+        """
+        return (input - numpy.mean(input, axis=1, keepdims=True)) / numpy.sqrt(
+            numpy.var(input, ddof=0, axis=1, keepdims=True) + self.epsilon
+        )
+
     def _forward_propagate(self, X: numpy.ndarray) -> numpy.ndarray:
         """
         Forward propagation
@@ -100,37 +196,32 @@ class NeuralNetwork:
         """
         activation = X  # take the input of the input layer as the output of layer 0
         for layer_index in range(1, self.layer_counts + 1):
-            x = activation  # take the previous layer's output as the current layer's input
-            self.paramters[layer_index].update(
+            self.parameters[layer_index].update(
                 {
+                    "x": (x := activation),  # take the previous layer's output as the current layer's input
                     "weighted_input": (
                         weighted_input := numpy.dot(
-                            self.paramters[layer_index]["weight"], x
+                            self.parameters[layer_index]["weight"], x
                         )
-                    ),  # weighted input
-                    "weighted_input_average": (
-                        weighted_input_average := numpy.mean(
-                            weighted_input, axis=1, keepdims=True
+                        + self.parameters[layer_index]["bias"]
+                    ),  # weighted input, shaped [current-layer neurons, samples]
+                    **(
+                        output := self._batch_normalize(
+                            input=weighted_input,
+                            moving_average=self.parameters[layer_index][
+                                "moving_average"
+                            ],
+                            moving_variance=self.parameters[layer_index][
+                                "moving_variance"
+                            ],
+                            gamma=self.parameters[layer_index]["gamma"],
+                            beta=self.parameters[layer_index]["beta"],
                         )
-                    ),  # mean of the weighted input
-                    "weighted_input_standard_deviation": (
-                        weighted_input_standard_deviation := numpy.sqrt(
-                            numpy.var(weighted_input, ddof=0, axis=1, keepdims=True)
-                            + self.epsilon
-                        )
-                    ),  # standard deviation of the weighted input
-                    "batch_normalized_weighted_input": (
-                        batch_normalized_weighted_input := (
-                            weighted_input - weighted_input_average
-                        )
-                        * self.paramters[layer_index]["gamma"]
-                        / weighted_input_standard_deviation
-                        + self.paramters[layer_index]["beta"]
-                    ),  # batch-standardize the weighted input
+                    ),  # batch normalization of the weighted input
                     "activation": (
                         activation := self._activate(
-                            activate=self.paramters[layer_index]["activate"],
-                            weighted_input=batch_normalized_weighted_input,
+                            activate=self.parameters[layer_index]["activate"],
+                            input=output["normalization"],
                         )
                     ),  # output
                 }
             )
 
@@ -139,27 +230,77 @@ class NeuralNetwork:
 
         y_predict = activation  # take the output of layer L (the output layer) as the output layer's prediction
         return y_predict
 
+    def _batch_normalize(
+        self,
+        input: numpy.ndarray,
+        moving_average: numpy.ndarray,
+        moving_variance: numpy.ndarray,
+        gamma: numpy.ndarray,
+        beta: numpy.ndarray,
+    ) -> Dict[str, numpy.ndarray]:
+        """
+        Batch normalization
+        :param input: input
+        :param moving_average: moving average for batch normalization, with shape [rows of the input, 1]
+        :param moving_variance: moving variance for batch normalization, with shape [rows of the input, 1]
+        :param gamma: scale factor for batch normalization, with shape [rows of the input, 1]
+        :param beta: shift factor for batch normalization, with shape [rows of the input, 1]
+        :return: a dict holding the batch statistics, the updated moving statistics, and the normalized input (key "normalization", same shape as the input)
+        """
+        return {
+            "average": (
+                average := (
+                    numpy.mean(input, axis=1, keepdims=True)
+                    if self.training
+                    else moving_average
+                )
+            ),  # per-row mean over all columns, with shape [rows of the input, 1]
+            "variance": (
+                variance := (
+                    numpy.var(input, ddof=0, axis=1, keepdims=True)
+                    if self.training
+                    else moving_variance
+                )
+            ),  # per-row variance over all columns, with shape [rows of the input, 1]
+            "moving_average": (
+                self.momentum * moving_average + (1 - self.momentum) * average
+                if self.training
+                else moving_average
+            ),  # update the moving average for batch normalization
+            "moving_variance": (
+                self.momentum * moving_variance + (1 - self.momentum) * variance
+                if self.training
+                else moving_variance
+            ),  # update the moving variance for batch normalization
+            "standard_deviation": (
+                standard_deviation := numpy.sqrt(variance + self.epsilon)
+            ),  # per-row standard deviation over all columns, with shape [rows of the input, 1]
+            "normalization": (
+                (input - average) / standard_deviation * gamma + beta
+            ),  # the normalized input, same shape as the input
+        }
+
     def _activate(
         self,
         activate: Literal["relu", "linear", "softmax"],
-        weighted_input: numpy.ndarray,
+        input: numpy.ndarray,
     ) -> numpy.ndarray:
         """
-        Compute the output of the activation function
+        Apply the activation function to the input
         :param activate: activation function
-        :param weighted_input: weighted input
-        :return: output
+        :param input: input, with shape [current-layer neurons, samples]
+        :return: the input after applying the activation function, with shape [current-layer neurons, samples]
         """
         match activate:
             case "relu":
-                return numpy.maximum(0, weighted_input)
+                return numpy.maximum(0, input)
             case "linear":
-                return weighted_input
+                return input
             case "softmax":
                 # Exponential terms of the weighted input
                 e_weighted_input = numpy.exp(
-                    weighted_input - numpy.max(weighted_input, axis=0, keepdims=True)
-                )
+                    input - numpy.max(input, axis=0, keepdims=True)
+                )  # subtract each sample's maximum over all neurons to avoid overflow in the exponential
                 return e_weighted_input / numpy.sum(
                     e_weighted_input, axis=0, keepdims=True
                 )
@@ -177,7 +318,7 @@ class NeuralNetwork:
         """
         return (
             0.5 * numpy.mean(numpy.square(y_true - y_predict))
-            if self.paramters[self.layer_counts]["activate"] == "linear"
+            if self.parameters[self.layer_counts]["activate"] == "linear"
             else -1
             * numpy.mean(
                 numpy.sum(
@@ -203,74 +344,64 @@ class NeuralNetwork:
         """
         sample_counts = X.shape[1]  # number of samples
 
-        # Gradient of the loss w.r.t. the batch-standardized weighted input of the output layer
-        self.paramters[self.layer_counts]["delta_batch_normalized_weighted_input"] = (
+        # Gradient of the loss w.r.t. the batch normalization of the output layer's weighted input
+        self.parameters[self.layer_counts]["delta_normalization"] = (
             y_predict - y_true
         ) / sample_counts  # the gradients of mean squared error and cross-entropy w.r.t. the output prediction are the same
 
         for layer_index in range(self.layer_counts, 0, -1):
-            self.paramters[layer_index].update(
+            self.parameters[layer_index].update(
                 {
                     "delta_gamma": numpy.sum(
-                        self.paramters[layer_index][
-                            "delta_batch_normalized_weighted_input"
-                        ]
+                        self.parameters[layer_index]["delta_normalization"]
                         * (
-                            self.paramters[layer_index]["weighted_input"]
-                            - self.paramters[layer_index]["weighted_input_average"]
+                            self.parameters[layer_index]["weighted_input"]
+                            - self.parameters[layer_index]["average"]
                         )
-                        / self.paramters[layer_index][
-                            "weighted_input_standard_deviation"
-                        ],
+                        / self.parameters[layer_index]["standard_deviation"],
                         axis=1,
                         keepdims=True,
-                    ),  # gradient of the scale factor for batch standardization
+                    ),  # gradient of the scale factor for batch normalization
                     "delta_beta": numpy.sum(
-                        self.paramters[layer_index][
-                            "delta_batch_normalized_weighted_input"
-                        ],
+                        self.parameters[layer_index]["delta_normalization"],
                         axis=1,
                         keepdims=True,
-                    ),  # gradient of the shift factor for batch standardization
+                    ),  # gradient of the shift factor for batch normalization
                     "delta_weighted_input": (
                         delta_weighted_input := (
                             sample_counts
-                            * self.paramters[layer_index]["gamma"]
-                            * self.paramters[layer_index][
-                                "delta_batch_normalized_weighted_input"
-                            ]
+                            * self.parameters[layer_index]["gamma"]
+                            * self.parameters[layer_index]["delta_normalization"]
                             - numpy.sum(
-                                self.paramters[layer_index]["gamma"]
-                                * self.paramters[layer_index][
-                                    "delta_batch_normalized_weighted_input"
-                                ],
+                                self.parameters[layer_index]["gamma"]
+                                * self.parameters[layer_index]["delta_normalization"],
                                 axis=1,
                                 keepdims=True,
                             )
                             - (
                                 (
-                                    self.paramters[layer_index]["weighted_input"]
-                                    - self.paramters[layer_index][
-                                        "weighted_input_average"
-                                    ]
+                                    self.parameters[layer_index]["weighted_input"]
+                                    - self.parameters[layer_index]["average"]
                                 )
-                                / self.paramters[layer_index][
-                                    "weighted_input_standard_deviation"
-                                ]
+                                / self.parameters[layer_index]["standard_deviation"]
                             )
                             * numpy.sum(
-                                self.paramters[layer_index]["gamma"]
-                                * self.paramters[layer_index][
-                                    "delta_batch_normalized_weighted_input"
-                                ]
+                                self.parameters[layer_index]["gamma"]
+                                * self.parameters[layer_index]["delta_normalization"]
                                 * (
                                     (
-                                        self.paramters[layer_index]["weighted_input"]
-                                        - self.paramters[layer_index][
-                                            "weighted_input_average"
-                                        ]
+                                        self.parameters[layer_index]["weighted_input"]
+                                        - self.parameters[layer_index]["average"]
                                     )
-                                    / self.paramters[layer_index][
-                                        "weighted_input_standard_deviation"
-                                    ]
+                                    / self.parameters[layer_index]["standard_deviation"]
                                 ),
                                 axis=1,
                                 keepdims=True,
                             )
                         )
                         * (1.0 / sample_counts)
-                        / self.paramters[layer_index][
-                            "weighted_input_standard_deviation"
-                        ]
+                        / self.parameters[layer_index]["standard_deviation"]
                     ),  # gradient of the weighted input
                     "delta_weight": numpy.dot(
                         delta_weighted_input,
                         (
                             X
                             if layer_index == 1
-                            else self.paramters[layer_index - 1]["activation"]
+                            else self.parameters[layer_index - 1]["activation"]
                         ).T,
                     ),  # gradient of the weight
                     "delta_bias": numpy.sum(
                         delta_weighted_input, axis=1, keepdims=True
                     ),  # gradient of the bias
                 }
             )
 
             if layer_index > 1:
-                self.paramters[layer_index - 1][
-                    "delta_batch_normalized_weighted_input"
-                ] = numpy.dot(
-                    self.paramters[layer_index]["weight"].T,
-                    self.paramters[layer_index]["delta_weighted_input"],
-                ) * (
-                    self.paramters[layer_index - 1]["batch_normalized_weighted_input"]
-                    > 0
-                ).astype(
+                self.parameters[layer_index - 1]["delta_normalization"] = numpy.dot(
+                    self.parameters[layer_index]["weight"].T,
+                    self.parameters[layer_index]["delta_weighted_input"],
+                ) * (self.parameters[layer_index - 1]["normalization"] > 0).astype(
                     numpy.float32
                 )  # relu derivative; only valid while the hidden activation is relu
 
-    def train(
-        self,
-        X: numpy.ndarray,
-        y_true: numpy.ndarray,
-        target_loss: float = 1e-3,
-        epochs: int = 200,
-        learning_rate: float = 0.001,
-    ) -> None:
-        """
-        Train the neural network
-        :param X: input to the input layer
-        :param y_true: ground-truth output of the output layer
-        :param target_loss: target loss
-        :param epochs: number of training epochs
-        :param learning_rate: learning rate
-        :return: None
-        """
-        print(
-            f"Training started: target loss {target_loss}, epochs {epochs}, learning rate {learning_rate}..."
-        )
-        # Standardization
-        X = (X - numpy.mean(X, axis=1, keepdims=True)) / (
-            numpy.std(X, axis=1, keepdims=True) + self.epsilon
-        )
-        epoch = 1
-        while True:
-            # Forward propagation
-            y_predict = self._forward_propagate(X=X)
-
-            loss = self._calculate_loss(y_true=y_true, y_predict=y_predict)
-            if loss < target_loss:
-                print(
-                    f"  Epoch {epoch}: loss {loss} reached the target loss {target_loss}; training finished"
-                )
-                break
-            if epoch >= epochs:
-                print(
-                    f"  Epoch {epoch}: loss {loss}; reached the maximum number of epochs {epochs}; training finished"
-                )
-                break
-            if epoch % 50 == 0:
-                print(f"  Epoch {epoch}: loss {loss}; training continues...")
-
-            # Backward propagation
-            self._backward_propagate(X=X, y_true=y_true, y_predict=y_predict)
-
-            # Update the network parameters
-            self._update_parameters(learning_rate=learning_rate)
-
-            epoch += 1
-
-        for idx in numpy.random.choice(X.shape[1], size=10, replace=False):
-            y_true_val = y_true[0, idx]
-            y_pred_val = y_predict[0, idx]
-            error = abs(y_true_val - y_pred_val)
-            print(f"{idx:<10} {y_true_val:<15.4f} {y_pred_val:<15.4f} {error:<15.4f}")
-
     def _update_parameters(self, learning_rate: float) -> None:
         """
         Update the network parameters
         :param learning_rate: learning rate
         :return: None
         """
         for layer_index in range(1, self.layer_counts + 1):
-            self.paramters[layer_index].update(
+            self.parameters[layer_index].update(
                 {
-                    "weight": self.paramters[layer_index]["weight"]
-                    - self.paramters[layer_index]["delta_weight"] * learning_rate,
-                    "bias": self.paramters[layer_index]["bias"]
-                    - self.paramters[layer_index]["delta_bias"] * learning_rate,
-                    "gamma": self.paramters[layer_index]["gamma"]
-                    - self.paramters[layer_index]["delta_gamma"] * learning_rate,
-                    "beta": self.paramters[layer_index]["beta"]
-                    - self.paramters[layer_index]["delta_beta"] * learning_rate,
+                    "weight": self.parameters[layer_index]["weight"]
+                    - self.parameters[layer_index]["delta_weight"]
+                    * learning_rate,  # weight
+                    "bias": self.parameters[layer_index]["bias"]
+                    - self.parameters[layer_index]["delta_bias"]
+                    * learning_rate,  # shift
+                    "gamma": self.parameters[layer_index]["gamma"]
+                    - self.parameters[layer_index]["delta_gamma"]
+                    * learning_rate,  # scale factor for batch normalization
+                    "beta": self.parameters[layer_index]["beta"]
+                    - self.parameters[layer_index]["delta_beta"]
+                    * learning_rate,  # shift factor for batch normalization
                 }
             )
 
@@ -396,7 +469,7 @@
 if __name__ == "__main__":
     numpy.random.seed(42)  # set the random seed for reproducibility
     X = numpy.random.randn(2, 100) * 5
     # True function: y = 2*x1^2 + 3*x2 + 1 (plus noise)
-    y_true = 2 * X[0:1, :]**2 + 3 * X[1:2, :] + 1 + numpy.random.randn(1, 100) * 0.1
+    y_true = 2 * X[0:1, :] ** 2 + 3 * X[1:2, :] + 1 + numpy.random.randn(1, 100) * 0.1
 
     # Create and train the neural network
     neural_network = NeuralNetwork(
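Note on the batch-normalization step the patch introduces: during training, _batch_normalize normalizes each neuron's weighted input with the per-batch mean and variance and blends those statistics into the moving averages using the momentum factor; at inference it reuses the moving statistics instead. A minimal standalone sketch of that rule, assuming illustrative shapes and names (x, rows, samples are not part of the patch):

# Standalone sketch of the batch-normalization rule the patch implements
# (illustrative only; momentum, gamma, beta mirror the patch's parameters).
import numpy

momentum, epsilon = 0.9, 1e-9
rows, samples = 3, 8
x = numpy.random.randn(rows, samples)  # one layer's weighted input
gamma = numpy.ones((rows, 1))          # scale factor
beta = numpy.zeros((rows, 1))          # shift factor
moving_average = numpy.zeros((rows, 1))
moving_variance = numpy.ones((rows, 1))

# Training: normalize with the batch statistics and update the moving statistics.
average = numpy.mean(x, axis=1, keepdims=True)
variance = numpy.var(x, ddof=0, axis=1, keepdims=True)
moving_average = momentum * moving_average + (1 - momentum) * average
moving_variance = momentum * moving_variance + (1 - momentum) * variance
x_train = (x - average) / numpy.sqrt(variance + epsilon) * gamma + beta

# Inference: reuse the moving statistics instead of the batch statistics.
x_infer = (x - moving_average) / numpy.sqrt(moving_variance + epsilon) * gamma + beta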
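The dense "delta_weighted_input" expression in _backward_propagate is the standard batch-norm backward formula dx = (gamma / std) * (1/m) * (m*dy - sum(dy) - x_hat * sum(dy * x_hat)), with sums taken over the batch axis. A sketch of a finite-difference check of that closed form (illustrative names, not part of the patch; beta is omitted since it does not affect the gradient w.r.t. x):

# Numerical check of the batch-norm backward formula used in _backward_propagate.
import numpy

numpy.random.seed(0)
epsilon = 1e-9
rows, m = 2, 5
x = numpy.random.randn(rows, m)
gamma = numpy.random.rand(rows, 1) + 0.5
dy = numpy.random.randn(rows, m)  # upstream gradient w.r.t. gamma * x_hat

def normalize(x):
    average = numpy.mean(x, axis=1, keepdims=True)
    std = numpy.sqrt(numpy.var(x, ddof=0, axis=1, keepdims=True) + epsilon)
    return (x - average) / std * gamma, (x - average) / std, std

y, x_hat, std = normalize(x)

# Closed form, matching the patch (with the corrected "average" / "standard_deviation" keys).
dx = (gamma / std) / m * (
    m * dy
    - numpy.sum(dy, axis=1, keepdims=True)
    - x_hat * numpy.sum(dy * x_hat, axis=1, keepdims=True)
)

# Finite-difference check of d(sum(dy * y)) / dx.
dx_num = numpy.zeros_like(x)
h = 1e-5
for i in range(rows):
    for j in range(m):
        xp, xm = x.copy(), x.copy()
        xp[i, j] += h
        xm[i, j] -= h
        dx_num[i, j] = (
            numpy.sum(dy * normalize(xp)[0]) - numpy.sum(dy * normalize(xm)[0])
        ) / (2 * h)

print(numpy.max(numpy.abs(dx - dx_num)))  # expected to be tiny: the two gradients agree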
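The patch ends mid-statement in the __main__ block. For orientation, a minimal usage sketch consistent with the constructor and train signatures defined above (the hidden width 16, the epoch count, and how NeuralNetwork is brought into scope are illustrative assumptions, not taken from the patch):

# Illustrative usage sketch; assumes the NeuralNetwork class from 神经网络/main.py is in scope.
import numpy

numpy.random.seed(42)  # for reproducibility
X = numpy.random.randn(2, 100) * 5
# True function: y = 2*x1^2 + 3*x2 + 1 (plus noise)
y_true = 2 * X[0:1, :] ** 2 + 3 * X[1:2, :] + 1 + numpy.random.randn(1, 100) * 0.1

network = NeuralNetwork(
    structure=[2, 16, 1],  # illustrative hidden width
    hidden_activate="relu",
    output_activate="linear",
    momentum=0.9,
)
network.train(X=X, y_true=y_true, target_loss=1e-3, epochs=2000, learning_rate=0.001)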