From 7379c992bf8db8e588bdac47f98dc38a4be02e8d Mon Sep 17 00:00:00 2001 From: liubiren Date: Tue, 6 Jan 2026 15:12:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=B8=B8=E6=9B=B4=E6=96=B0=20from=20N?= =?UTF-8?q?UC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pyproject.toml | 0 rfm/main.py | 2 +- utils/__init__.py | 5 + utils/feishu.py | 242 +++++++++++ utils/mysql.py | 66 +++ utils/{client.py => request.py} | 480 +--------------------- utils/{rule_engine.py => rules_engine.py} | 6 +- utils/sqlite.py | 187 +++++++++ 普康健康发票查验/main.py | 2 +- 普康健康发票识别/main.py | 2 +- 普康健康审核机器人/pageobject.py | 2 +- 普康健康客服会话记录整合/main.py | 2 +- {API => 服务}/Dockerfile | 0 {API => 服务}/main.py | 0 {API => 服务}/requirements.txt | 0 短视频合成自动化/@AutomationLog.txt | 2 - 短视频合成自动化/main.py | 2 +- 票据理赔自动化/__init__.py | 0 票据理赔自动化/a.py | 2 + 票据理赔自动化/common.py | 2 +- 票据理赔自动化/image.py | 7 +- 票据理赔自动化/main.py | 2 +- 票据理赔自动化/masterdata.py | 4 +- 23 files changed, 520 insertions(+), 497 deletions(-) create mode 100644 pyproject.toml create mode 100644 utils/feishu.py create mode 100644 utils/mysql.py rename utils/{client.py => request.py} (57%) rename utils/{rule_engine.py => rules_engine.py} (98%) create mode 100644 utils/sqlite.py rename {API => 服务}/Dockerfile (100%) rename {API => 服务}/main.py (100%) rename {API => 服务}/requirements.txt (100%) delete mode 100644 短视频合成自动化/@AutomationLog.txt create mode 100644 票据理赔自动化/__init__.py create mode 100644 票据理赔自动化/a.py diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e69de29 diff --git a/rfm/main.py b/rfm/main.py index 8144049..91dac56 100644 --- a/rfm/main.py +++ b/rfm/main.py @@ -14,7 +14,7 @@ from decimal import Decimal, ROUND_HALF_UP import pandas from jinja2 import Environment, FileSystemLoader -from utils.client import MySQLClient +from utils.mysql import MySQLClient from utils.pandas_extension import DrawAsHTML diff --git a/utils/__init__.py b/utils/__init__.py index e69de29..d91180a 100644 --- a/utils/__init__.py +++ b/utils/__init__.py @@ -0,0 +1,5 @@ +from mysql import MySQL +from sqlite import SQLite +from request import restrict, Authenticator, Request +from feishu import Feishu +from rules_engine import RulesEngine diff --git a/utils/feishu.py b/utils/feishu.py new file mode 100644 index 0000000..701cbd6 --- /dev/null +++ b/utils/feishu.py @@ -0,0 +1,242 @@ +# -*- coding: utf-8 -*- + +# 导入模块 + +import re +import time +from email.parser import BytesParser +from email.policy import default +from email.utils import parsedate_to_datetime +from imaplib import IMAP4_SSL + +import pandas + + +class Feishu: + """飞书客户端""" + + def __init__(self): + self.authenticator = Authenticator() + self.http_client = HTTPClient() + + def _headers(self): + """请求头""" + # 装配飞书访问凭证 + return { + "Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}", + } + + @staticmethod + def get_verification_code(): + try: + # 执行时间戳 + execute_timestamp = time.time() + # 超时时间戳 + timeout_timestamp = execute_timestamp + 65 + # 建立加密IMAP连接 + server = IMAP4_SSL("imap.feishu.cn", 993) + # 登录 + server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2") + + while True: + # 若当前时间戳大于超时时间戳则返回NONE + if time.time() <= timeout_timestamp: + # 等待10秒 + time.sleep(10) + # 选择文件夹(邮箱验证码) + server.select("&kK57sZqMi8F4AQ-") + # noinspection PyBroadException + try: + # 获取最后一封邮件索引,server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引 + index = server.search(None, "ALL")[1][0].split()[-1] + # 获取最后一封邮件内容并解析,server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文 + # noinspection PyUnresolvedReferences + contents = BytesParser(policy=default).parsebytes( + server.fetch(index, "(RFC822)")[1][0][1] + ) + # 遍历邮件内容,若正文内容类型为纯文本或HTML则解析发送时间和验证码 + for content in contents.walk(): + if ( + content.get_content_type() == "text/plain" + or content.get_content_type() == "text/html" + ): + # 邮件发送时间戳 + # noinspection PyUnresolvedReferences + send_timestamp = parsedate_to_datetime( + content["Date"] + ).timestamp() + # 若邮件发送时间戳大于执行时间戳则解析验证码并返回 + if ( + execute_timestamp + > send_timestamp + >= execute_timestamp - 35 + ): + # 登出 + server.logout() + # 解析验证码 + return re.search( + r"【普康健康】您的验证码是:(\d+)", + content.get_payload(decode=True).decode(), + ).group(1) + + # 若文件夹无邮件则继续 + except: + pass + + # 若超时则登出 + else: + server.logout() + return None + + except Exception: + raise RuntimeError("获取邮箱验证码发生其它异常") + + # 查询多维表格记录,单次最多查询500条记录 + @restrict(refill_rate=5, max_tokens=5) + def query_bitable_records( + self, + bitable: str, + table_id: str, + field_names: Optional[list[str]] = None, + filter_conditions: Optional[dict] = None, + ) -> pandas.DataFrame: + # 先查询多维表格记录,在根据字段解析记录 + # 装配多维表格查询记录地址 + url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20" + + response = self.http_client.post( + url=url, + headers=self._headers(), + json={"field_names": field_names, "filter": filter_conditions}, + ) + + # 响应业务码为0则定义为响应成功 + assert response.get("code") == 0, "查询多维表格记录发生异常" + + # 多维表格记录 + records = response.get("data").get("items") + + # 检查响应中是否包含还有下一页标识,若有则继续请求下一页 + while response.get("data").get("has_more"): + + url_next = url + "&page_token={}".format( + response.get("data").get("page_token") + ) + + response = self.http_client.post( + url=url_next, + headers=self._headers(), + json={"field_names": field_names, "filter": filter_conditions}, + ) + + assert response.get("code") == 0, "查询多维表格记录发生异常" + + # 合并记录 + records.append(response.get("data").get("items")) + + # 装配多维表格列出字段地址 + url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20" + + response = self.http_client.get( + url=url, + headers=self._headers(), + ) + + assert response.get("code") == 0, "列出多维表格字段发生异常" + + # 多维表格字段 + fields = response.get("data").get("items") + + while response.get("data").get("has_more"): + + url_next = url + "&page_token={}".format( + response.get("data").get("page_token") + ) + + response = self.http_client.get( + url=url_next, + headers=self._headers(), + ) + + assert response.get("code") == 0, "列出多维表格字段发生异常" + + fields.append(response.get("data").get("items")) + + # 字段映射 + field_mappings = {} + + for field in fields: + + # 字段名 + field_name = field["field_name"] + + # 根据字段类型匹配 + match field["type"]: + + case 1005: + + field_type = "主键" + + case 1: + + field_type = "文本" + + case 3: + + field_type = "单选" + + case 2: + + # 数字、公式字段的显示格式 + match field["property"]["formatter"]: + + case "0": + + field_type = "整数" + + case _: + + raise ValueError("未设置数字、公式字段的显示格式") + + case _: + + raise ValueError("未设置字段类型") + + # noinspection PyUnboundLocalVariable + field_mappings.update({field_name: field_type}) + + # 记录数据体 + records_data = [] + + # 解析记录 + for record in records: + + # 单条记录数据体 + record_data = {} + + for field_name, content in record["fields"].items(): + + match field_mappings[field_name]: + + case "主键" | "单选" | "整数": + + record_data.update({field_name: content}) + + case "文本": + + # 若存在多行文本则拼接 + fragments_content = "" + + for fragment_content in content: + + fragments_content += fragment_content["text"] + + record_data.update({field_name: fragments_content}) + + case _: + + raise ValueError("未设置字段解析方法") + + records_data.append(record_data) + + return pandas.DataFrame(records_data) diff --git a/utils/mysql.py b/utils/mysql.py new file mode 100644 index 0000000..7545ab1 --- /dev/null +++ b/utils/mysql.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- + +# 导入模块 +from urllib.parse import quote_plus + +import pandas +from sqlalchemy import create_engine, text + + +class MySQL: + """ + MySQL客户端 + """ + + def __init__( + self, + database: str, + host: str = "cdb-7z9lzx4y.cd.tencentcdb.com", # 默认为刘弼仁的腾讯云MySQL数据库 + port: int = 10039, + username: str = "root", + password: str = "Te198752", + ) -> None: + """ + 初始化 + :param database: 数据库名称 + :param host: 主机 + :param port: 端口 + :param username: 用户名 + :param password: 登录密码 + """ + # 就登录密码编码 + password = quote_plus(password) + # 构建数据库连接 + connection_url = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8" + + # 创建MySQL引擎并连接数据库 + self.engine = create_engine( + connection_url, + pool_size=5, + max_overflow=10, + pool_recycle=3600, + pool_pre_ping=True, + ) # 连接池中保持打开连接的数量为5,额外连接数为10,连接1小时后重新回收重连,检查连接有效性 + + def execute_query(self, sql: str) -> pandas.DataFrame: + """执行SQL查询并返回DATAFRAME""" + + if not hasattr(self, "engine") or not self.engine: + raise ConnectionError("未创建数据库连接") + + try: + with self.engine.connect() as connection: + dataframe = pandas.read_sql_query( + text(sql), connection, coerce_float=False + ) # 不尝试将非整数数值转为浮点(维持DECIMAL) + return dataframe + + except: + connection.rollback() + raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常") + + def __del__(self): + """析构时自动关闭连接""" + + if hasattr(self, "engine") and self.engine: + self.engine.dispose() diff --git a/utils/client.py b/utils/request.py similarity index 57% rename from utils/client.py rename to utils/request.py index 1c9cc0d..4dd9f1e 100644 --- a/utils/client.py +++ b/utils/request.py @@ -5,264 +5,18 @@ import hashlib import hmac import json -import re -import sqlite3 import threading import time -from email.parser import BytesParser -from email.policy import default -from email.utils import parsedate_to_datetime from functools import wraps -from imaplib import IMAP4_SSL from pathlib import Path -from typing import Any, Callable, Dict, Generator, List, Literal, Optional, Tuple, Union -from urllib.parse import quote_plus +from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union from xml.etree import ElementTree -import pandas from pydantic import BaseModel, Field, HttpUrl, model_validator from requests import Response, Session from requests.adapters import HTTPAdapter -from sqlalchemy import create_engine, text from urllib3.util.retry import Retry -""" -封装sqlalchemy,实现按照SQL查询 -类和函数的区别 -类作为对象的模板,定义了对象的结构和行为;而函数则作为实现特定功能或操作的代码块,提高了代码的可读性和可维护性。 -使用方法: -with MySQLClient(database='data_analysis') as client: - dataframe = client.execute_query(sql='select * from {{}}')') -""" - - -class MySQLClient: - """MySQL客户端""" - - def __init__( - self, - database, - host: str = "cdb-7z9lzx4y.cd.tencentcdb.com", - port: int = 10039, - username: str = "root", - password: str = "Te198752", - ): # 默认为刘弼仁的腾讯云MySQL数据库 - - # 数据库登录密码安全编码 - password = quote_plus(password) - - # 构建数据库连接字符 - connect_config = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8" - - # 创建MySQL引擎并连接数据库 - self.engine = create_engine( - connect_config, - pool_size=5, - max_overflow=10, - pool_recycle=3600, - pool_pre_ping=True, - ) # 连接池中保持打开连接的数量为5,额外连接数为10,连接1小时后重新回收重连,检查连接有效性 - - def __del__(self): - """析构时自动关闭连接""" - - if hasattr(self, "engine") and self.engine: - self.engine.dispose() - - def execute_query(self, sql: str) -> pandas.DataFrame: - """执行SQL查询并返回DATAFRAME""" - - if not hasattr(self, "engine") or not self.engine: - raise ConnectionError("未创建数据库连接") - - try: - with self.engine.connect() as connection: - dataframe = pandas.read_sql_query( - text(sql), connection, coerce_float=False - ) # 不尝试将非整数数值转为浮点(维持DECIMAL) - return dataframe - - except: - connection.rollback() - raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常") - - -class SQLiteClient: - """SQLite客户端""" - - def __init__(self, database: Union[str, Path]): - """ - 初始化SQLite客户端 - :param database: 数据库 - """ - self.database = database - # 初始化本地线程存储 - self.threads = threading.local() - - def _connect(self): - """为当前线程创建数据库连接和游标""" - # 检查当前线程有数据库连接,若有则继续否则创建数据库连接 - if hasattr(self.threads, "connection") and self.threads.connection is not None: - return - # 为当前线程关闭数据库连接和游标 - self._disconnect() - - # noinspection PyBroadException - try: - # 为当前线程创建数据库连接 - self.threads.connection = sqlite3.connect( - database=self.database, - check_same_thread=True, - timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 - ) - # 开启行映射,支持按照字段名取值 - self.threads.connection.row_factory = sqlite3.Row - # 为当前线程创建游标 - self.threads.cursor = self.threads.connection.cursor() - except Exception as exception: - self.threads.connection = None - self.threads.cursor = None - raise RuntimeError( - f"为当前线程创建数据库连接和游标发生异常,{str(exception)}" - ) from exception - - def _disconnect(self) -> None: - """为当前线程关闭数据库连接和游标""" - # 检查当前线程有游标,若有则关闭游标 - if hasattr(self.threads, "cursor") and self.threads.cursor is not None: - # noinspection PyBroadException - try: - # 为当前线程关闭游标 - self.threads.cursor.close() - self.threads.cursor = None - except Exception as exception: - raise RuntimeError( - f"为当前线程关闭游标发生异常,{str(exception)}" - ) from exception - - # 检查当前线程有数据库连接,若有则关闭数据库连接 - if hasattr(self.threads, "connection") and self.threads.connection is not None: - # noinspection PyBroadException - try: - # 为当前线程提交事务 - self.threads.connection.commit() - # 为当前线程关闭数据库连接 - self.threads.connection.close() - self.threads.connection = None - except Exception as exception: - raise RuntimeError( - f"为当前线程关闭数据库连接发生异常,{str(exception)}" - ) from exception - - def _query_one( - self, sql: str, parameters: Tuple[Any, ...] = () - ) -> Optional[Dict[str, Any]]: - """ - 为当前线程查询并获取单行数据 - :param sql: 查询SQL语句 - :param parameters: SQL参数 - :return: 单行数据 - """ - # noinspection PyBroadException - try: - # 为当前线程创建数据库连接和游标 - self._connect() - # 检查当前线程无游标,若无则抛出异常 - if not hasattr(self.threads, "cursor") or self.threads.cursor is None: - raise RuntimeError("为当前线程创建游标发生异常") - - # 为当前线程执行SQL - self.threads.cursor.execute(sql, parameters) - return ( - None - if (result := self.threads.cursor.fetchone()) is None - else dict(result) - ) - # 若发生异常则回滚事务并抛出异常 - except Exception as exception: - # 检查当前线程有数据库连接,若有则回滚 - if ( - hasattr(self.threads, "connection") - and self.threads.connection is not None - ): - self.threads.connection.rollback() - raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception - - def _query_all( - self, sql: str, parameters: Tuple[Any, ...] = () - ) -> List[Dict[str, Any]]: - """ - 为当前线程查询并获取多行数据 - :param sql: 查询SQL语句 - :param parameters: SQL参数 - :return: 多行数据 - """ - # noinspection PyBroadException - try: - # 为当前线程创建数据库连接和游标 - self._connect() - # 检查当前线程无游标,若无则抛出异常 - if not hasattr(self.threads, "cursor") or self.threads.cursor is None: - raise RuntimeError("为当前线程创建游标发生异常") - - # 为当前线程执行SQL - self.threads.cursor.execute(sql, parameters) - result = [] - while batch := self.threads.cursor.fetchmany(1000): - result.extend([dict(row) for row in batch]) - return result - # 若发生异常则回滚事务并抛出异常 - except Exception as exception: - # 检查当前线程有数据库连接,若有则回滚 - if ( - hasattr(self.threads, "connection") - and self.threads.connection is not None - ): - self.threads.connection.rollback() - raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception - - def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool: - """ - 为当前线程执行SQL - :param sql: 新增、删除和修改SQL语句 - :param parameters: SQL参数 - :return: 执行结果 - """ - try: - self._connect() - # 检查当前线程无游标,若无则抛出异常 - if not hasattr(self.threads, "cursor") or self.threads.cursor is None: - raise RuntimeError("为当前线程创建游标发生异常") - - # 为当前线程执行SQL - self.threads.cursor.execute(sql, parameters) - # 为当前线程提交事务 - self.threads.connection.commit() - return True - # 若发生异常则回滚事务并抛出异常 - except Exception as exception: - # 检查当前线程有数据库连接,若有则回滚 - if ( - hasattr(self.threads, "connection") - and self.threads.connection is not None - ): - self.threads.connection.rollback() - raise RuntimeError("为当前线程执行SQL发生异常") from exception - - def __enter__(self): - """进入上下文管理时为当前线程创建数据库连接和游标""" - self._connect() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """退出上下文管理时为当前线程关闭数据库连接和游标""" - self._disconnect() - return False - - def __del__(self): - """析构时为当前线程关闭数据库连接和游标""" - self._disconnect() - # 基于令牌桶限流算法的装饰器 def restrict(refill_rate: float = 5.0, max_tokens: int = 5): @@ -357,7 +111,7 @@ def restrict(refill_rate: float = 5.0, max_tokens: int = 5): return decorator -class HTTPClient: +class Request: """请求客户端""" class RequestException(Exception): @@ -915,233 +669,3 @@ class Authenticator: ) return token - - -class FeishuClient: - """飞书客户端""" - - def __init__(self): - self.authenticator = Authenticator() - self.http_client = HTTPClient() - - def _headers(self): - """请求头""" - # 装配飞书访问凭证 - return { - "Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}", - } - - @staticmethod - def get_verification_code(): - try: - # 执行时间戳 - execute_timestamp = time.time() - # 超时时间戳 - timeout_timestamp = execute_timestamp + 65 - # 建立加密IMAP连接 - server = IMAP4_SSL("imap.feishu.cn", 993) - # 登录 - server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2") - - while True: - # 若当前时间戳大于超时时间戳则返回NONE - if time.time() <= timeout_timestamp: - # 等待10秒 - time.sleep(10) - # 选择文件夹(邮箱验证码) - server.select("&kK57sZqMi8F4AQ-") - # noinspection PyBroadException - try: - # 获取最后一封邮件索引,server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引 - index = server.search(None, "ALL")[1][0].split()[-1] - # 获取最后一封邮件内容并解析,server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文 - # noinspection PyUnresolvedReferences - contents = BytesParser(policy=default).parsebytes( - server.fetch(index, "(RFC822)")[1][0][1] - ) - # 遍历邮件内容,若正文内容类型为纯文本或HTML则解析发送时间和验证码 - for content in contents.walk(): - if ( - content.get_content_type() == "text/plain" - or content.get_content_type() == "text/html" - ): - # 邮件发送时间戳 - # noinspection PyUnresolvedReferences - send_timestamp = parsedate_to_datetime( - content["Date"] - ).timestamp() - # 若邮件发送时间戳大于执行时间戳则解析验证码并返回 - if ( - execute_timestamp - > send_timestamp - >= execute_timestamp - 35 - ): - # 登出 - server.logout() - # 解析验证码 - return re.search( - r"【普康健康】您的验证码是:(\d+)", - content.get_payload(decode=True).decode(), - ).group(1) - - # 若文件夹无邮件则继续 - except: - pass - - # 若超时则登出 - else: - server.logout() - return None - - except Exception: - raise RuntimeError("获取邮箱验证码发生其它异常") - - # 查询多维表格记录,单次最多查询500条记录 - @restrict(refill_rate=5, max_tokens=5) - def query_bitable_records( - self, - bitable: str, - table_id: str, - field_names: Optional[list[str]] = None, - filter_conditions: Optional[dict] = None, - ) -> pandas.DataFrame: - # 先查询多维表格记录,在根据字段解析记录 - # 装配多维表格查询记录地址 - url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20" - - response = self.http_client.post( - url=url, - headers=self._headers(), - json={"field_names": field_names, "filter": filter_conditions}, - ) - - # 响应业务码为0则定义为响应成功 - assert response.get("code") == 0, "查询多维表格记录发生异常" - - # 多维表格记录 - records = response.get("data").get("items") - - # 检查响应中是否包含还有下一页标识,若有则继续请求下一页 - while response.get("data").get("has_more"): - - url_next = url + "&page_token={}".format( - response.get("data").get("page_token") - ) - - response = self.http_client.post( - url=url_next, - headers=self._headers(), - json={"field_names": field_names, "filter": filter_conditions}, - ) - - assert response.get("code") == 0, "查询多维表格记录发生异常" - - # 合并记录 - records.append(response.get("data").get("items")) - - # 装配多维表格列出字段地址 - url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20" - - response = self.http_client.get( - url=url, - headers=self._headers(), - ) - - assert response.get("code") == 0, "列出多维表格字段发生异常" - - # 多维表格字段 - fields = response.get("data").get("items") - - while response.get("data").get("has_more"): - - url_next = url + "&page_token={}".format( - response.get("data").get("page_token") - ) - - response = self.http_client.get( - url=url_next, - headers=self._headers(), - ) - - assert response.get("code") == 0, "列出多维表格字段发生异常" - - fields.append(response.get("data").get("items")) - - # 字段映射 - field_mappings = {} - - for field in fields: - - # 字段名 - field_name = field["field_name"] - - # 根据字段类型匹配 - match field["type"]: - - case 1005: - - field_type = "主键" - - case 1: - - field_type = "文本" - - case 3: - - field_type = "单选" - - case 2: - - # 数字、公式字段的显示格式 - match field["property"]["formatter"]: - - case "0": - - field_type = "整数" - - case _: - - raise ValueError("未设置数字、公式字段的显示格式") - - case _: - - raise ValueError("未设置字段类型") - - # noinspection PyUnboundLocalVariable - field_mappings.update({field_name: field_type}) - - # 记录数据体 - records_data = [] - - # 解析记录 - for record in records: - - # 单条记录数据体 - record_data = {} - - for field_name, content in record["fields"].items(): - - match field_mappings[field_name]: - - case "主键" | "单选" | "整数": - - record_data.update({field_name: content}) - - case "文本": - - # 若存在多行文本则拼接 - fragments_content = "" - - for fragment_content in content: - - fragments_content += fragment_content["text"] - - record_data.update({field_name: fragments_content}) - - case _: - - raise ValueError("未设置字段解析方法") - - records_data.append(record_data) - - return pandas.DataFrame(records_data) diff --git a/utils/rule_engine.py b/utils/rules_engine.py similarity index 98% rename from utils/rule_engine.py rename to utils/rules_engine.py index 8da8a6f..f3a8187 100644 --- a/utils/rule_engine.py +++ b/utils/rules_engine.py @@ -1,9 +1,5 @@ # -*- coding: utf-8 -*- -""" -封装ZenEngine -""" - # 导入模块 from datetime import datetime @@ -14,7 +10,7 @@ from typing import Any, Dict from zen import ZenDecision, ZenEngine -class RuleEngine: +class RulesEngine: """ 规则引擎,实现打开并读取规则,根据规则和输入评估并输出 """ diff --git a/utils/sqlite.py b/utils/sqlite.py new file mode 100644 index 0000000..6c28901 --- /dev/null +++ b/utils/sqlite.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- + +# 导入模块 + +import sqlite3 +import threading +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + + +class SQLite: + """ + SQLite客户端 + """ + + def __init__(self, database: Union[str, Path]): + """ + 初始化 + :param database: 数据库地址 + """ + self.database = database + # 初始化本地线程存储 + self.threads = threading.local() + + def _connect(self): + """为当前线程创建数据库连接和游标""" + # 检查当前线程有数据库连接,若有则继续否则创建数据库连接 + if hasattr(self.threads, "connection") and self.threads.connection is not None: + return + # 为当前线程关闭数据库连接和游标 + self._disconnect() + + # noinspection PyBroadException + try: + # 为当前线程创建数据库连接 + self.threads.connection = sqlite3.connect( + database=self.database, + check_same_thread=True, + timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 + ) + # 开启行映射,支持按照字段名取值 + self.threads.connection.row_factory = sqlite3.Row + # 为当前线程创建游标 + self.threads.cursor = self.threads.connection.cursor() + except Exception as exception: + self.threads.connection = None + self.threads.cursor = None + raise RuntimeError( + f"为当前线程创建数据库连接和游标发生异常,{str(exception)}" + ) from exception + + def _disconnect(self) -> None: + """为当前线程关闭数据库连接和游标""" + # 检查当前线程有游标,若有则关闭游标 + if hasattr(self.threads, "cursor") and self.threads.cursor is not None: + # noinspection PyBroadException + try: + # 为当前线程关闭游标 + self.threads.cursor.close() + self.threads.cursor = None + except Exception as exception: + raise RuntimeError( + f"为当前线程关闭游标发生异常,{str(exception)}" + ) from exception + + # 检查当前线程有数据库连接,若有则关闭数据库连接 + if hasattr(self.threads, "connection") and self.threads.connection is not None: + # noinspection PyBroadException + try: + # 为当前线程提交事务 + self.threads.connection.commit() + # 为当前线程关闭数据库连接 + self.threads.connection.close() + self.threads.connection = None + except Exception as exception: + raise RuntimeError( + f"为当前线程关闭数据库连接发生异常,{str(exception)}" + ) from exception + + def _query_one( + self, sql: str, parameters: Tuple[Any, ...] = () + ) -> Optional[Dict[str, Any]]: + """ + 为当前线程查询并获取单行数据 + :param sql: 查询SQL语句 + :param parameters: SQL参数 + :return: 单行数据 + """ + # noinspection PyBroadException + try: + # 为当前线程创建数据库连接和游标 + self._connect() + # 检查当前线程无游标,若无则抛出异常 + if not hasattr(self.threads, "cursor") or self.threads.cursor is None: + raise RuntimeError("为当前线程创建游标发生异常") + + # 为当前线程执行SQL + self.threads.cursor.execute(sql, parameters) + return ( + None + if (result := self.threads.cursor.fetchone()) is None + else dict(result) + ) + # 若发生异常则回滚事务并抛出异常 + except Exception as exception: + # 检查当前线程有数据库连接,若有则回滚 + if ( + hasattr(self.threads, "connection") + and self.threads.connection is not None + ): + self.threads.connection.rollback() + raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception + + def _query_all( + self, sql: str, parameters: Tuple[Any, ...] = () + ) -> List[Dict[str, Any]]: + """ + 为当前线程查询并获取多行数据 + :param sql: 查询SQL语句 + :param parameters: SQL参数 + :return: 多行数据 + """ + # noinspection PyBroadException + try: + # 为当前线程创建数据库连接和游标 + self._connect() + # 检查当前线程无游标,若无则抛出异常 + if not hasattr(self.threads, "cursor") or self.threads.cursor is None: + raise RuntimeError("为当前线程创建游标发生异常") + + # 为当前线程执行SQL + self.threads.cursor.execute(sql, parameters) + result = [] + while batch := self.threads.cursor.fetchmany(1000): + result.extend([dict(row) for row in batch]) + return result + # 若发生异常则回滚事务并抛出异常 + except Exception as exception: + # 检查当前线程有数据库连接,若有则回滚 + if ( + hasattr(self.threads, "connection") + and self.threads.connection is not None + ): + self.threads.connection.rollback() + raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception + + def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool: + """ + 为当前线程执行SQL + :param sql: 新增、删除和修改SQL语句 + :param parameters: SQL参数 + :return: 执行结果 + """ + try: + self._connect() + # 检查当前线程无游标,若无则抛出异常 + if not hasattr(self.threads, "cursor") or self.threads.cursor is None: + raise RuntimeError("为当前线程创建游标发生异常") + + # 为当前线程执行SQL + self.threads.cursor.execute(sql, parameters) + # 为当前线程提交事务 + self.threads.connection.commit() + return True + # 若发生异常则回滚事务并抛出异常 + except Exception as exception: + # 检查当前线程有数据库连接,若有则回滚 + if ( + hasattr(self.threads, "connection") + and self.threads.connection is not None + ): + self.threads.connection.rollback() + raise RuntimeError("为当前线程执行SQL发生异常") from exception + + def __enter__(self): + """进入上下文管理时为当前线程创建数据库连接和游标""" + self._connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """退出上下文管理时为当前线程关闭数据库连接和游标""" + self._disconnect() + return False + + def __del__(self): + """析构时为当前线程关闭数据库连接和游标""" + self._disconnect() diff --git a/普康健康发票查验/main.py b/普康健康发票查验/main.py index c8fdc6f..c710360 100644 --- a/普康健康发票查验/main.py +++ b/普康健康发票查验/main.py @@ -18,7 +18,7 @@ import cv2 import numpy import pandas -from utils.client import Authenticator, HTTPClient, RequestException, restrict +from utils.mysql import Authenticator, HTTPClient, RequestException, restrict from utils.pandas_extension import open_csv, save_as_workbook, traverse_directory diff --git a/普康健康发票识别/main.py b/普康健康发票识别/main.py index 25bd9e3..eb7b728 100644 --- a/普康健康发票识别/main.py +++ b/普康健康发票识别/main.py @@ -22,7 +22,7 @@ from 普康健康发票查验.main import image_compression from utils.pandas_extension import traverse_directory, save_as_workbook -from utils.client import restrict, HTTPClient, RequestException, Authenticator +from utils.mysql import restrict, HTTPClient, RequestException, Authenticator if __name__ == "__main__": diff --git a/普康健康审核机器人/pageobject.py b/普康健康审核机器人/pageobject.py index a8a0d85..c5a2f0a 100644 --- a/普康健康审核机器人/pageobject.py +++ b/普康健康审核机器人/pageobject.py @@ -38,7 +38,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) from utils.logger import Logger -from utils.rule_engine import FeishuMail +from utils.rules_engine import FeishuMail # 创建日志记录器 logger = Logger(logger_name="pageobject").get_logger() diff --git a/普康健康客服会话记录整合/main.py b/普康健康客服会话记录整合/main.py index e235588..7698c65 100644 --- a/普康健康客服会话记录整合/main.py +++ b/普康健康客服会话记录整合/main.py @@ -13,7 +13,7 @@ from pathlib import Path import pandas from jinja2 import Environment, FileSystemLoader -from utils.client import Authenticator, HTTPClient +from utils.mysql import Authenticator, HTTPClient # 创建目录地址对象 directory_path = Path("客服会话记录") diff --git a/API/Dockerfile b/服务/Dockerfile similarity index 100% rename from API/Dockerfile rename to 服务/Dockerfile diff --git a/API/main.py b/服务/main.py similarity index 100% rename from API/main.py rename to 服务/main.py diff --git a/API/requirements.txt b/服务/requirements.txt similarity index 100% rename from API/requirements.txt rename to 服务/requirements.txt diff --git a/短视频合成自动化/@AutomationLog.txt b/短视频合成自动化/@AutomationLog.txt deleted file mode 100644 index db78d81..0000000 --- a/短视频合成自动化/@AutomationLog.txt +++ /dev/null @@ -1,2 +0,0 @@ -2026-01-05 15:44:22.377 export.py[409] export -> Find Control Timeout(10s): {NameContains: '删除', VisibleOnly: True, ControlType: MenuItemControl} -2026-01-05 15:46:56.070 export.py[409] export -> Find Control Timeout(10s): {NameContains: '删除', VisibleOnly: True, ControlType: MenuItemControl} diff --git a/短视频合成自动化/main.py b/短视频合成自动化/main.py index 77c9ed8..dfe3a5c 100644 --- a/短视频合成自动化/main.py +++ b/短视频合成自动化/main.py @@ -10,7 +10,7 @@ if __name__ == "__main__": # 实例化 JianYingExport jianying_export = JianYingExport( materials_folder_path=r"E:\jianying\materials\260104", - draft_counts=2, + draft_counts=1, ) # 导出草稿 diff --git a/票据理赔自动化/__init__.py b/票据理赔自动化/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/票据理赔自动化/a.py b/票据理赔自动化/a.py new file mode 100644 index 0000000..ab3cf4d --- /dev/null +++ b/票据理赔自动化/a.py @@ -0,0 +1,2 @@ +a = 1 +print(a) \ No newline at end of file diff --git a/票据理赔自动化/common.py b/票据理赔自动化/common.py index d2fc51f..b3a0fcf 100644 --- a/票据理赔自动化/common.py +++ b/票据理赔自动化/common.py @@ -3,7 +3,7 @@ from pathlib import Path from masterdata import MasterData -from utils.rule_engine import RuleEngine +# from ..utils import RuleEngine # 初始化赔案档案(保险公司将提供投保公司、保险分公司和报案时间等,TPA作业系统签收后生成赔案号) dossier = { diff --git a/票据理赔自动化/image.py b/票据理赔自动化/image.py index 130b065..fc7766e 100644 --- a/票据理赔自动化/image.py +++ b/票据理赔自动化/image.py @@ -16,12 +16,15 @@ from fuzzywuzzy import fuzz from jionlp import parse_location from common import dossier, master_data, rule_engine -from utils.client import Authenticator, HTTPClient + +print(1) +exit() +from utils import Authenticator, Request # 实例化认证器 authenticator = Authenticator() # 实例化请求客户端 -http_client = HTTPClient(timeout=300, cache_enabled=True) # 使用缓存 +http_client = Request(timeout=300, cache_enabled=True) # 使用缓存 # noinspection PyShadowingNames diff --git a/票据理赔自动化/main.py b/票据理赔自动化/main.py index 8023ca4..3e53463 100644 --- a/票据理赔自动化/main.py +++ b/票据理赔自动化/main.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ -票据理赔自动化最小化实现 +票据理赔自动化 功能清单 https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink """ diff --git a/票据理赔自动化/masterdata.py b/票据理赔自动化/masterdata.py index f565f0e..6311068 100644 --- a/票据理赔自动化/masterdata.py +++ b/票据理赔自动化/masterdata.py @@ -4,10 +4,10 @@ from datetime import datetime from decimal import Decimal, ROUND_HALF_UP from typing import Any, Dict, List, Optional -from utils.client import SQLiteClient +from utils import SQLite -class MasterData(SQLiteClient): +class MasterData(SQLite): """主数据""" def __init__(self):