251029更新

This commit is contained in:
marslbr 2025-10-29 18:08:49 +08:00
parent 376d8448a7
commit 73511f73cc
1 changed files with 322 additions and 357 deletions

View File

@ -1,398 +1,363 @@
# -*- coding: utf-8 -*-
if __name__ == "__main__":
"""
基于RFM模型生成数据分析报告
"""
"""
基于RFM模型生成数据分析报告
"""
# 导入模块
# 导入模块
import pandas
import statistics
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from datetime import datetime
import pandas
from jinja2 import Environment, FileSystemLoader
from decimal import Decimal, ROUND_HALF_UP
from utils.client import MySQLClient
from utils.pandas_extension import DrawAsHTML
import statistics
from jinja2 import Environment, FileSystemLoader
# 函数说明根据RFM编码映射为客户分类
def map_classification(r_encoded, f_encoded, m_encoded):
# 就R、F、M指标构建独热编码并匹配客户分类
match f"{r_encoded}{f_encoded}{m_encoded}":
case "000":
classification = "流失客户"
case "010":
classification = "一般维持客户"
case "100":
classification = "新客户"
case "110":
classification = "潜力客户"
case "001":
classification = "重要挽留客户"
case "101":
classification = "重要深耕客户"
case "011":
classification = "重要唤回客户"
case "111":
classification = "重要价值客户"
from utils.client import MySQLClient
# noinspection PyUnboundLocalVariable
return classification
from utils.pandas_extension import DrawAsHTML
# 函数说明根据RFM编码映射为客户分类
def map_classification(r_encoded, f_encoded, m_encoded):
print("1 加载数据集...", end="")
# 就R、F、M指标构建独热编码并匹配客户分类
match f"{r_encoded}{f_encoded}{m_encoded}":
client = MySQLClient(database="data_analysis")
case "000":
dataframe = client.execute_query(
sql="select 客户ID, 交易金额, 交易日期 from rfm_dataset"
) # customer_id 客户ID STRINGtrade_date 交易日期 DATETIME.DATEtrade_amount 交易金额 DECIMAL
classification = "流失客户"
print("已完成")
case "010":
print("2 预处理,删除包含缺失值的样本和重复样本...", end="")
classification = "一般维持客户"
# 删除包含缺失值的样本
dataframe.dropna(inplace=True)
case "100":
# 删除重复样本(保留第一例重复样本、重置索引)
dataframe = dataframe.drop_duplicates(ignore_index=True, inplace=False)
classification = "新客户"
# 仅保留交易日期为2012和2013年的样本
dataframe = dataframe[
dataframe["交易日期"].apply(lambda x: x.year in [2012, 2013])
].reset_index(
drop=True
) # 因交易日期数据类型为DATETIME.DATE非DATETIME64故无法使用SERIES.DT.YEAR方法
case "110":
sample_size = Decimal(dataframe.shape[0]).quantize(Decimal("0"))
classification = "潜力客户"
print("已完成")
case "001":
print("3 构建RFM...", end="")
classification = "重要挽留客户"
# 最远交易日期
min_trade_date = dataframe["交易日期"].min()
case "101":
classification = "重要深耕客户"
case "011":
classification = "重要唤回客户"
case "111":
classification = "重要价值客户"
# noinspection PyUnboundLocalVariable
return classification
print("1 加载数据集...", end="")
client = MySQLClient(database="data_analysis")
dataframe = client.execute_query(
sql="select 客户ID, 交易金额, 交易日期 from rfm_dataset"
) # customer_id 客户ID STRINGtrade_date 交易日期 DATETIME.DATEtrade_amount 交易金额 DECIMAL
print("已完成")
print("2 预处理,删除包含缺失值的样本和重复样本...", end="")
# 删除包含缺失值的样本
dataframe.dropna(inplace=True)
# 删除重复样本(保留第一例重复样本、重置索引)
dataframe = dataframe.drop_duplicates(ignore_index=True, inplace=False)
# 仅保留交易日期为2012和2013年的样本
dataframe = dataframe[
dataframe["交易日期"].apply(lambda x: x.year in [2012, 2013])
].reset_index(
drop=True
) # 因交易日期数据类型为DATETIME.DATE非DATETIME64故无法使用SERIES.DT.YEAR方法
sample_size = Decimal(dataframe.shape[0]).quantize(Decimal("0"))
print("已完成")
print("3 构建RFM...", end="")
# 最远交易日期
min_trade_date = dataframe["交易日期"].min()
# R为最近一次交易日期距离样本中最远一次交易日期的天数单位DECIMALF为交易频率单位DECIMALM为交易金额单位DECIMAL。均正向化
rfm = (
dataframe.groupby(by="客户ID")
.agg(
R=(
"交易日期",
lambda x: Decimal((x.max() - min_trade_date).days).quantize(
Decimal("0"), rounding=ROUND_HALF_UP
),
# R为最近一次交易日期距离样本中最远一次交易日期的天数单位DECIMALF为交易频率单位DECIMALM为交易金额单位DECIMAL。均正向化
rfm = (
dataframe.groupby(by="客户ID")
.agg(
R=(
"交易日期",
lambda x: Decimal((x.max() - min_trade_date).days).quantize(
Decimal("0"), rounding=ROUND_HALF_UP
),
F=(
"客户ID",
lambda x: Decimal(len(x)).quantize(
Decimal("0"), rounding=ROUND_HALF_UP
),
),
M=(
"交易金额",
lambda x: sum(x, Decimal("0")).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
), # 求和时指定初始值为DECIMAL("0")
),
)
.reset_index()
)
# 客户数
customer_counts = Decimal(rfm.shape[0]).quantize(Decimal("0"))
# 总交易金额
trade_amounts = sum(rfm["M"], Decimal("0.00")).quantize(Decimal("0.00"))
print("已完成")
print("4 基于平均数将R、F和M分为低、高两个等级并组合为八种客户分类...", end="")
# R、F和M的平均数使用STATISTICS.MEAN统计平均值保证精度
# noinspection PyUnresolvedReferences
means = {
"R": statistics.mean(rfm["R"]).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
"F": statistics.mean(rfm["F"]).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
F=(
"客户ID",
lambda x: Decimal(len(x)).quantize(Decimal("0"), rounding=ROUND_HALF_UP),
),
"M": statistics.mean(rfm["M"]).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
}
rfm = rfm.assign(
客户分类=lambda dataframe: dataframe.apply(
lambda row: map_classification(
r_encoded=0 if row["R"] <= means["R"] else 1,
f_encoded=0 if row["F"] <= means["F"] else 1,
m_encoded=0 if row["M"] <= means["M"] else 1,
),
axis="columns",
)
)
dataframe = dataframe.merge(
right=rfm[["客户ID", "客户分类"]], on="客户ID", how="left"
)
print("已完成")
print("5 生成分析报告...", end="")
draw = DrawAsHTML()
# 生成数据预览
draw.table(
dataframe=dataframe.sample(5),
file_name="数据预览.html",
)
# 客户分类维度
customer_types = (
rfm.groupby(by="客户分类") # 按照客户分类分组
.agg(
R=(
"R",
lambda x: statistics.mean(x).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
), # R平均值
F=(
"F",
lambda x: statistics.mean(x).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
), # F平均值
M=(
"M",
lambda x: statistics.mean(x).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
), # M平均值
客户占比=(
"客户分类",
lambda x: (Decimal(len(x)) / customer_counts * Decimal("100")).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
), # 统计各客户分类的客户占比
交易金额占比=(
"M",
lambda x: Decimal(
sum(x, Decimal("0.00")) / trade_amounts * Decimal("100")
).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
), # 统计各客户分类的交易金额占比
)
.reset_index()
)
# 生成客户分类分布
draw.scatter(
dataframe=customer_types[["客户分类", "R", "F", "M"]],
xaxis_opts_min=475,
xaxis_opts_max=750,
file_name="客户分类分布.html",
)
# 生成客户占比
draw.pie(
dataframe=customer_types[["客户分类", "客户占比"]].sort_values(
by="客户占比", ascending=False
), # 按照客户占比降序
file_name="客户占比.html",
)
# 生成交易金额占比
draw.pie(
dataframe=customer_types[["客户分类", "交易金额占比"]].sort_values(
by="交易金额占比", ascending=False
), # 按照交易金额占比降序
file_name="交易金额占比.html",
)
report_backward = pandas.DataFrame(
data=[], columns=["客户分类", "窗口期", "客户数"]
)
for customer_type in customer_types["客户分类"]:
for month in range(1, 13):
# 窗口期从2013-01至2013-12
period = f"2013-{month:02d}"
# 窗口期起期(向前滑动十二个月,包括当月)
period_start = (
pandas.Period(value=f"2013-{month:02d}", freq="M") - 11
).start_time.date()
# 窗口期止期
period_end = pandas.Period(
value=f"2013-{month:02d}", freq="M"
).end_time.date()
# 指定客户分类窗口期内客户数
customer_counts = dataframe.loc[
(dataframe["客户分类"] == customer_type)
& (dataframe["交易日期"] >= period_start)
& (dataframe["交易日期"] <= period_end),
"客户ID",
].nunique()
report_backward.loc[report_backward.shape[0]] = [
customer_type,
period,
customer_counts,
]
# 生成近十二个自然月客户数趋势
draw.area(
dataframe=report_backward.groupby(by="窗口期", as_index=False).agg(
客户数=("客户数", "sum")
),
file_name="近十二个自然月客户数趋势.html",
yaxis_opts_min=1350,
)
report_backward = report_backward.loc[
report_backward["客户分类"].isin(
["新客户", "流失客户", "重要价值客户"]
) # 仅考虑新客户、流水客户、重要价值客户
].assign(
总客户数=lambda x: x.groupby(by="窗口期")["客户数"].transform(
"sum"
), # 统计窗口期总客户数并新增值各行
客户占比=lambda x: x.apply(
lambda y: (
Decimal(y["客户数"]) / Decimal(y["总客户数"]) * Decimal("100")
).quantize(
M=(
"交易金额",
lambda x: sum(x, Decimal("0")).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
), # 运算各项使用DECIMAL以控制精度
axis="columns",
), # 求和时指定初始值为DECIMAL("0")
),
)
.reset_index()
)
# 生成近十二个自然月客户占比趋势(仅考虑新客户、流失客户和重要价值客户)
draw.bar(
dataframe=report_backward[
["客户分类", "窗口期", "客户占比"]
], # 仅保留客户分类、窗口期和占比
file_name="近十二个自然月客户占比趋势.html",
stack=True,
# 客户数
customer_counts = Decimal(rfm.shape[0]).quantize(Decimal("0"))
# 总交易金额
trade_amounts = sum(rfm["M"], Decimal("0.00")).quantize(Decimal("0.00"))
print("已完成")
print("4 基于平均数将R、F和M分为低、高两个等级并组合为八种客户分类...", end="")
# R、F和M的平均数使用STATISTICS.MEAN统计平均值保证精度
# noinspection PyUnresolvedReferences
means = {
"R": statistics.mean(rfm["R"]).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
"F": statistics.mean(rfm["F"]).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
"M": statistics.mean(rfm["M"]).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
}
rfm = rfm.assign(
客户分类=lambda dataframe: dataframe.apply(
lambda row: map_classification(
r_encoded=0 if row["R"] <= means["R"] else 1,
f_encoded=0 if row["F"] <= means["F"] else 1,
m_encoded=0 if row["M"] <= means["M"] else 1,
),
axis="columns",
)
)
report_forward = (
dataframe.assign(
最早交易日期=lambda dataframe: dataframe.groupby(by="客户ID")[
"交易日期"
].transform("min"),
) # 统计每位客户最早交易日期
.assign(
周期=lambda dataframe: (
dataframe["交易日期"].apply(lambda x: x.year)
- dataframe["最早交易日期"].apply(lambda x: x.year)
)
* 12
+ (
dataframe["交易日期"].apply(lambda x: x.month)
- dataframe["最早交易日期"].apply(lambda x: x.month)
)
) # 每笔交易的交易日期和对客户最早交易日期的间隔作为周期,单位为月
.assign(
周期=lambda dataframe: dataframe["周期"].apply(lambda x: f"M+{x:02d}")
) # 格式化周期
.assign(
群组=lambda dataframe: dataframe["最早交易日期"].apply(
lambda x: f"{x.year}-{x.month:02d}"
)
) # 截取最早交易日期的年月作为群组
.groupby(by=["客户分类", "群组", "周期"], as_index=False)
.agg(客户数=("客户ID", "nunique")) # COHORT-ANALYSIS群组-周期矩阵
.groupby(by=["客户分类", "周期"], as_index=False)
.agg(客户数=("客户数", "sum")) # 统计各客户分类各周期用户数
.assign(
基准客户数=lambda dataframe: dataframe.groupby(by=["客户分类"])[
"客户数"
].transform("first")
)
.assign(
留存率=lambda dataframe: dataframe.apply(
lambda x: (
Decimal(x["客户数"]) / Decimal(x["基准客户数"]) * Decimal("100")
).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
axis="columns",
)
)
.loc[
lambda dataframe: dataframe["客户分类"].isin(
["新客户", "流失客户", "重要价值客户"]
)
& dataframe["周期"].isin(
[
"M+01",
"M+02",
"M+03",
"M+04",
"M+05",
"M+06",
"M+07",
"M+08",
"M+09",
"M+10",
"M+11",
"M+12",
]
)
dataframe = dataframe.merge(right=rfm[["客户ID", "客户分类"]], on="客户ID", how="left")
print("已完成")
print("5 生成分析报告...", end="")
draw = DrawAsHTML()
# 生成数据预览
draw.table(
dataframe=dataframe.sample(5),
file_name="数据预览.html",
)
# 客户分类维度
customer_types = (
rfm.groupby(by="客户分类") # 按照客户分类分组
.agg(
R=(
"R",
lambda x: statistics.mean(x).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
), # R平均值
F=(
"F",
lambda x: statistics.mean(x).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
), # F平均值
M=(
"M",
lambda x: statistics.mean(x).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
), # M平均值
客户占比=(
"客户分类",
lambda x: (Decimal(len(x)) / customer_counts * Decimal("100")).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
),
), # 统计各客户分类的客户占比
交易金额占比=(
"M",
lambda x: Decimal(
sum(x, Decimal("0.00")) / trade_amounts * Decimal("100")
).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
), # 统计各客户分类的交易金额占比
)
.reset_index()
)
# 生成客户分类分布
draw.scatter(
dataframe=customer_types[["客户分类", "R", "F", "M"]],
xaxis_opts_min=475,
xaxis_opts_max=750,
file_name="客户分类分布.html",
)
# 生成客户占比
draw.pie(
dataframe=customer_types[["客户分类", "客户占比"]].sort_values(
by="客户占比", ascending=False
), # 按照客户占比降序
file_name="客户占比.html",
)
# 生成交易金额占比
draw.pie(
dataframe=customer_types[["客户分类", "交易金额占比"]].sort_values(
by="交易金额占比", ascending=False
), # 按照交易金额占比降序
file_name="交易金额占比.html",
)
report_backward = pandas.DataFrame(data=[], columns=["客户分类", "窗口期", "客户数"])
for customer_type in customer_types["客户分类"]:
for month in range(1, 13):
# 窗口期从2013-01至2013-12
period = f"2013-{month:02d}"
# 窗口期起期(向前滑动十二个月,包括当月)
period_start = (
pandas.Period(value=f"2013-{month:02d}", freq="M") - 11
).start_time.date()
# 窗口期止期
period_end = pandas.Period(value=f"2013-{month:02d}", freq="M").end_time.date()
# 指定客户分类窗口期内客户数
customer_counts = dataframe.loc[
(dataframe["客户分类"] == customer_type)
& (dataframe["交易日期"] >= period_start)
& (dataframe["交易日期"] <= period_end),
"客户ID",
].nunique()
report_backward.loc[report_backward.shape[0]] = [
customer_type,
period,
customer_counts,
]
.reset_index(drop=True)
# 生成近十二个自然月客户数趋势
draw.area(
dataframe=report_backward.groupby(by="窗口期", as_index=False).agg(
客户数=("客户数", "sum")
),
file_name="近十二个自然月客户数趋势.html",
yaxis_opts_min=1350,
)
report_backward = report_backward.loc[
report_backward["客户分类"].isin(
["新客户", "流失客户", "重要价值客户"]
) # 仅考虑新客户、流水客户、重要价值客户
].assign(
总客户数=lambda x: x.groupby(by="窗口期")["客户数"].transform(
"sum"
), # 统计窗口期总客户数并新增值各行
客户占比=lambda x: x.apply(
lambda y: (
Decimal(y["客户数"]) / Decimal(y["总客户数"]) * Decimal("100")
).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
), # 运算各项使用DECIMAL以控制精度
axis="columns",
),
)
# 生成近十二个自然月客户占比趋势(仅考虑新客户、流失客户和重要价值客户)
draw.bar(
dataframe=report_backward[
["客户分类", "窗口期", "客户占比"]
], # 仅保留客户分类、窗口期和占比
file_name="近十二个自然月客户占比趋势.html",
stack=True,
)
report_forward = (
dataframe.assign(
最早交易日期=lambda dataframe: dataframe.groupby(by="客户ID")[
"交易日期"
].transform("min"),
) # 统计每位客户最早交易日期
.assign(
周期=lambda dataframe: (
dataframe["交易日期"].apply(lambda x: x.year)
- dataframe["最早交易日期"].apply(lambda x: x.year)
)
* 12
+ (
dataframe["交易日期"].apply(lambda x: x.month)
- dataframe["最早交易日期"].apply(lambda x: x.month)
)
) # 每笔交易的交易日期和对客户最早交易日期的间隔作为周期,单位为月
.assign(
周期=lambda dataframe: dataframe["周期"].apply(lambda x: f"M+{x:02d}")
) # 格式化周期
.assign(
群组=lambda dataframe: dataframe["最早交易日期"].apply(
lambda x: f"{x.year}-{x.month:02d}"
)
) # 截取最早交易日期的年月作为群组
.groupby(by=["客户分类", "群组", "周期"], as_index=False)
.agg(客户数=("客户ID", "nunique")) # COHORT-ANALYSIS群组-周期矩阵
.groupby(by=["客户分类", "周期"], as_index=False)
.agg(客户数=("客户数", "sum")) # 统计各客户分类各周期用户数
.assign(
基准客户数=lambda dataframe: dataframe.groupby(by=["客户分类"])[
"客户数"
].transform("first")
)
# 生成近十二个自然月留存率趋势
draw.line(
dataframe=report_forward[["客户分类", "周期", "留存率"]],
file_name="近十二个自然月留存率趋势.html",
.assign(
留存率=lambda dataframe: dataframe.apply(
lambda x: (
Decimal(x["客户数"]) / Decimal(x["基准客户数"]) * Decimal("100")
).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
axis="columns",
)
)
.loc[
lambda dataframe: dataframe["客户分类"].isin(
["新客户", "流失客户", "重要价值客户"]
)
& dataframe["周期"].isin(
[
"M+01",
"M+02",
"M+03",
"M+04",
"M+05",
"M+06",
"M+07",
"M+08",
"M+09",
"M+10",
"M+11",
"M+12",
]
)
]
.reset_index(drop=True)
)
# 获取报告模版
template = Environment(loader=FileSystemLoader(".")).get_template("template.html")
# 生成近十二个自然月留存率趋势
draw.line(
dataframe=report_forward[["客户分类", "周期", "留存率"]],
file_name="近十二个自然月留存率趋势.html",
)
# 渲染模版
rfm_report = template.render(
{
# 报告日期
"report_date": datetime.now().strftime("%Y-%m-%d"),
"sample_size": sample_size,
}
)
# 获取报告模版
template = Environment(loader=FileSystemLoader(".")).get_template("template.html")
with open("rfm_report.html", "w", encoding="utf8") as file:
# 渲染模版
rfm_report = template.render(
{
# 报告日期
"report_date": datetime.now().strftime("%Y-%m-%d"),
"sample_size": sample_size,
}
)
file.write(rfm_report)
with open("rfm_report.html", "w", encoding="utf8") as file:
file.write(rfm_report)
print("已完成")
print("已完成")