diff --git a/神经网络/main.py b/神经网络/main.py
index 738e92d..f447389 100644
--- a/神经网络/main.py
+++ b/神经网络/main.py
@@ -13,142 +13,156 @@ class NeuralNetwork:
     Neural network
     """
 
+    HIDDEN_ACTIVATES = ["relu"]
+    OUTPUT_ACTIVATES = ["linear", "softmax"]
+
     def __init__(
         self,
-        neurons: List[int],
+        structure: List[int],
         hidden_activate: Literal["relu"] = "relu",
         output_activate: Literal["linear", "softmax"] = "linear",
+        epsilon: float = 1e-9,
     ):
         """
         Initialize
-        :param neurons: neuron structure, e.g. [2, 10, 1] means 2 neurons in the input layer, 10 neurons in the first hidden layer and 1 neuron in the output layer
-        :param hidden_activate: activation function of the hidden layers, defaults to relu
-        :param output_activate: activation function of the output layer, defaults to linear
+        :param structure: network structure, e.g. [2, 10, 1] means a 2-layer network: 2 neurons in the input layer, 10 in the hidden layer and 1 in the output layer
+        :param hidden_activate: activation function of the hidden layers, defaults to relu
+        :param output_activate: activation function of the output layer, defaults to linear
+        :param epsilon: small constant for numerical stability, defaults to 1e-9
         """
         print("Initializing neural network...", end="")
 
-        # Initialize the neuron structure
-        self.neurons = neurons
+        # Initialize the network structure
+        self.structure = structure
+        # Number of layers in the network
+        self.layer_counts = (
+            len(structure) - 1
+        )  # Layer 0 is the input layer, layer L is the output layer (L is the number of layers), and layers l = 1, 2, ..., L-1 are the hidden layers
 
-        # Initialize the hidden-layer activation function
+        if hidden_activate not in self.HIDDEN_ACTIVATES:
+            raise ValueError(f"Hidden-layer activation function {hidden_activate} is not supported yet")
         self.hidden_activate = hidden_activate
-        # Initialize the output-layer activation function
+
+        if output_activate not in self.OUTPUT_ACTIVATES:
+            raise ValueError(f"Output-layer activation function {output_activate} is not supported yet")
         self.output_activate = output_activate
 
-        # Initialize the network structure
-        self.neural_network = {}
-        # Initialize the weights and biases of all layers
-        self._init_neural_network()
+        self.paramters = {}
+        # Initialize the network parameters for the hidden layers and the output layer
+        for layer_index in range(1, self.layer_counts + 1):
+            # Numbers of neurons in the previous layer and in the current layer
+            previous_layer_neuron_counts, current_layer_neuron_counts = (
+                self.structure[layer_index - 1],
+                self.structure[layer_index],
+            )
+            self.paramters[layer_index] = {
+                "weight": numpy.random.randn(
+                    current_layer_neuron_counts, previous_layer_neuron_counts
+                )
+                * (
+                    numpy.sqrt(2 / previous_layer_neuron_counts)
+                    if layer_index < self.layer_counts
+                    else (
+                        numpy.sqrt(1 / previous_layer_neuron_counts)
+                        if self.output_activate == "linear"
+                        else numpy.sqrt(
+                            2
+                            / (
+                                previous_layer_neuron_counts
+                                + current_layer_neuron_counts
+                            )
+                        )
+                    )
+                ),  # Weights with shape [current-layer neurons, previous-layer neurons]; the input has shape [previous-layer neurons, samples] so that weighted input = weight @ input + bias. Hidden layers use He initialization; the output layer uses standard Xavier initialization if its activation is linear, otherwise the improved Xavier initialization
+                "bias": numpy.zeros((current_layer_neuron_counts, 1)),  # Bias
+                "gamma": numpy.ones(
+                    (current_layer_neuron_counts, 1)
+                ),  # Batch-normalization scale factor
+                "beta": numpy.zeros(
+                    (current_layer_neuron_counts, 1)
+                ),  # Batch-normalization shift factor
+                "activate": (
+                    self.hidden_activate
+                    if layer_index < self.layer_counts
+                    else self.output_activate
+                ),  # Activation function
+            }
+
+        self.epsilon = epsilon
 
         print("done")
 
-    def _init_neural_network(self):
-        """
-        Initialize the weights and biases of all layers
-        """
-        for idx in range(1, len(self.neurons)):
-            # For a hidden layer, derive the standard deviation of the layer weights from the hidden activation; for the output layer, derive it from the output activation
-            if idx != len(self.neurons) - 1:
-                # Activation function
-                activate = self.hidden_activate
-                match self.hidden_activate:
-                    case "relu":
-                        # Standard deviation of the current layer's weights
-                        standard_deviation = numpy.sqrt(
-                            2 / self.neurons[idx - 1]
-                        )  # He variance formula
-                    case _:
-                        raise RuntimeError(
-                            f"Hidden-layer activation function {self.hidden_activate} is not supported yet"
-                        )
-            else:
-                # Activation function
-                activate = self.output_activate
-                match self.output_activate:
-                    case "linear":
-                        # Standard deviation of the current layer's weights
-                        standard_deviation = numpy.sqrt(1 / self.neurons[idx - 1])
-                    case "softmax":
-                        # Standard deviation of the current layer's weights
-                        standard_deviation = numpy.sqrt(
-                            2 / (self.neurons[idx - 1] + self.neurons[idx])
-                        )  # Xavier variance formula
-                    case _:
-                        raise RuntimeError(
-                            f"Output-layer activation function {self.output_activate} is not supported yet"
-                        )
-
-            self.neural_network[f"layer:{idx:03d}"] = {
-                "weight": numpy.random.randn(self.neurons[idx - 1], self.neurons[idx])
-                * standard_deviation,  # Weights of the current layer
-                "bias": numpy.zeros((1, self.neurons[idx])),  # Bias of the current layer
-                "activate": activate,  # Activation function of the current layer
-                "gamma": numpy.ones((1, self.neurons[idx])),  # Batch-normalization scale factor of the current layer
-                "beta": numpy.zeros((1, self.neurons[idx])),  # Batch-normalization shift factor of the current layer
-            }
-
-    def _forward_propagate(self, x: numpy.ndarray) -> numpy.ndarray:
+    def _forward_propagate(self, X: numpy.ndarray) -> numpy.ndarray:
         """
         Forward propagation
-        :param x: input to the input layer
-        :return: predicted output of the output layer
+        :param X: input to the input layer, with shape [input features, samples]
+        :return: predicted output of the output layer, with shape [output features, samples]
         """
-        activation = x  # Use the input of the input layer as the activation of layer 0
-        for layer_name, layer in self.neural_network.items():
-            self.neural_network[layer_name].update(
+        activation = X  # Use the input of the input layer as the output of layer 0
+        for layer_index in range(1, self.layer_counts + 1):
+            x = activation  # Use the previous layer's output as the current layer's input
+            self.paramters[layer_index].update(
                 {
-                    "weighted_sum": (
-                        weighted_sum := numpy.dot(activation, layer["weight"])
-                        + layer["bias"]
-                    ),  # Weighted sum of the current layer
-                    "batch_normalized_weighted_sum": (
-                        batch_normalized_weighted_sum := layer["gamma"]
-                        * (
-                            weighted_sum
-                            - numpy.mean(weighted_sum, axis=0, keepdims=True)
+                    "weighted_input": (
+                        weighted_input := numpy.dot(
+                            self.paramters[layer_index]["weight"], x
                         )
-                        / numpy.sqrt(
-                            numpy.var(
-                                weighted_sum, ddof=0, axis=0, keepdims=True
-                            )  # Biased variance formula
-                            + 1e-8
+                    ),  # Weighted input
+                    "weighted_input_average": (
+                        weighted_input_average := numpy.mean(
+                            weighted_input, axis=1, keepdims=True
                         )
-                        + layer["beta"]
-                    ),  # Batch-normalized weighted sum of the current layer
+                    ),  # Mean of the weighted input
+                    "weighted_input_standard_deviation": (
+                        weighted_input_standard_deviation := numpy.sqrt(
+                            numpy.var(weighted_input, ddof=0, axis=1, keepdims=True) + self.epsilon
+                        )
+                    ),  # Standard deviation of the weighted input
+                    "batch_normalized_weighted_input": (
+                        batch_normalized_weighted_input := (
+                            weighted_input - weighted_input_average
+                        )
+                        * self.paramters[layer_index]["gamma"]
+                        / weighted_input_standard_deviation
+                        + self.paramters[layer_index]["beta"]
+                    ),  # Batch-normalize the weighted input
                     "activation": (
                         activation := self._activate(
-                            activate=layer["activate"],
-                            weighted_sum=batch_normalized_weighted_sum,
+                            activate=self.paramters[layer_index]["activate"],
+                            weighted_input=batch_normalized_weighted_input,
                         )
-                    ),  # Activation of the current layer
+                    ),  # Output
                 }
             )
-        y_predict = activation  # Use the activation of layer L-1 (the last layer) as the output prediction (L is the number of layers)
+        y_predict = activation  # Use the output of layer L (the output layer) as the output prediction
         return y_predict
 
     def _activate(
         self,
         activate: Literal["relu", "linear", "softmax"],
-        weighted_sum: numpy.ndarray,
+        weighted_input: numpy.ndarray,
     ) -> numpy.ndarray:
         """
-        Activation function
+        Compute the output according to the activation function
         :param activate: activation function
-        :param weighted_sum: weighted sum
-        :return: activation
+        :param weighted_input: weighted input
+        :return: output
         """
         match activate:
             case "relu":
-                return numpy.maximum(0, weighted_sum)
+                return numpy.maximum(0, weighted_input)
             case "linear":
-                return weighted_sum
+                return weighted_input
             case "softmax":
-                # Exponential of the weighted sum
-                e_weighted_sum = numpy.exp(
-                    weighted_sum - numpy.max(weighted_sum, axis=1, keepdims=True)
+                # Exponential of the weighted input
+                e_weighted_input = numpy.exp(
+                    weighted_input - numpy.max(weighted_input, axis=0, keepdims=True)
+                )
+                return e_weighted_input / numpy.sum(
+                    e_weighted_input, axis=0, keepdims=True
                 )
-                return e_weighted_sum / numpy.sum(e_weighted_sum, axis=1, keepdims=True)
 
     def _calculate_loss(
         self,
@@ -157,62 +171,239 @@ class NeuralNetwork:
     ) -> numpy.floating:
         """
         Compute the loss
-        :param y_true: true output of the output layer
-        :param y_predict: predicted output of the output layer
+        :param y_true: true output of the output layer, with shape [output features, samples]
+        :param y_predict: predicted output of the output layer, with shape [output features, samples]
         :return: loss value
         """
-        # Name of layer L-1 (the last layer)
-        layer_name = list(self.neural_network.keys())[-1]
-        # Compute the loss according to the activation function of layer L-1 (the last layer)
-        match activate := self.neural_network[layer_name]["activate"]:
-            case "linear":
-                loss = 0.5 * numpy.mean(
-                    numpy.square(y_true - y_predict)
-                )  # Mean squared error formula
-            case "softmax":
-                loss = -1 * numpy.mean(
-                    numpy.sum(y_true * numpy.log(y_predict + 1e-8), axis=1)
-                )  # Cross-entropy loss formula
-            case _:
-                raise RuntimeError(f"Output-layer activation function {activate} is not supported yet")
-
-        return loss
+        return (
+            0.5 * numpy.mean(numpy.square(y_true - y_predict))
+            if self.paramters[self.layer_counts]["activate"] == "linear"
+            else -1
+            * numpy.mean(
+                numpy.sum(
+                    y_true
+                    * numpy.log(numpy.clip(y_predict, self.epsilon, 1 - self.epsilon)),
+                    axis=0,
+                )
+            )
+        )  # Mean squared error if the output-layer activation is linear, otherwise cross-entropy
 
     def _backward_propagate(
         self,
-        x: numpy.ndarray,
+        X: numpy.ndarray,
         y_true: numpy.ndarray,
         y_predict: numpy.ndarray,
     ) -> None:
         """
         Backward propagation
-        :param x: input to the input layer
-        :param y_true: true output of the output layer
-        :param y_predict: predicted output of the output layer
+        :param X: input to the input layer, with shape [input features, samples]
+        :param y_true: true output of the output layer, with shape [output features, samples]
+        :param y_predict: predicted output of the output layer, with shape [output features, samples]
         :return: None
         """
-        # Names of all layers
-        layer_names = list(self.neural_network.keys())
+        sample_counts = X.shape[1]  # Number of samples
 
-        for idx, layer_name in enumerate(reversed(layer_names)):
-            # Activation function, weighted sum, batch-normalized weighted sum and activation of the current layer
-            activate, weighted_sum, batch_normalized_weighted_sum, activation = (
-                self.neural_network[layer_name]["activate"],
-                self.neural_network[layer_name]["weighted_sum"],
-                self.neural_network[layer_name]["batch_normalized_weighted_sum"],
-                self.neural_network[layer_name]["activation"],
+        # Gradient of the loss with respect to the output layer's batch-normalized weighted input
+        self.paramters[self.layer_counts]["delta_batch_normalized_weighted_input"] = (
+            y_predict - y_true
+        ) / sample_counts  # With linear + mean squared error and softmax + cross-entropy, the gradient with respect to the output layer's pre-activation is the same
+
+        for layer_index in range(self.layer_counts, 0, -1):
+            self.paramters[layer_index].update(
+                {
+                    "delta_gamma": numpy.sum(
+                        self.paramters[layer_index][
+                            "delta_batch_normalized_weighted_input"
+                        ]
+                        * (
+                            self.paramters[layer_index]["weighted_input"]
+                            - self.paramters[layer_index]["weighted_input_average"]
+                        )
+                        / self.paramters[layer_index][
+                            "weighted_input_standard_deviation"
+                        ],
+                        axis=1,
+                        keepdims=True,
+                    ),  # Gradient of the batch-normalization scale factor
+                    "delta_beta": numpy.sum(
+                        self.paramters[layer_index][
+                            "delta_batch_normalized_weighted_input"
+                        ],
+                        axis=1,
+                        keepdims=True,
+                    ),  # Gradient of the batch-normalization shift factor
+                    "delta_weighted_input": (
+                        delta_weighted_input := (
+                            sample_counts
+                            * self.paramters[layer_index]["gamma"]
+                            * self.paramters[layer_index][
+                                "delta_batch_normalized_weighted_input"
+                            ]
+                            - numpy.sum(
+                                self.paramters[layer_index]["gamma"]
+                                * self.paramters[layer_index][
+                                    "delta_batch_normalized_weighted_input"
+                                ],
+                                axis=1,
+                                keepdims=True,
+                            )
+                            - (
+                                (
+                                    self.paramters[layer_index]["weighted_input"]
+                                    - self.paramters[layer_index][
+                                        "weighted_input_average"
+                                    ]
+                                )
+                                / self.paramters[layer_index][
+                                    "weighted_input_standard_deviation"
+                                ]
+                            )
+                            * numpy.sum(
+                                self.paramters[layer_index]["gamma"]
+                                * self.paramters[layer_index][
+                                    "delta_batch_normalized_weighted_input"
+                                ]
+                                * (
+                                    (
+                                        self.paramters[layer_index]["weighted_input"]
+                                        - self.paramters[layer_index][
+                                            "weighted_input_average"
+                                        ]
+                                    )
+                                    / self.paramters[layer_index][
+                                        "weighted_input_standard_deviation"
+                                    ]
+                                ),
+                                axis=1,
+                                keepdims=True,
+                            )
+                        )
+                        * (1.0 / sample_counts)
+                        / self.paramters[layer_index][
+                            "weighted_input_standard_deviation"
+                        ]
+                    ),  # Gradient of the weighted input
+                    "delta_weight": numpy.dot(
+                        delta_weighted_input,
+                        (
+                            X
+                            if layer_index == 1
+                            else self.paramters[layer_index - 1]["activation"]
+                        ).T,
+                    ),  # Gradient of the weights
+                    "delta_bias": numpy.sum(
+                        delta_weighted_input,
+                        axis=1,
+                        keepdims=True,
+                    ),  # Gradient of the bias
+                }
             )
-            # Error term of the output layer
-            if idx == 0:
-                match activate:
-                    case "linear" | "softmax":
-                        delta = y_predict - y_true  # Gradient of the loss with respect to the activation of layer L-1 (the last layer)
-                    case _:
-                        raise RuntimeError(f"Output-layer activation function {activate} is not supported yet")
-            # Error term of the hidden layers
-            else:
-                delta = numpy.dot(delta, self.neural_network[layer_names[idx - 1]]["weight"].T)
+            if layer_index > 1:
+                self.paramters[layer_index - 1][
+                    "delta_batch_normalized_weighted_input"
+                ] = numpy.dot(
+                    self.paramters[layer_index]["weight"].T,
+                    self.paramters[layer_index]["delta_weighted_input"],
+                ) * (
+                    self.paramters[layer_index - 1]["batch_normalized_weighted_input"]
+                    > 0
+                ).astype(
+                    numpy.float32
+                )
 
-            delta = 0
-
+    def train(
+        self,
+        X: numpy.ndarray,
+        y_true: numpy.ndarray,
+        target_loss: float = 1e-3,
+        epochs: int = 200,
+        learning_rate: float = 0.001,
+    ) -> None:
+        """
+        Train the neural network
+        :param X: input to the input layer
+        :param y_true: true output of the output layer
+        :param target_loss: target loss
+        :param epochs: number of training epochs
+        :param learning_rate: learning rate
+        :return: None
+        """
+        print(
+            f"Start training: target loss {target_loss}, epochs {epochs}, learning rate {learning_rate}..."
+        )
+        # Standardize the input
+        X = (X - numpy.mean(X, axis=1, keepdims=True)) / (
+            numpy.std(X, axis=1, keepdims=True) + self.epsilon
+        )
+        epoch = 1
+        while True:
+            # Forward propagation
+            y_predict = self._forward_propagate(X=X)
+
+            loss = self._calculate_loss(y_true=y_true, y_predict=y_predict)
+            if loss < target_loss:
+                print(
+                    f"    Epoch {epoch}: loss {loss}, target loss {target_loss} reached, training finished"
+                )
+                break
+            if epoch >= epochs:
+                print(
+                    f"    Epoch {epoch}: loss {loss}, maximum number of epochs {epochs} reached, training finished"
+                )
+                break
+            if epoch % 50 == 0:
+                print(f"    Epoch {epoch}: loss {loss}, training continues...")
+
+            # Backward propagation
+            self._backward_propagate(X=X, y_true=y_true, y_predict=y_predict)
+
+            # Update the network parameters
+            self._update_parameters(learning_rate=learning_rate)
+
+            epoch += 1
+
+        for idx in numpy.random.choice(X.shape[1], size=10, replace=False):
+            y_true_val = y_true[0, idx]
+            y_pred_val = y_predict[0, idx]
+            error = abs(y_true_val - y_pred_val)
+            print(f"{idx:<10} {y_true_val:<15.4f} {y_pred_val:<15.4f} {error:<15.4f}")
+
+    def _update_parameters(self, learning_rate: float) -> None:
+        """
+        Update the network parameters
+        :param learning_rate: learning rate
+        :return: None
+        """
+        for layer_index in range(1, self.layer_counts + 1):
+            self.paramters[layer_index].update(
+                {
+                    "weight": self.paramters[layer_index]["weight"]
+                    - self.paramters[layer_index]["delta_weight"] * learning_rate,
+                    "bias": self.paramters[layer_index]["bias"]
+                    - self.paramters[layer_index]["delta_bias"] * learning_rate,
+                    "gamma": self.paramters[layer_index]["gamma"]
+                    - self.paramters[layer_index]["delta_gamma"] * learning_rate,
+                    "beta": self.paramters[layer_index]["beta"]
+                    - self.paramters[layer_index]["delta_beta"] * learning_rate,
+                }
+            )
+
+
+# Test code
+if __name__ == "__main__":
+    # Generate test data (regression task)
+    numpy.random.seed(42)  # Fix the random seed for reproducibility
+    X = numpy.random.randn(2, 100) * 5
+    # True function: y = 2*x1**2 + 3*x2 + 1 (plus noise)
+    y_true = 2 * X[0:1, :]**2 + 3 * X[1:2, :] + 1 + numpy.random.randn(1, 100) * 0.1
+
+    # Create and train the neural network
+    neural_network = NeuralNetwork(
+        structure=[2, 200, 100, 50, 1],  # 2 inputs, hidden layers of 200, 100 and 50 neurons, 1 output
+    )
+
+    # Train
+    neural_network.train(
+        X=X, y_true=y_true, target_loss=0.001, epochs=10000, learning_rate=0.001
+    )
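
Note on the batch-normalization backward pass: the delta_weighted_input expression added in _backward_propagate is the most error-prone part of this patch. The standalone script below is not part of the diff; it is a minimal sketch (the names bn_forward and bn_backward, the toy shapes, and the random test data are all illustrative) that re-implements the same per-feature formula, dz = (1/m) * gamma / std * (m * d_bn - sum(d_bn) - z_hat * sum(d_bn * z_hat)), on the [features, samples] layout used above and compares it against numerical central differences.

# Standalone gradient check for the batch-normalization backward formula (illustrative, not part of the patch)
import numpy


def bn_forward(z, gamma, beta, epsilon=1e-9):
    # z has shape [features, samples]; statistics are taken over the sample axis, as in the patch
    average = numpy.mean(z, axis=1, keepdims=True)
    standard_deviation = numpy.sqrt(numpy.var(z, ddof=0, axis=1, keepdims=True) + epsilon)
    z_hat = (z - average) / standard_deviation
    return gamma * z_hat + beta, z_hat, standard_deviation


def bn_backward(delta_bn, z_hat, standard_deviation, gamma):
    # Same formula as delta_weighted_input in the patch, with gamma factored out of the per-feature sums:
    # dz = (1 / m) * gamma / std * (m * d_bn - sum(d_bn) - z_hat * sum(d_bn * z_hat))
    m = delta_bn.shape[1]
    return (
        (
            m * delta_bn
            - numpy.sum(delta_bn, axis=1, keepdims=True)
            - z_hat * numpy.sum(delta_bn * z_hat, axis=1, keepdims=True)
        )
        * gamma
        / (m * standard_deviation)
    )


numpy.random.seed(0)
z = numpy.random.randn(3, 8)
gamma = numpy.random.rand(3, 1) + 0.5
beta = numpy.random.randn(3, 1)
r = numpy.random.randn(3, 8)  # Loss = sum(output * r), so the gradient of the loss w.r.t. the BN output is r

output, z_hat, standard_deviation = bn_forward(z, gamma, beta)
delta_z = bn_backward(r, z_hat, standard_deviation, gamma)

# Compare the analytic gradient with central differences at a few entries
h = 1e-5
for i, j in [(0, 0), (1, 3), (2, 7)]:
    z_plus, z_minus = z.copy(), z.copy()
    z_plus[i, j] += h
    z_minus[i, j] -= h
    loss_plus = numpy.sum(bn_forward(z_plus, gamma, beta)[0] * r)
    loss_minus = numpy.sum(bn_forward(z_minus, gamma, beta)[0] * r)
    numerical = (loss_plus - loss_minus) / (2 * h)
    print(f"dz[{i},{j}]: analytic {delta_z[i, j]:.6f} vs numerical {numerical:.6f}")

If the two columns agree to several decimal places, the delta_weighted_input expression in the patch is consistent with the forward pass used in _forward_propagate; the same check can be extended to delta_gamma and delta_beta.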