# -*- coding: utf-8 -*- """ 神经网络 """ # 导入模块 from typing import List, Literal import numpy class NeuralNetwork: """ 神经网络 """ HIDDEN_ACTIVATES = ["relu"] OUTPUT_ACTIVATES = ["linear", "softmax"] def __init__( self, structure: List[int], hidden_activate: Literal["relu"] = "relu", output_activate: Literal["linear", "softmax"] = "linear", epsilon: float = 1e-9, ): """ 初始化 :param structure: 神经网络结构,例如[2, 10, 1]表示2层神经网络,具体为输入层2个神经元、隐含层10个神经元、输出层1个神经元 :param hidden_activate: 隐含层的激活函数,默认为relu :param output_activate: 输出层的激活函数,默认为linear :param epsilon: 极小值,默认为1e-9 """ print("正在初始化神经网络...", end="") # 初始化神经网络结构 self.structure = structure # 神经网络层数 self.layer_counts = ( len(structure) - 1 ) # 定义第0层为输入层,第L层为输出层(L为神经网络层数),第l层为隐含层(l=1,2,...,L-1) if hidden_activate not in self.HIDDEN_ACTIVATES: raise ValueError(f"该隐含层激活函数 {hidden_activate} 暂不支持") self.hidden_activate = hidden_activate if output_activate not in self.OUTPUT_ACTIVATES: raise ValueError(f"该输出层激活函数 {output_activate} 暂不支持") self.output_activate = output_activate self.paramters = {} # 就隐含层和输出层初始化神经网络参数 for layer_index in range(1, self.layer_counts + 1): # 上一层和当前层神经元数量 previous_layer_neuron_counts, current_layer_neuron_counts = ( self.structure[layer_index - 1], self.structure[layer_index], ) self.paramters[layer_index] = { "weight": numpy.random.randn( current_layer_neuron_counts, previous_layer_neuron_counts ) * ( numpy.sqrt(2 / previous_layer_neuron_counts) if layer_index < self.layer_counts else ( numpy.sqrt(1 / previous_layer_neuron_counts) if self.output_activate == "linear" else numpy.sqrt( 2 / ( previous_layer_neuron_counts + current_layer_neuron_counts ) ) ) ), # 权重,权重维度为[当前层神经元数量,上一层神经元数量]、输入维度为[上一层神经元数量,样本数]以适配加权输=权重*输入+偏移。隐含层使用He初始化权重方法、输出层激活函数若为linear则使用标准Xavier初始化权重方法否则使用改进Xavier初始化权重方法 "bias": numpy.zeros((current_layer_neuron_counts, 1)), # 偏移 "gamma": numpy.ones( (current_layer_neuron_counts, 1) ), # 批标准化的缩放因子 "beta": numpy.zeros( (current_layer_neuron_counts, 1) ), # 批标准化的偏移因子 "activate": ( self.hidden_activate if layer_index < self.layer_counts else self.output_activate ), # 激活函数 } self.epsilon = epsilon print("已完成") def _forward_propagate(self, X: numpy.ndarray) -> numpy.ndarray: """ 前向传播 :param X: 输入层的输入,维度为[输入特征数, 样本数] :return: 输出层的输出预测,维度为[输出特征数, 样本数] """ activation = X # 将输入层的输入作为第0层的输出 for layer_index in range(1, self.layer_counts + 1): x = activation # 将上一层的输出作为当前层的输入 self.paramters[layer_index].update( { "weighted_input": ( weighted_input := numpy.dot( self.paramters[layer_index]["weight"], x ) ), # 加权输入 "weighted_input_average": ( weighted_input_average := numpy.mean( weighted_input, axis=1, keepdims=True ) ), # 加权输入的平均值 "weighted_input_standard_deviation": ( weighted_input_standard_deviation := numpy.sqrt( numpy.var(weighted_input, ddof=0, axis=1, keepdims=True) + self.epsilon ) ), # 加权输入的标准差 "batch_normalized_weighted_input": ( batch_normalized_weighted_input := ( weighted_input - weighted_input_average ) * self.paramters[layer_index]["gamma"] / weighted_input_standard_deviation + self.paramters[layer_index]["beta"] ), # 就加权输入批标准化 "activation": ( activation := self._activate( activate=self.paramters[layer_index]["activate"], weighted_input=batch_normalized_weighted_input, ) ), # 输出 } ) y_predict = activation # 将第L层(输出层)的输出作为输出层的输出预测 return y_predict def _activate( self, activate: Literal["relu", "linear", "softmax"], weighted_input: numpy.ndarray, ) -> numpy.ndarray: """ 根据激活函数计算输出 :param activate: 激活函数 :param weighted_input: 加权输入 :return: 输出 """ match activate: case "relu": return numpy.maximum(0, weighted_input) case "linear": return weighted_input case "softmax": # 加权输入的指数项 e_weighted_input = numpy.exp( weighted_input - numpy.max(weighted_input, axis=0, keepdims=True) ) return e_weighted_input / numpy.sum( e_weighted_input, axis=0, keepdims=True ) def _calculate_loss( self, y_true: numpy.ndarray, y_predict: numpy.ndarray, ) -> numpy.floating: """ 计算损失 :param y_true: 输出层的输出真实,维度为[输出特征数, 样本数] :param y_predict: 输出层的输出预测,维度为[输出特征数, 样本数] :return: 损失值 """ return ( 0.5 * numpy.mean(numpy.square(y_true - y_predict)) if self.paramters[self.layer_counts]["activate"] == "linear" else -1 * numpy.mean( numpy.sum( y_true * numpy.log(numpy.clip(y_predict, self.epsilon, 1 - self.epsilon)), axis=0, ) ) ) # 若输出层的激活函数为linear则损失函数使用均方误差否则使用交叉熵 def _backward_propagate( self, X: numpy.ndarray, y_true: numpy.ndarray, y_predict: numpy.ndarray, ) -> None: """ 后向传播 :param X: 输入层的输入,维度为[输入特征数, 样本数] :param y_true: 输出层的输出真实,维度为[输出特征数, 样本数] :param y_predict: 输出层的输出预测,维度为[输出特征数, 样本数] :return: 无 """ sample_counts = X.shape[1] # 样本数 # 损失函数对输出层的就加权输入批标准化的梯度 self.paramters[self.layer_counts]["delta_batch_normalized_weighted_input"] = ( y_predict - y_true ) / sample_counts # 均方误差和交叉熵对输出层的输出预测的梯度是相同的 for layer_index in range(self.layer_counts, 0, -1): self.paramters[layer_index].update( { "delta_gamma": numpy.sum( self.paramters[layer_index][ "delta_batch_normalized_weighted_input" ] * ( self.paramters[layer_index]["weighted_input"] - self.paramters[layer_index]["weighted_input_average"] ) / self.paramters[layer_index][ "weighted_input_standard_deviation" ], axis=1, keepdims=True, ), # 批标准化的缩放因子的梯度 "delta_beta": numpy.sum( self.paramters[layer_index][ "delta_batch_normalized_weighted_input" ], axis=1, keepdims=True, ), # 批标准化的偏移因子的梯度 "delta_weighted_input": ( delta_weighted_input := ( sample_counts * self.paramters[layer_index]["gamma"] * self.paramters[layer_index][ "delta_batch_normalized_weighted_input" ] - numpy.sum( self.paramters[layer_index]["gamma"] * self.paramters[layer_index][ "delta_batch_normalized_weighted_input" ], axis=1, keepdims=True, ) - ( ( self.paramters[layer_index]["weighted_input"] - self.paramters[layer_index][ "weighted_input_average" ] ) / self.paramters[layer_index][ "weighted_input_standard_deviation" ] ) * numpy.sum( self.paramters[layer_index]["gamma"] * self.paramters[layer_index][ "delta_batch_normalized_weighted_input" ] * ( ( self.paramters[layer_index]["weighted_input"] - self.paramters[layer_index][ "weighted_input_average" ] ) / self.paramters[layer_index][ "weighted_input_standard_deviation" ] ), axis=1, keepdims=True, ) ) * (1.0 / sample_counts) / self.paramters[layer_index][ "weighted_input_standard_deviation" ] ), # 加权输入的梯度 "delta_weight": numpy.dot( delta_weighted_input, ( X if layer_index == 1 else self.paramters[layer_index - 1]["activation"] ).T, ), # 权重的梯度 "delta_bias": numpy.sum( delta_weighted_input, axis=1, keepdims=True, ), # 偏置的梯度 } ) if layer_index > 1: self.paramters[layer_index - 1][ "delta_batch_normalized_weighted_input" ] = numpy.dot( self.paramters[layer_index]["weight"].T, self.paramters[layer_index]["delta_weighted_input"], ) * ( self.paramters[layer_index - 1]["batch_normalized_weighted_input"] > 0 ).astype( numpy.float32 ) def train( self, X: numpy.ndarray, y_true: numpy.ndarray, target_loss: float = 1e-3, epochs: int = 200, learning_rate: float = 0.001, ) -> None: """ 训练神经网络 :param X: 输入层的输入 :param y_true: 输出层的输出真实 :param target_loss: 目标损失 :param epochs: 学习轮数 :param learning_rate: 学习率 :return: 无 """ print( f"开始训练:目标损失为 {target_loss},学习轮数为 {epochs},学习率为 {learning_rate}..." ) # 标准化 X = (X - numpy.mean(X, axis=1, keepdims=True)) / ( numpy.std(X, axis=1, keepdims=True) + self.epsilon ) epoch = 1 while True: # 前向传播 y_predict = self._forward_propagate(X=X) loss = self._calculate_loss(y_true=y_true, y_predict=y_predict) if loss < target_loss: print( f" 第 {epoch} 轮损失为 {loss},已达到目标损失 {target_loss},训练结束" ) break if epoch >= epochs: print( f" 第 {epoch} 轮损失为 {loss},已达到最大学习轮数 {epochs},训练结束" ) break if epoch % 50 == 0: print(f" 第 {epoch} 轮损失为 {loss},继续训练...") # 后向传播 self._backward_propagate(X=X, y_true=y_true, y_predict=y_predict) # 更新神经网络参数 self._update_parameters(learning_rate=learning_rate) epoch += 1 for idx in numpy.random.choice(X.shape[1], size=10, replace=False): y_true_val = y_true[0, idx] y_pred_val = y_predict[0, idx] error = abs(y_true_val - y_pred_val) print(f"{idx:<10} {y_true_val:<15.4f} {y_pred_val:<15.4f} {error:<15.4f}") def _update_parameters(self, learning_rate: float) -> None: """ 更新神经网络参数 :param learning_rate: 学习率 :return: 无 """ for layer_index in range(1, self.layer_counts + 1): self.paramters[layer_index].update( { "weight": self.paramters[layer_index]["weight"] - self.paramters[layer_index]["delta_weight"] * learning_rate, "bias": self.paramters[layer_index]["bias"] - self.paramters[layer_index]["delta_bias"] * learning_rate, "gamma": self.paramters[layer_index]["gamma"] - self.paramters[layer_index]["delta_gamma"] * learning_rate, "beta": self.paramters[layer_index]["beta"] - self.paramters[layer_index]["delta_beta"] * learning_rate, } ) # 测试代码 if __name__ == "__main__": # 生成测试数据(回归任务) numpy.random.seed(42) # 设置随机种子保证可复现 X = numpy.random.randn(2, 100) * 5 # 真实函数:y = 2*x1 + 3*x2 + 1 (加噪声) y_true = 2 * X[0:1, :]**2 + 3 * X[1:2, :] + 1 + numpy.random.randn(1, 100) * 0.1 # 创建并训练神经网络 neural_network = NeuralNetwork( structure=[2, 200, 100, 50, 1], # 2输入,10隐藏神经元,1输出 ) # 训练 neural_network.train( X=X, y_true=y_true, target_loss=0.001, epochs=10000, learning_rate=0.001 )