# -*- coding: utf-8 -*-
"""
Neural network (multilayer perceptron).
"""

# Imports
from typing import List, Literal

import numpy


class NeuralNetwork:
    """
    Neural network.
    """

    HIDDEN_ACTIVATES = ["relu"]
    OUTPUT_ACTIVATES = ["linear", "softmax"]

    def __init__(
        self,
        structure: List[int],
        hidden_activate: Literal["relu"] = "relu",
        output_activate: Literal["linear", "softmax"] = "linear",
        seed: int = 52,
        epsilon: float = 1e-9,
    ):
        """
        Initialize the network.

        :param structure: network structure, e.g. [2, 10, 1] is a 2-layer network:
            an input layer with 2 neurons, one hidden layer with 10 neurons, and
            an output layer with 1 neuron
        :param hidden_activate: hidden-layer activation function, default "relu"
        :param output_activate: output-layer activation function, default "linear"
        :param seed: random seed, default 52
        :param epsilon: small constant for numerical stability, default 1e-9
        """
        print("Initializing the neural network...", end="")
        if not (
            isinstance(structure, list)
            and len(structure) >= 3
            and all(isinstance(x, int) and x >= 1 for x in structure)
        ):
            raise RuntimeError(
                "structure must be a list of length >= 3 whose elements are integers >= 1"
            )
        self.structure = structure  # network structure
        if hidden_activate not in self.HIDDEN_ACTIVATES:
            raise RuntimeError(
                f"Hidden-layer activation {hidden_activate} is not supported"
            )
        self.hidden_activate = hidden_activate
        if output_activate not in self.OUTPUT_ACTIVATES:
            raise RuntimeError(
                f"Output-layer activation {output_activate} is not supported"
            )
        self.output_activate = output_activate
        # Number of layers (convention: layer 0 is the input layer, layers
        # l = 1, 2, ..., L-1 are hidden layers, and layer L is the output layer,
        # where L is the number of layers; the depth is L + 1)
        self.layer_counts = len(structure) - 1
        numpy.random.seed(seed)  # set the random seed
        self.parameters = {}  # network parameters
        for layer_index in range(1, self.layer_counts + 1):
            # Neuron counts of the previous layer and the current layer
            previous_layer_neuron_counts, current_layer_neuron_counts = (
                self.structure[layer_index - 1],
                self.structure[layer_index],
            )
            self.parameters[layer_index] = {
                "activate": (
                    activate := (
                        self.hidden_activate
                        if layer_index != self.layer_counts
                        else self.output_activate
                    )
                ),  # activation function
                "weight": numpy.random.randn(
                    current_layer_neuron_counts, previous_layer_neuron_counts
                )
                * self._calculate_init_weight_scale(
                    activate=activate,
                    previous_layer_neuron_counts=previous_layer_neuron_counts,
                    current_layer_neuron_counts=current_layer_neuron_counts,
                ),  # weights, shape [current neurons, previous neurons], matching
                # weighted_input = weight @ input + bias
                "bias": numpy.zeros((current_layer_neuron_counts, 1)),  # bias
            }
        self.epsilon = epsilon
        print(" done")

    def _calculate_init_weight_scale(
        self,
        activate: Literal["relu", "linear", "softmax"],
        previous_layer_neuron_counts: int,
        current_layer_neuron_counts: int,
    ) -> numpy.floating:
        """
        Compute the scaling factor used when initializing the weights.

        :param activate: activation function
        :param previous_layer_neuron_counts: neuron count of the previous layer
        :param current_layer_neuron_counts: neuron count of the current layer
        :return: scaling factor for weight initialization
        """
        match activate:
            case "relu" | "linear":
                # He initialization
                return numpy.sqrt(2 / previous_layer_neuron_counts)
            case "softmax":
                # Xavier initialization
                return numpy.sqrt(
                    2 / (previous_layer_neuron_counts + current_layer_neuron_counts)
                )
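    # Illustrative sketch (not part of the original interface; the name
    # `_check_init_variance` is hypothetical and exists only for demonstration):
    # empirically verifies the He-initialization claim above, i.e. that weights
    # drawn as randn * sqrt(2 / fan_in) have variance close to 2 / fan_in.
    @staticmethod
    def _check_init_variance(fan_in: int = 512, draws: int = 100_000) -> float:
        """Return the empirical variance of He-initialized weight samples."""
        # Same recipe as __init__: standard normal draws scaled by sqrt(2 / fan_in)
        weights = numpy.random.randn(draws) * numpy.sqrt(2 / fan_in)
        return float(numpy.var(weights))  # expected to be close to 2 / fan_in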
    def train(
        self,
        X: numpy.ndarray,
        y_true: numpy.ndarray,
        target_loss: float = 1e-3,
        epochs: int = 200,
        learning_rate: float = 0.001,
    ) -> None:
        """
        Train the network.

        :param X: input, shape [input neurons, samples]
        :param y_true: true output, shape [output neurons, samples]
        :param target_loss: target loss
        :param epochs: maximum number of epochs
        :param learning_rate: learning rate
        :return: None
        """
        print(
            f"Training started: target loss {target_loss}, epochs {epochs}, "
            f"learning rate {learning_rate}..."
        )
        if not (
            isinstance(X, numpy.ndarray)
            and isinstance(y_true, numpy.ndarray)
            and X.shape[1] == y_true.shape[1]
            and X.shape[0] == self.structure[0]
            and y_true.shape[0] == self.structure[-1]
        ):
            raise RuntimeError(
                "X and y_true must be arrays of shapes [input neurons, samples] and "
                "[output neurons, samples] with the same number of samples"
            )
        # Normalize the input and use it as the output of the input layer
        self.parameters[0] = {"activation": self._normalize(input=X)}
        epoch = 1
        while True:
            # Forward propagation
            self._forward_propagate()
            # Compute the loss
            loss = self._calculate_loss(y_true=y_true)
            if loss <= target_loss:
                print(
                    f"Epoch {epoch:6d}: loss reached the target loss "
                    f"{target_loss:9.3f}, training finished"
                )
                break
            if epoch > epochs:
                print(
                    f"Reached the maximum number of epochs, loss {loss:9.3f}, "
                    "training finished"
                )
                break
            # Backward propagation
            self._backward_propagate(y_true=y_true)
            # Update the network parameters
            self._update_parameters(learning_rate=learning_rate)
            if epoch % 100 == 0:
                print(f"Epoch {epoch:6d}: loss {loss:9.3f}, continuing training...")
            epoch += 1

    def _normalize(
        self,
        input: numpy.ndarray,
    ) -> numpy.ndarray:
        """
        Normalize each feature to zero mean and unit variance across samples.

        :param input: input
        :return: normalized input, same shape as the input
        """
        return (input - numpy.mean(input, axis=1, keepdims=True)) / numpy.sqrt(
            numpy.var(input, ddof=0, axis=1, keepdims=True) + self.epsilon
        )

    def _forward_propagate(self) -> None:
        """
        Forward propagation.

        :return: None (each layer's weighted input and activation are stored in
            self.parameters)
        """
        for layer_index in range(1, self.layer_counts + 1):
            self.parameters[layer_index].update(
                {
                    "weighted_input": (
                        weighted_input := numpy.dot(
                            self.parameters[layer_index]["weight"],
                            # the previous layer's output is the current layer's input
                            self.parameters[layer_index - 1]["activation"],
                        )
                        + self.parameters[layer_index]["bias"]
                    ),  # weighted input, shape [current neurons, samples]
                    "activation": self._activate(
                        activate=self.parameters[layer_index]["activate"],
                        input=weighted_input,
                    ),  # output
                }
            )

    def _activate(
        self,
        activate: Literal["relu", "linear", "softmax"],
        input: numpy.ndarray,
    ) -> numpy.ndarray:
        """
        Apply the activation function.

        :param activate: activation function
        :param input: input
        :return: activated input, same shape as the input
        """
        match activate:
            case "relu":
                return numpy.maximum(0, input)
            case "linear":
                return input
            case "softmax":
                # Subtract each sample's maximum over all neurons to avoid
                # overflow in the exponentials
                e_weighted_input = numpy.exp(
                    input - numpy.max(input, axis=0, keepdims=True)
                )
                return e_weighted_input / numpy.sum(
                    e_weighted_input, axis=0, keepdims=True
                )

    def _calculate_loss(
        self,
        y_true: numpy.ndarray,
    ) -> numpy.floating:
        """
        Compute the loss.

        :param y_true: true output, shape [output neurons, samples]
        :return: loss
        """
        match self.parameters[self.layer_counts]["activate"]:
            case "linear":
                # With a linear output layer, use 0.5 * mean squared error
                return 0.5 * numpy.mean(
                    numpy.square(
                        y_true - self.parameters[self.layer_counts]["activation"]
                    )
                )
            case "softmax":
                # With a softmax output layer, use the cross-entropy in its
                # numerically stable log-sum-exp form:
                # mean over samples of logsumexp(z) - sum(y * z)
                weighted_input = self.parameters[self.layer_counts]["weighted_input"]
                max_weighted_input = numpy.max(weighted_input, axis=0, keepdims=True)
                return numpy.mean(
                    (
                        max_weighted_input
                        + numpy.log(
                            numpy.sum(
                                numpy.exp(weighted_input - max_weighted_input),
                                axis=0,
                                keepdims=True,
                            )
                        )
                    ).squeeze()
                    - numpy.sum(y_true * weighted_input, axis=0)
                )
            case _:
                raise RuntimeError(
                    f"Activation {self.parameters[self.layer_counts]['activate']} "
                    "is not supported"
                )
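    # Illustrative sketch (hypothetical helper, not in the original interface):
    # checks on random data that the log-sum-exp cross-entropy used in
    # _calculate_loss matches the textbook -sum(y * log(softmax(z))) form, which
    # follows from log(softmax(z))_k = z_k - logsumexp(z) and sum(y) = 1 for
    # one-hot targets.
    @staticmethod
    def _check_cross_entropy_forms(neurons: int = 4, samples: int = 8) -> bool:
        """Return True if the two cross-entropy formulations agree."""
        z = numpy.random.randn(neurons, samples)
        # Random one-hot targets, shape [neurons, samples]
        y = numpy.eye(neurons)[:, numpy.random.randint(0, neurons, samples)]
        z_max = numpy.max(z, axis=0, keepdims=True)
        # Log-sum-exp form: mean over samples of logsumexp(z) - sum(y * z)
        log_sum_exp = (
            z_max + numpy.log(numpy.sum(numpy.exp(z - z_max), axis=0, keepdims=True))
        ).squeeze()
        loss_lse = numpy.mean(log_sum_exp - numpy.sum(y * z, axis=0))
        # Textbook form: mean over samples of -sum(y * log(softmax(z)))
        softmax = numpy.exp(z - z_max) / numpy.sum(
            numpy.exp(z - z_max), axis=0, keepdims=True
        )
        loss_ce = -numpy.mean(numpy.sum(y * numpy.log(softmax), axis=0))
        return bool(numpy.isclose(loss_lse, loss_ce))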
    def _backward_propagate(
        self,
        y_true: numpy.ndarray,
    ) -> None:
        """
        Backward propagation.

        :param y_true: true output, shape [output neurons, samples]
        :return: None
        """
        sample_counts = y_true.shape[1]  # number of samples
        for layer_index in range(self.layer_counts, 0, -1):
            if layer_index == self.layer_counts:
                # For both the linear output with the 0.5 * MSE loss and the
                # softmax output with the cross-entropy loss, the gradient of
                # the loss with respect to the weighted input simplifies to
                # (activation - y_true) / sample_counts, so the output layer
                # skips the element-wise activation derivative
                delta_weighted_input = (
                    self.parameters[layer_index]["activation"] - y_true
                ) / sample_counts
            else:
                # Gradient of the current layer's output, from the next layer's
                # transposed weights and weighted-input gradient
                delta_activation = numpy.dot(
                    self.parameters[layer_index + 1]["weight"].T,
                    self.parameters[layer_index + 1]["delta_weighted_input"],
                )
                # Gradient of the weighted input
                delta_weighted_input = delta_activation * self._activate_derivative(
                    activate=self.parameters[layer_index]["activate"],
                    input=self.parameters[layer_index]["weighted_input"],
                )
            self.parameters[layer_index].update(
                {
                    "delta_weighted_input": delta_weighted_input,
                    "delta_weight": numpy.dot(
                        delta_weighted_input,
                        (self.parameters[layer_index - 1]["activation"]).T,
                    ),  # gradient of the weights
                    "delta_bias": numpy.sum(
                        delta_weighted_input,
                        axis=1,
                        keepdims=True,
                    ),  # gradient of the bias
                }
            )

    def _activate_derivative(
        self,
        activate: Literal["relu", "linear", "softmax"],
        input: numpy.ndarray,
    ) -> numpy.ndarray:
        """
        Compute the element-wise derivative of the activation function.

        :param activate: activation function
        :param input: input
        :return: derivative evaluated at the input, same shape as the input
        """
        match activate:
            case "relu":
                return numpy.where(input > 0, 1, 0)
            case "linear":
                return numpy.ones_like(input)
            case "softmax":
                # Diagonal of the softmax Jacobian; the output layer uses the
                # combined softmax + cross-entropy shortcut in
                # _backward_propagate, so this branch is not reached there
                activation = self._activate(
                    activate=activate,
                    input=input,
                )
                return activation * (1 - activation)

    def _update_parameters(self, learning_rate: float) -> None:
        """
        Update the network parameters by gradient descent.

        :param learning_rate: learning rate
        :return: None
        """
        for layer_index in range(1, self.layer_counts + 1):
            self.parameters[layer_index].update(
                {
                    "weight": self.parameters[layer_index]["weight"]
                    - self.parameters[layer_index]["delta_weight"]
                    * learning_rate,  # weights
                    "bias": self.parameters[layer_index]["bias"]
                    - self.parameters[layer_index]["delta_bias"]
                    * learning_rate,  # bias
                }
            )


# Test code
if __name__ == "__main__":
    X = numpy.random.randn(2, 100)
    # True function: y = 2*x1**2 + 3*x2 + 1
    y_true = 2 * X[0:1, :] ** 2 + 3 * X[1:2, :] + 1
    # Create the neural network: 2 inputs, hidden layers of 256 and 128 neurons,
    # 1 output
    neural_network = NeuralNetwork(
        structure=[2, 256, 128, 1],
    )
    # Train
    neural_network.train(
        X=X, y_true=y_true, target_loss=0.05, epochs=1_000, learning_rate=0.05
    )
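    # A possible classification run (a sketch under the same __main__ guard; the
    # labeling rule and network structure below are illustrative assumptions,
    # not part of the original test code): softmax output with one-hot targets.
    X_cls = numpy.random.randn(2, 100)
    labels = (X_cls[0, :] + X_cls[1, :] > 0).astype(int)  # hypothetical binary rule
    y_cls = numpy.eye(2)[:, labels]  # one-hot targets, shape [2, 100]
    classifier = NeuralNetwork(structure=[2, 32, 2], output_activate="softmax")
    classifier.train(
        X=X_cls, y_true=y_cls, target_loss=0.1, epochs=1_000, learning_rate=0.05
    )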