# Python/普康健康机构周报/main.py
# -*- coding: utf-8 -*-
"""
普康健康_生成直付理赔周报
"""
# 加载模块
import pandas, numpy
from utils.pandas_extension import open_csv, save_as_workbook
# 根据机构所在省份匹配为机构名称
# Province -> institution lookup table; any province (or other value) not
# listed here is attributed to headquarters ("总部").
_PROVINCE_TO_INSTITUTION = {
    "北京市": "京津",
    "天津市": "京津",
    "河北省": "河北",
    "山西省": "山西",
    "内蒙古自治区": "内蒙",
    "辽宁省": "辽宁",
    "吉林省": "黑吉",
    "黑龙江省": "黑吉",
    "上海市": "上海",
    "江苏省": "江苏",
    "浙江省": "浙江",
    "安徽省": "安徽",
    "福建省": "福建",
    "江西省": "江西",
    "山东省": "山东",
    "河南省": "河南",
    "湖北省": "湖北",
    "湖南省": "湖南",
    "广东省": "广东",
    "海南省": "广东",
    "广西壮族自治区": "广西",
    "重庆市": "四川",
    "四川省": "四川",
    "西藏自治区": "四川",
    "贵州省": "贵州",
    "云南省": "云南",
    "新疆维吾尔自治区": "新疆",
    "陕西省": "陕西",
    "青海省": "陕西",
    "甘肃省": "甘肃",
    "宁夏回族自治区": "宁夏",
}


def match_institution_name(x):
    """Return the institution name responsible for province *x*.

    Unknown provinces (and any non-province value) fall back to
    headquarters, "总部".
    """
    return _PROVINCE_TO_INSTITUTION.get(x, "总部")
# 根据机构名称匹配为大区名称
# Institution -> region lookup table; institutions not listed here
# (e.g. "总部" itself or unexpected values) fall back to "总部".
_INSTITUTION_TO_REGION = {
    "内蒙": "东北大区",
    "辽宁": "东北大区",
    "黑吉": "东北大区",
    "京津": "华北大区",
    "河北": "华北大区",
    "山西": "华北大区",
    "安徽": "华东大区",
    "山东": "华东大区",
    "河南": "华东大区",
    "江苏": "东南大区",
    "福建": "东南大区",
    "广东": "东南大区",
    "江西": "华中大区",
    "湖北": "华中大区",
    "湖南": "华中大区",
    "新疆": "西北大区",
    "陕西": "西北大区",
    "甘肃": "西北大区",
    "宁夏": "西北大区",
    "广西": "西南大区",
    "四川": "西南大区",
    "云南": "西南大区",
    "贵州": "西南大区",
    "上海": "上海",
    "浙江": "浙江",
}


def match_region_name(x):
    """Return the region name for institution *x*.

    Shanghai and Zhejiang are their own regions; anything unmatched
    falls back to headquarters, "总部".
    """
    return _INSTITUTION_TO_REGION.get(x, "总部")
"""
统计方案:
1、读取当年往月对账单数据包括对账期、商家编号、保单编号和对账金额文件名为reconciliations.csv
1.1 根据对账期和保单编号分组,就对账金额求和,其中对账期、对账金额之和重命名为考核周期、消费规模
2、读取当年当月保单扣减数据包括扣减期、商家编号、保单编号和扣减金额文件名为reconciliations_month.csv
2.1 根据扣减期和保单编号分组,就扣减金额求和,其中扣减期、扣减金额之和重命名为考核周期、消费规模
3、合并1.1和2.1,即当年往月和当年当月考核周期、保单编号和消费规模
4、读取徐丹老师提供的保单机构分配数据包括保单编号、落地机构、落地机构分配比例、出单机构、出单机构分配比例、总部分配比例文件名为slips.csv
4.1 先查询3中消费规模大于0的保单编号再和4中保单编号比较、就不在4中的保单编号提供徐丹老师由其补录保单机构分配方案。补录后重复4.1至无需再提供徐丹老师 -->过程表
5、就机构拆解保单消费规模根据考核周期、机构分组就消费规模求和
5.1 根据机构名称匹配大区名称
5.2 读取当年机构消费目标数据,包括考核周期、机构名称和消费目标,根据考核周期和机构名称匹配消费目标
5.3 计算达成率(消费规模/消费目标)-->基表
6、透视基表生成各机构在当年各月消费目标、消费规模和达成率当年消费目标、消费规模和达成率并汇总
"""
print("正在生成直付理赔周报...", end="")
# Prior-month reconciliation statements for the current year
# (one row per reconciliation period / merchant / policy number).
reconciliations = open_csv(file_name="reconciliations.csv")
# Drop rows without a policy number (a missing policy number always comes
# with a missing amount, and vice versa).
reconciliations.dropna(subset=["保单编号"], inplace=True)
# Cast the amount column to float; it can never be missing, so no NA
# filling is required.
for column_label in reconciliations.columns:
    if column_label == "对账金额":
        reconciliations[column_label] = reconciliations[column_label].astype(
            "float"
        )
