From 73511f73cc1d43e2aa8634a4f16505abcc810d3b Mon Sep 17 00:00:00 2001 From: marslbr Date: Wed, 29 Oct 2025 18:08:49 +0800 Subject: [PATCH] =?UTF-8?q?251029=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rfm/main.py | 679 +++++++++++++++++++++++++--------------------------- 1 file changed, 322 insertions(+), 357 deletions(-) diff --git a/rfm/main.py b/rfm/main.py index cbd2bca..8144049 100644 --- a/rfm/main.py +++ b/rfm/main.py @@ -1,398 +1,363 @@ # -*- coding: utf-8 -*- -if __name__ == "__main__": - """ - 基于RFM模型生成数据分析报告 - """ +""" +基于RFM模型生成数据分析报告 +""" - # 导入模块 +# 导入模块 - import pandas +import statistics +from datetime import datetime +from decimal import Decimal, ROUND_HALF_UP - from datetime import datetime +import pandas +from jinja2 import Environment, FileSystemLoader - from decimal import Decimal, ROUND_HALF_UP +from utils.client import MySQLClient +from utils.pandas_extension import DrawAsHTML - import statistics - from jinja2 import Environment, FileSystemLoader +# 函数说明:根据RFM编码映射为客户分类 +def map_classification(r_encoded, f_encoded, m_encoded): + # 就R、F、M指标构建独热编码并匹配客户分类 + match f"{r_encoded}{f_encoded}{m_encoded}": + case "000": + classification = "流失客户" + case "010": + classification = "一般维持客户" + case "100": + classification = "新客户" + case "110": + classification = "潜力客户" + case "001": + classification = "重要挽留客户" + case "101": + classification = "重要深耕客户" + case "011": + classification = "重要唤回客户" + case "111": + classification = "重要价值客户" - from utils.client import MySQLClient + # noinspection PyUnboundLocalVariable + return classification - from utils.pandas_extension import DrawAsHTML - # 函数说明:根据RFM编码映射为客户分类 - def map_classification(r_encoded, f_encoded, m_encoded): +print("1 加载数据集...", end="") - # 就R、F、M指标构建独热编码并匹配客户分类 - match f"{r_encoded}{f_encoded}{m_encoded}": +client = MySQLClient(database="data_analysis") - case "000": +dataframe = client.execute_query( + sql="select 客户ID, 交易金额, 交易日期 from rfm_dataset" +) # customer_id 客户ID STRING,trade_date 交易日期 DATETIME.DATE,trade_amount 交易金额 DECIMAL - classification = "流失客户" +print("已完成") - case "010": +print("2 预处理,删除包含缺失值的样本和重复样本...", end="") - classification = "一般维持客户" +# 删除包含缺失值的样本 +dataframe.dropna(inplace=True) - case "100": +# 删除重复样本(保留第一例重复样本、重置索引) +dataframe = dataframe.drop_duplicates(ignore_index=True, inplace=False) - classification = "新客户" +# 仅保留交易日期为2012和2013年的样本 +dataframe = dataframe[ + dataframe["交易日期"].apply(lambda x: x.year in [2012, 2013]) +].reset_index( + drop=True +) # 因交易日期数据类型为DATETIME.DATE非DATETIME64,故无法使用SERIES.DT.YEAR方法 - case "110": +sample_size = Decimal(dataframe.shape[0]).quantize(Decimal("0")) - classification = "潜力客户" +print("已完成") - case "001": +print("3 构建RFM...", end="") - classification = "重要挽留客户" +# 最远交易日期 +min_trade_date = dataframe["交易日期"].min() - case "101": - - classification = "重要深耕客户" - - case "011": - - classification = "重要唤回客户" - - case "111": - - classification = "重要价值客户" - - # noinspection PyUnboundLocalVariable - return classification - - print("1 加载数据集...", end="") - - client = MySQLClient(database="data_analysis") - - dataframe = client.execute_query( - sql="select 客户ID, 交易金额, 交易日期 from rfm_dataset" - ) # customer_id 客户ID STRING,trade_date 交易日期 DATETIME.DATE,trade_amount 交易金额 DECIMAL - - print("已完成") - - print("2 预处理,删除包含缺失值的样本和重复样本...", end="") - - # 删除包含缺失值的样本 - dataframe.dropna(inplace=True) - - # 删除重复样本(保留第一例重复样本、重置索引) - dataframe = dataframe.drop_duplicates(ignore_index=True, inplace=False) - - # 仅保留交易日期为2012和2013年的样本 - dataframe = dataframe[ - dataframe["交易日期"].apply(lambda x: x.year in [2012, 2013]) - ].reset_index( - drop=True - ) # 因交易日期数据类型为DATETIME.DATE非DATETIME64,故无法使用SERIES.DT.YEAR方法 - - sample_size = Decimal(dataframe.shape[0]).quantize(Decimal("0")) - - print("已完成") - - print("3 构建RFM...", end="") - - # 最远交易日期 - min_trade_date = dataframe["交易日期"].min() - - # R为最近一次交易日期距离样本中最远一次交易日期的天数(单位:日),DECIMAL;F为交易频率(单位;次),DECIMAL;M为交易金额(单位:元),DECIMAL。均正向化 - rfm = ( - dataframe.groupby(by="客户ID") - .agg( - R=( - "交易日期", - lambda x: Decimal((x.max() - min_trade_date).days).quantize( - Decimal("0"), rounding=ROUND_HALF_UP - ), +# R为最近一次交易日期距离样本中最远一次交易日期的天数(单位:日),DECIMAL;F为交易频率(单位;次),DECIMAL;M为交易金额(单位:元),DECIMAL。均正向化 +rfm = ( + dataframe.groupby(by="客户ID") + .agg( + R=( + "交易日期", + lambda x: Decimal((x.max() - min_trade_date).days).quantize( + Decimal("0"), rounding=ROUND_HALF_UP ), - F=( - "客户ID", - lambda x: Decimal(len(x)).quantize( - Decimal("0"), rounding=ROUND_HALF_UP - ), - ), - M=( - "交易金额", - lambda x: sum(x, Decimal("0")).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP - ), # 求和时指定初始值为DECIMAL("0") - ), - ) - .reset_index() - ) - - # 客户数 - customer_counts = Decimal(rfm.shape[0]).quantize(Decimal("0")) - - # 总交易金额 - trade_amounts = sum(rfm["M"], Decimal("0.00")).quantize(Decimal("0.00")) - - print("已完成") - - print("4 基于平均数将R、F和M分为低、高两个等级并组合为八种客户分类...", end="") - - # R、F和M的平均数,使用STATISTICS.MEAN统计平均值,保证精度 - # noinspection PyUnresolvedReferences - means = { - "R": statistics.mean(rfm["R"]).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP ), - "F": statistics.mean(rfm["F"]).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP + F=( + "客户ID", + lambda x: Decimal(len(x)).quantize(Decimal("0"), rounding=ROUND_HALF_UP), ), - "M": statistics.mean(rfm["M"]).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP - ), - } - - rfm = rfm.assign( - 客户分类=lambda dataframe: dataframe.apply( - lambda row: map_classification( - r_encoded=0 if row["R"] <= means["R"] else 1, - f_encoded=0 if row["F"] <= means["F"] else 1, - m_encoded=0 if row["M"] <= means["M"] else 1, - ), - axis="columns", - ) - ) - - dataframe = dataframe.merge( - right=rfm[["客户ID", "客户分类"]], on="客户ID", how="left" - ) - - print("已完成") - - print("5 生成分析报告...", end="") - - draw = DrawAsHTML() - - # 生成数据预览 - draw.table( - dataframe=dataframe.sample(5), - file_name="数据预览.html", - ) - - # 客户分类维度 - customer_types = ( - rfm.groupby(by="客户分类") # 按照客户分类分组 - .agg( - R=( - "R", - lambda x: statistics.mean(x).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP - ), - ), # R平均值 - F=( - "F", - lambda x: statistics.mean(x).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP - ), - ), # F平均值 - M=( - "M", - lambda x: statistics.mean(x).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP - ), - ), # M平均值 - 客户占比=( - "客户分类", - lambda x: (Decimal(len(x)) / customer_counts * Decimal("100")).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP - ), - ), # 统计各客户分类的客户占比 - 交易金额占比=( - "M", - lambda x: Decimal( - sum(x, Decimal("0.00")) / trade_amounts * Decimal("100") - ).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), - ), # 统计各客户分类的交易金额占比 - ) - .reset_index() - ) - - # 生成客户分类分布 - draw.scatter( - dataframe=customer_types[["客户分类", "R", "F", "M"]], - xaxis_opts_min=475, - xaxis_opts_max=750, - file_name="客户分类分布.html", - ) - - # 生成客户占比 - draw.pie( - dataframe=customer_types[["客户分类", "客户占比"]].sort_values( - by="客户占比", ascending=False - ), # 按照客户占比降序 - file_name="客户占比.html", - ) - - # 生成交易金额占比 - draw.pie( - dataframe=customer_types[["客户分类", "交易金额占比"]].sort_values( - by="交易金额占比", ascending=False - ), # 按照交易金额占比降序 - file_name="交易金额占比.html", - ) - - report_backward = pandas.DataFrame( - data=[], columns=["客户分类", "窗口期", "客户数"] - ) - - for customer_type in customer_types["客户分类"]: - - for month in range(1, 13): - - # 窗口期,从2013-01至2013-12 - period = f"2013-{month:02d}" - - # 窗口期起期(向前滑动十二个月,包括当月) - period_start = ( - pandas.Period(value=f"2013-{month:02d}", freq="M") - 11 - ).start_time.date() - - # 窗口期止期 - period_end = pandas.Period( - value=f"2013-{month:02d}", freq="M" - ).end_time.date() - - # 指定客户分类窗口期内客户数 - customer_counts = dataframe.loc[ - (dataframe["客户分类"] == customer_type) - & (dataframe["交易日期"] >= period_start) - & (dataframe["交易日期"] <= period_end), - "客户ID", - ].nunique() - - report_backward.loc[report_backward.shape[0]] = [ - customer_type, - period, - customer_counts, - ] - - # 生成近十二个自然月客户数趋势 - draw.area( - dataframe=report_backward.groupby(by="窗口期", as_index=False).agg( - 客户数=("客户数", "sum") - ), - file_name="近十二个自然月客户数趋势.html", - yaxis_opts_min=1350, - ) - - report_backward = report_backward.loc[ - report_backward["客户分类"].isin( - ["新客户", "流失客户", "重要价值客户"] - ) # 仅考虑新客户、流水客户、重要价值客户 - ].assign( - 总客户数=lambda x: x.groupby(by="窗口期")["客户数"].transform( - "sum" - ), # 统计窗口期总客户数并新增值各行 - 客户占比=lambda x: x.apply( - lambda y: ( - Decimal(y["客户数"]) / Decimal(y["总客户数"]) * Decimal("100") - ).quantize( + M=( + "交易金额", + lambda x: sum(x, Decimal("0")).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP - ), # 运算各项使用DECIMAL以控制精度 - axis="columns", + ), # 求和时指定初始值为DECIMAL("0") ), ) + .reset_index() +) - # 生成近十二个自然月客户占比趋势(仅考虑新客户、流失客户和重要价值客户) - draw.bar( - dataframe=report_backward[ - ["客户分类", "窗口期", "客户占比"] - ], # 仅保留客户分类、窗口期和占比 - file_name="近十二个自然月客户占比趋势.html", - stack=True, +# 客户数 +customer_counts = Decimal(rfm.shape[0]).quantize(Decimal("0")) + +# 总交易金额 +trade_amounts = sum(rfm["M"], Decimal("0.00")).quantize(Decimal("0.00")) + +print("已完成") + +print("4 基于平均数将R、F和M分为低、高两个等级并组合为八种客户分类...", end="") + +# R、F和M的平均数,使用STATISTICS.MEAN统计平均值,保证精度 +# noinspection PyUnresolvedReferences +means = { + "R": statistics.mean(rfm["R"]).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), + "F": statistics.mean(rfm["F"]).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), + "M": statistics.mean(rfm["M"]).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), +} + +rfm = rfm.assign( + 客户分类=lambda dataframe: dataframe.apply( + lambda row: map_classification( + r_encoded=0 if row["R"] <= means["R"] else 1, + f_encoded=0 if row["F"] <= means["F"] else 1, + m_encoded=0 if row["M"] <= means["M"] else 1, + ), + axis="columns", ) +) - report_forward = ( - dataframe.assign( - 最早交易日期=lambda dataframe: dataframe.groupby(by="客户ID")[ - "交易日期" - ].transform("min"), - ) # 统计每位客户最早交易日期 - .assign( - 周期=lambda dataframe: ( - dataframe["交易日期"].apply(lambda x: x.year) - - dataframe["最早交易日期"].apply(lambda x: x.year) - ) - * 12 - + ( - dataframe["交易日期"].apply(lambda x: x.month) - - dataframe["最早交易日期"].apply(lambda x: x.month) - ) - ) # 每笔交易的交易日期和对客户最早交易日期的间隔作为周期,单位为月 - .assign( - 周期=lambda dataframe: dataframe["周期"].apply(lambda x: f"M+{x:02d}") - ) # 格式化周期 - .assign( - 群组=lambda dataframe: dataframe["最早交易日期"].apply( - lambda x: f"{x.year}-{x.month:02d}" - ) - ) # 截取最早交易日期的年月作为群组 - .groupby(by=["客户分类", "群组", "周期"], as_index=False) - .agg(客户数=("客户ID", "nunique")) # COHORT-ANALYSIS,群组-周期矩阵 - .groupby(by=["客户分类", "周期"], as_index=False) - .agg(客户数=("客户数", "sum")) # 统计各客户分类各周期用户数 - .assign( - 基准客户数=lambda dataframe: dataframe.groupby(by=["客户分类"])[ - "客户数" - ].transform("first") - ) - .assign( - 留存率=lambda dataframe: dataframe.apply( - lambda x: ( - Decimal(x["客户数"]) / Decimal(x["基准客户数"]) * Decimal("100") - ).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), - axis="columns", - ) - ) - .loc[ - lambda dataframe: dataframe["客户分类"].isin( - ["新客户", "流失客户", "重要价值客户"] - ) - & dataframe["周期"].isin( - [ - "M+01", - "M+02", - "M+03", - "M+04", - "M+05", - "M+06", - "M+07", - "M+08", - "M+09", - "M+10", - "M+11", - "M+12", - ] - ) +dataframe = dataframe.merge(right=rfm[["客户ID", "客户分类"]], on="客户ID", how="left") + +print("已完成") + +print("5 生成分析报告...", end="") + +draw = DrawAsHTML() + +# 生成数据预览 +draw.table( + dataframe=dataframe.sample(5), + file_name="数据预览.html", +) + +# 客户分类维度 +customer_types = ( + rfm.groupby(by="客户分类") # 按照客户分类分组 + .agg( + R=( + "R", + lambda x: statistics.mean(x).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + ), # R平均值 + F=( + "F", + lambda x: statistics.mean(x).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + ), # F平均值 + M=( + "M", + lambda x: statistics.mean(x).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + ), # M平均值 + 客户占比=( + "客户分类", + lambda x: (Decimal(len(x)) / customer_counts * Decimal("100")).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + ), # 统计各客户分类的客户占比 + 交易金额占比=( + "M", + lambda x: Decimal( + sum(x, Decimal("0.00")) / trade_amounts * Decimal("100") + ).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), + ), # 统计各客户分类的交易金额占比 + ) + .reset_index() +) + +# 生成客户分类分布 +draw.scatter( + dataframe=customer_types[["客户分类", "R", "F", "M"]], + xaxis_opts_min=475, + xaxis_opts_max=750, + file_name="客户分类分布.html", +) + +# 生成客户占比 +draw.pie( + dataframe=customer_types[["客户分类", "客户占比"]].sort_values( + by="客户占比", ascending=False + ), # 按照客户占比降序 + file_name="客户占比.html", +) + +# 生成交易金额占比 +draw.pie( + dataframe=customer_types[["客户分类", "交易金额占比"]].sort_values( + by="交易金额占比", ascending=False + ), # 按照交易金额占比降序 + file_name="交易金额占比.html", +) + +report_backward = pandas.DataFrame(data=[], columns=["客户分类", "窗口期", "客户数"]) + +for customer_type in customer_types["客户分类"]: + + for month in range(1, 13): + + # 窗口期,从2013-01至2013-12 + period = f"2013-{month:02d}" + + # 窗口期起期(向前滑动十二个月,包括当月) + period_start = ( + pandas.Period(value=f"2013-{month:02d}", freq="M") - 11 + ).start_time.date() + + # 窗口期止期 + period_end = pandas.Period(value=f"2013-{month:02d}", freq="M").end_time.date() + + # 指定客户分类窗口期内客户数 + customer_counts = dataframe.loc[ + (dataframe["客户分类"] == customer_type) + & (dataframe["交易日期"] >= period_start) + & (dataframe["交易日期"] <= period_end), + "客户ID", + ].nunique() + + report_backward.loc[report_backward.shape[0]] = [ + customer_type, + period, + customer_counts, ] - .reset_index(drop=True) + +# 生成近十二个自然月客户数趋势 +draw.area( + dataframe=report_backward.groupby(by="窗口期", as_index=False).agg( + 客户数=("客户数", "sum") + ), + file_name="近十二个自然月客户数趋势.html", + yaxis_opts_min=1350, +) + +report_backward = report_backward.loc[ + report_backward["客户分类"].isin( + ["新客户", "流失客户", "重要价值客户"] + ) # 仅考虑新客户、流水客户、重要价值客户 +].assign( + 总客户数=lambda x: x.groupby(by="窗口期")["客户数"].transform( + "sum" + ), # 统计窗口期总客户数并新增值各行 + 客户占比=lambda x: x.apply( + lambda y: ( + Decimal(y["客户数"]) / Decimal(y["总客户数"]) * Decimal("100") + ).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), # 运算各项使用DECIMAL以控制精度 + axis="columns", + ), +) + +# 生成近十二个自然月客户占比趋势(仅考虑新客户、流失客户和重要价值客户) +draw.bar( + dataframe=report_backward[ + ["客户分类", "窗口期", "客户占比"] + ], # 仅保留客户分类、窗口期和占比 + file_name="近十二个自然月客户占比趋势.html", + stack=True, +) + +report_forward = ( + dataframe.assign( + 最早交易日期=lambda dataframe: dataframe.groupby(by="客户ID")[ + "交易日期" + ].transform("min"), + ) # 统计每位客户最早交易日期 + .assign( + 周期=lambda dataframe: ( + dataframe["交易日期"].apply(lambda x: x.year) + - dataframe["最早交易日期"].apply(lambda x: x.year) + ) + * 12 + + ( + dataframe["交易日期"].apply(lambda x: x.month) + - dataframe["最早交易日期"].apply(lambda x: x.month) + ) + ) # 每笔交易的交易日期和对客户最早交易日期的间隔作为周期,单位为月 + .assign( + 周期=lambda dataframe: dataframe["周期"].apply(lambda x: f"M+{x:02d}") + ) # 格式化周期 + .assign( + 群组=lambda dataframe: dataframe["最早交易日期"].apply( + lambda x: f"{x.year}-{x.month:02d}" + ) + ) # 截取最早交易日期的年月作为群组 + .groupby(by=["客户分类", "群组", "周期"], as_index=False) + .agg(客户数=("客户ID", "nunique")) # COHORT-ANALYSIS,群组-周期矩阵 + .groupby(by=["客户分类", "周期"], as_index=False) + .agg(客户数=("客户数", "sum")) # 统计各客户分类各周期用户数 + .assign( + 基准客户数=lambda dataframe: dataframe.groupby(by=["客户分类"])[ + "客户数" + ].transform("first") ) - - # 生成近十二个自然月留存率趋势 - draw.line( - dataframe=report_forward[["客户分类", "周期", "留存率"]], - file_name="近十二个自然月留存率趋势.html", + .assign( + 留存率=lambda dataframe: dataframe.apply( + lambda x: ( + Decimal(x["客户数"]) / Decimal(x["基准客户数"]) * Decimal("100") + ).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), + axis="columns", + ) ) + .loc[ + lambda dataframe: dataframe["客户分类"].isin( + ["新客户", "流失客户", "重要价值客户"] + ) + & dataframe["周期"].isin( + [ + "M+01", + "M+02", + "M+03", + "M+04", + "M+05", + "M+06", + "M+07", + "M+08", + "M+09", + "M+10", + "M+11", + "M+12", + ] + ) + ] + .reset_index(drop=True) +) - # 获取报告模版 - template = Environment(loader=FileSystemLoader(".")).get_template("template.html") +# 生成近十二个自然月留存率趋势 +draw.line( + dataframe=report_forward[["客户分类", "周期", "留存率"]], + file_name="近十二个自然月留存率趋势.html", +) - # 渲染模版 - rfm_report = template.render( - { - # 报告日期 - "report_date": datetime.now().strftime("%Y-%m-%d"), - "sample_size": sample_size, - } - ) +# 获取报告模版 +template = Environment(loader=FileSystemLoader(".")).get_template("template.html") - with open("rfm_report.html", "w", encoding="utf8") as file: +# 渲染模版 +rfm_report = template.render( + { + # 报告日期 + "report_date": datetime.now().strftime("%Y-%m-%d"), + "sample_size": sample_size, + } +) - file.write(rfm_report) +with open("rfm_report.html", "w", encoding="utf8") as file: + file.write(rfm_report) - print("已完成") +print("已完成")