# Sum the amounts per (period, policy), then rename the columns to the
# shared "assessment period" / "consumption" vocabulary.
reconciliations = (
    reconciliations.groupby(by=["对账期", "保单编号"])
    .agg(对账金额=("对账金额", "sum"))
    .reset_index()
    .rename(columns={"对账期": "考核周期", "对账金额": "消费规模"})
)
# Current-month policy deduction data for the current year
# (one row per deduction period / merchant / policy number).
reconciliations_month = open_csv(file_name="reconciliations_month.csv")
# Cast the deduction amount to float; it can never be missing, so no NA
# filling is required.
for column_label in reconciliations_month.columns:
    if column_label == "扣减金额":
        reconciliations_month[column_label] = reconciliations_month[
            column_label
        ].astype("float")
# Sum the deductions per (period, policy) and align the column names with
# the reconciliation table above.
reconciliations_month = (
    reconciliations_month.groupby(by=["扣减期", "保单编号"])
    .agg(扣减金额=("扣减金额", "sum"))
    .reset_index()
    .rename(columns={"扣减期": "考核周期", "扣减金额": "消费规模"})
)
# Stack prior-month and current-month rows into one consumption table.
reconciliations = pandas.concat(
    objs=[reconciliations, reconciliations_month], ignore_index=True
)
# 徐丹老师提供的保单机构分配数据
slips = open_csv(file_name="slips.csv")
# 数据类型转换
for variable_label in slips.columns:
match variable_label:
# 不可能出现缺失值,无需填补缺失值
case "落地机构分配比例" | "出单机构分配比例" | "总部分配比例":
slips[variable_label] = slips[variable_label].astype("int")
# 过程表
process_table = reconciliations.merge(right=slips, on="保单编号", how="left")
# 统计消费规模大于0且出单机构分配比例为空的保单机构分配数据
process_table.loc[
(process_table["消费规模"] > 0) & (process_table["出单机构分配比例"].isna()),
"异常标签",
] = "无分配方案"
if process_table.loc[process_table["异常标签"] == "无分配方案"].shape[0] > 0:
print("存在未分配机构的保单,请提请徐丹老师补录")
print()
save_as_workbook(
worksheets=[("异常保单", process_table)],
workbook_name="普康健康_需补录保单机构分配方案.xlsx",
)
exit()
# Add a headquarters column so headquarters can be exploded alongside the
# landing and issuing institutions.
slips.insert(loc=slips.shape[1] - 1, column="总部", value="总部")
# Gather the three institution columns and their ratio columns into one
# list-valued column each, then explode them so every
# (policy, institution, ratio) triple gets its own row.
institution_columns = ["落地机构", "出单机构", "总部"]
ratio_columns = ["落地机构分配比例", "出单机构分配比例", "总部分配比例"]
slips = (
    slips.assign(
        机构名称=slips[institution_columns].values.tolist(),
        分配比例=slips[ratio_columns].values.tolist(),
    )
    .explode(["机构名称", "分配比例"])
    .reset_index(drop=True)
)
# Keep only rows where a positive share was actually allocated.
slips = slips.loc[slips["分配比例"] > 0, ["保单编号", "机构名称", "分配比例"]]
# Translate provinces into institution names.
slips["机构名称"] = slips["机构名称"].apply(match_institution_name)
# Derive the region name and insert it as the second column.
slips.insert(
    loc=slips.shape[1] - 2,
    column="大区名称",
    value=slips["机构名称"].apply(match_region_name),
)
# 左拼接保单机构分配数据(分配比例不可能出现缺失值,无需填补缺失值)
process_table = process_table.merge(right=slips, on="保单编号", how="left")
# 分配后消费规模
process_table["分配后消费规模"] = process_table.apply(
lambda x: x["消费规模"] * x["分配比例"] / 100, axis="columns"
)
# 按照考核周期和机构名称分组,就分配后消费规模求和
process_table = (
process_table.groupby(by=["考核周期", "机构名称"])
.agg(大区名称=("大区名称", "first"), 分配后消费规模=("分配后消费规模", "sum"))
.reset_index()
)
# 机构考核周期消费目标数据(维度为对机构名称-考核周期)
targets = open_csv(file_name="targets.csv")
# 数据类型转换
for variable_label in targets.columns:
match variable_label:
case "消费目标":
# 消费目标不可能出现缺失值,无需填补缺失值
targets[variable_label] = targets[variable_label].astype("float")
process_table = process_table.merge(
right=targets, on=["机构名称", "考核周期"], how="left"
)
# Pivot the process table: rows are (region, institution), columns are the
# assessment periods, values are allocated consumption and target, with
# row/column grand totals ("汇总").
# NOTE: with one `columns` key and several `values`, pandas builds a
# two-level column index — level 0 is the value name, level 1 the period.
pivot_table = process_table.pivot_table(
    index=["大区名称", "机构名称"],
    columns="考核周期",
    values=["分配后消费规模", "消费目标"],
    aggfunc="sum",
    margins=True,
    margins_name="汇总",
)
# Append a per-region subtotal row for every true region. Shanghai,
# Zhejiang and headquarters are single-institution "regions", and "汇总"
# is the grand-total margin, so they need no subtotal.
for region in pivot_table.index.get_level_values("大区名称").unique():
    if region in ["上海", "浙江", "总部", "汇总"]:
        continue
    # Sum the region's institutions column-wise (yields a Series).
    region_totals = pivot_table.loc[region].sum()
    subtotal_row = pandas.DataFrame(
        data=[region_totals],
        # Two-level row index so the subtotal sits inside the region.
        index=pandas.MultiIndex.from_tuples(
            tuples=[(region, "汇总")], names=["大区名称", "机构名称"]
        ),
        columns=region_totals.index,
    )
    pivot_table = pandas.concat(objs=[pivot_table, subtotal_row])
# Achievement rate per period (and for the total column): consumption
# divided by target, defined as 0 when the target is 0.
for period in pivot_table.columns.get_level_values("考核周期").unique():
    pivot_table[("达成率", period)] = pivot_table.apply(
        # Bind the current period as a default so the closure is explicit.
        lambda row, period=period: (
            row[("分配后消费规模", period)] / row[("消费目标", period)]
            if row[("消费目标", period)] != 0
            else 0
        ),
        axis="columns",
    )
# Put the period level first, then sort the columns.
pivot_table = pivot_table.swaplevel(axis="columns").sort_index(axis="columns")
# Region display order: geographic regions first, then the standalone
# institutions, headquarters and the grand total.
region_order = [
    "东北大区",
    "华北大区",
    "华东大区",
    "华中大区",
    "东南大区",
    "西北大区",
    "西南大区",
    "上海",
    "浙江",
    "总部",
    "汇总",
]
# Map each region name to its rank in the display order.
region_rank = {name: rank for rank, name in enumerate(region_order)}
# Rank of every row's region, aligned with the pivot table's index.
region_sort_keys = [
    region_rank.get(name)
    for name in pivot_table.index.get_level_values("大区名称")
]
# Institution display order inside each region (subtotal row first); the
# grand-total margin row carries an empty institution label.
institution_order = {
    "东北大区": ["汇总", "内蒙", "辽宁", "黑吉"],
    "华北大区": ["汇总", "京津", "河北", "山西"],
    "华东大区": ["汇总", "安徽", "山东", "河南"],
    "华中大区": ["汇总", "江西", "湖北", "湖南"],
    "东南大区": ["汇总", "江苏", "福建", "广东"],
    "西北大区": ["汇总", "新疆", "陕西", "甘肃", "宁夏"],
    "西南大区": ["汇总", "广西", "四川", "云南", "贵州"],
    "上海": ["上海"],
    "浙江": ["浙江"],
    "总部": ["总部"],
    "汇总": [""],
}
# Map each (region, institution) pair to its rank within the region.
institution_rank = {
    (region, institution): rank
    for region, institutions in institution_order.items()
    for rank, institution in enumerate(institutions)
}
# Rank of every row's institution, aligned with the pivot table's index.
institution_sort_keys = [
    institution_rank.get(pair)
    for pair in zip(
        pivot_table.index.get_level_values("大区名称"),
        pivot_table.index.get_level_values("机构名称"),
    )
]
# Multi-key sort: primary key is the region rank, secondary key the
# institution rank (lexsort takes the primary key LAST); then flatten the
# row index into ordinary columns for the workbook.
pivot_table = pivot_table.iloc[
    numpy.lexsort((institution_sort_keys, region_sort_keys))
].reset_index()
save_as_workbook(
    worksheets=[("sheet1", pivot_table)], workbook_name="普康健康_机构周报.xlsx"
)
print("生成成功")