日常更新

from NUC
This commit is contained in:
liubiren 2026-01-06 15:12:33 +08:00
parent cd615b06da
commit 7379c992bf
23 changed files with 520 additions and 497 deletions

0
pyproject.toml Normal file
View File

View File

@ -14,7 +14,7 @@ from decimal import Decimal, ROUND_HALF_UP
import pandas
from jinja2 import Environment, FileSystemLoader
from utils.client import MySQLClient
from utils.mysql import MySQLClient
from utils.pandas_extension import DrawAsHTML

View File

@ -0,0 +1,5 @@
from mysql import MySQL
from sqlite import SQLite
from request import restrict, Authenticator, Request
from feishu import Feishu
from rules_engine import RulesEngine

242
utils/feishu.py Normal file
View File

@ -0,0 +1,242 @@
# -*- coding: utf-8 -*-
# 导入模块
import re
import time
from email.parser import BytesParser
from email.policy import default
from email.utils import parsedate_to_datetime
from imaplib import IMAP4_SSL
import pandas
class Feishu:
"""飞书客户端"""
def __init__(self):
self.authenticator = Authenticator()
self.http_client = HTTPClient()
def _headers(self):
"""请求头"""
# 装配飞书访问凭证
return {
"Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}",
}
@staticmethod
def get_verification_code():
try:
# 执行时间戳
execute_timestamp = time.time()
# 超时时间戳
timeout_timestamp = execute_timestamp + 65
# 建立加密IMAP连接
server = IMAP4_SSL("imap.feishu.cn", 993)
# 登录
server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2")
while True:
# 若当前时间戳大于超时时间戳则返回NONE
if time.time() <= timeout_timestamp:
# 等待10秒
time.sleep(10)
# 选择文件夹(邮箱验证码)
server.select("&kK57sZqMi8F4AQ-")
# noinspection PyBroadException
try:
# 获取最后一封邮件索引server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引
index = server.search(None, "ALL")[1][0].split()[-1]
# 获取最后一封邮件内容并解析server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文
# noinspection PyUnresolvedReferences
contents = BytesParser(policy=default).parsebytes(
server.fetch(index, "(RFC822)")[1][0][1]
)
# 遍历邮件内容若正文内容类型为纯文本或HTML则解析发送时间和验证码
for content in contents.walk():
if (
content.get_content_type() == "text/plain"
or content.get_content_type() == "text/html"
):
# 邮件发送时间戳
# noinspection PyUnresolvedReferences
send_timestamp = parsedate_to_datetime(
content["Date"]
).timestamp()
# 若邮件发送时间戳大于执行时间戳则解析验证码并返回
if (
execute_timestamp
> send_timestamp
>= execute_timestamp - 35
):
# 登出
server.logout()
# 解析验证码
return re.search(
r"【普康健康】您的验证码是:(\d+)",
content.get_payload(decode=True).decode(),
).group(1)
# 若文件夹无邮件则继续
except:
pass
# 若超时则登出
else:
server.logout()
return None
except Exception:
raise RuntimeError("获取邮箱验证码发生其它异常")
# 查询多维表格记录单次最多查询500条记录
@restrict(refill_rate=5, max_tokens=5)
def query_bitable_records(
self,
bitable: str,
table_id: str,
field_names: Optional[list[str]] = None,
filter_conditions: Optional[dict] = None,
) -> pandas.DataFrame:
# 先查询多维表格记录,在根据字段解析记录
# 装配多维表格查询记录地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20"
response = self.http_client.post(
url=url,
headers=self._headers(),
json={"field_names": field_names, "filter": filter_conditions},
)
# 响应业务码为0则定义为响应成功
assert response.get("code") == 0, "查询多维表格记录发生异常"
# 多维表格记录
records = response.get("data").get("items")
# 检查响应中是否包含还有下一页标识,若有则继续请求下一页
while response.get("data").get("has_more"):
url_next = url + "&page_token={}".format(
response.get("data").get("page_token")
)
response = self.http_client.post(
url=url_next,
headers=self._headers(),
json={"field_names": field_names, "filter": filter_conditions},
)
assert response.get("code") == 0, "查询多维表格记录发生异常"
# 合并记录
records.append(response.get("data").get("items"))
# 装配多维表格列出字段地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20"
response = self.http_client.get(
url=url,
headers=self._headers(),
)
assert response.get("code") == 0, "列出多维表格字段发生异常"
# 多维表格字段
fields = response.get("data").get("items")
while response.get("data").get("has_more"):
url_next = url + "&page_token={}".format(
response.get("data").get("page_token")
)
response = self.http_client.get(
url=url_next,
headers=self._headers(),
)
assert response.get("code") == 0, "列出多维表格字段发生异常"
fields.append(response.get("data").get("items"))
# 字段映射
field_mappings = {}
for field in fields:
# 字段名
field_name = field["field_name"]
# 根据字段类型匹配
match field["type"]:
case 1005:
field_type = "主键"
case 1:
field_type = "文本"
case 3:
field_type = "单选"
case 2:
# 数字、公式字段的显示格式
match field["property"]["formatter"]:
case "0":
field_type = "整数"
case _:
raise ValueError("未设置数字、公式字段的显示格式")
case _:
raise ValueError("未设置字段类型")
# noinspection PyUnboundLocalVariable
field_mappings.update({field_name: field_type})
# 记录数据体
records_data = []
# 解析记录
for record in records:
# 单条记录数据体
record_data = {}
for field_name, content in record["fields"].items():
match field_mappings[field_name]:
case "主键" | "单选" | "整数":
record_data.update({field_name: content})
case "文本":
# 若存在多行文本则拼接
fragments_content = ""
for fragment_content in content:
fragments_content += fragment_content["text"]
record_data.update({field_name: fragments_content})
case _:
raise ValueError("未设置字段解析方法")
records_data.append(record_data)
return pandas.DataFrame(records_data)

66
utils/mysql.py Normal file
View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# 导入模块
from urllib.parse import quote_plus
import pandas
from sqlalchemy import create_engine, text
class MySQL:
"""
MySQL客户端
"""
def __init__(
self,
database: str,
host: str = "cdb-7z9lzx4y.cd.tencentcdb.com", # 默认为刘弼仁的腾讯云MySQL数据库
port: int = 10039,
username: str = "root",
password: str = "Te198752",
) -> None:
"""
初始化
:param database: 数据库名称
:param host: 主机
:param port: 端口
:param username: 用户名
:param password: 登录密码
"""
# 就登录密码编码
password = quote_plus(password)
# 构建数据库连接
connection_url = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8"
# 创建MySQL引擎并连接数据库
self.engine = create_engine(
connection_url,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
pool_pre_ping=True,
) # 连接池中保持打开连接的数量为5额外连接数为10连接1小时后重新回收重连检查连接有效性
def execute_query(self, sql: str) -> pandas.DataFrame:
"""执行SQL查询并返回DATAFRAME"""
if not hasattr(self, "engine") or not self.engine:
raise ConnectionError("未创建数据库连接")
try:
with self.engine.connect() as connection:
dataframe = pandas.read_sql_query(
text(sql), connection, coerce_float=False
) # 不尝试将非整数数值转为浮点维持DECIMAL
return dataframe
except:
connection.rollback()
raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常")
def __del__(self):
"""析构时自动关闭连接"""
if hasattr(self, "engine") and self.engine:
self.engine.dispose()

View File

@ -5,264 +5,18 @@
import hashlib
import hmac
import json
import re
import sqlite3
import threading
import time
from email.parser import BytesParser
from email.policy import default
from email.utils import parsedate_to_datetime
from functools import wraps
from imaplib import IMAP4_SSL
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Literal, Optional, Tuple, Union
from urllib.parse import quote_plus
from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union
from xml.etree import ElementTree
import pandas
from pydantic import BaseModel, Field, HttpUrl, model_validator
from requests import Response, Session
from requests.adapters import HTTPAdapter
from sqlalchemy import create_engine, text
from urllib3.util.retry import Retry
"""
封装sqlalchemy实现按照SQL查询
类和函数的区别
类作为对象的模板定义了对象的结构和行为而函数则作为实现特定功能或操作的代码块提高了代码的可读性和可维护性
使用方法
with MySQLClient(database='data_analysis') as client:
dataframe = client.execute_query(sql='select * from {{}}')')
"""
class MySQLClient:
"""MySQL客户端"""
def __init__(
self,
database,
host: str = "cdb-7z9lzx4y.cd.tencentcdb.com",
port: int = 10039,
username: str = "root",
password: str = "Te198752",
): # 默认为刘弼仁的腾讯云MySQL数据库
# 数据库登录密码安全编码
password = quote_plus(password)
# 构建数据库连接字符
connect_config = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8"
# 创建MySQL引擎并连接数据库
self.engine = create_engine(
connect_config,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
pool_pre_ping=True,
) # 连接池中保持打开连接的数量为5额外连接数为10连接1小时后重新回收重连检查连接有效性
def __del__(self):
"""析构时自动关闭连接"""
if hasattr(self, "engine") and self.engine:
self.engine.dispose()
def execute_query(self, sql: str) -> pandas.DataFrame:
"""执行SQL查询并返回DATAFRAME"""
if not hasattr(self, "engine") or not self.engine:
raise ConnectionError("未创建数据库连接")
try:
with self.engine.connect() as connection:
dataframe = pandas.read_sql_query(
text(sql), connection, coerce_float=False
) # 不尝试将非整数数值转为浮点维持DECIMAL
return dataframe
except:
connection.rollback()
raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常")
class SQLiteClient:
"""SQLite客户端"""
def __init__(self, database: Union[str, Path]):
"""
初始化SQLite客户端
:param database: 数据库
"""
self.database = database
# 初始化本地线程存储
self.threads = threading.local()
def _connect(self):
"""为当前线程创建数据库连接和游标"""
# 检查当前线程有数据库连接,若有则继续否则创建数据库连接
if hasattr(self.threads, "connection") and self.threads.connection is not None:
return
# 为当前线程关闭数据库连接和游标
self._disconnect()
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接
self.threads.connection = sqlite3.connect(
database=self.database,
check_same_thread=True,
timeout=30, # 数据库锁超时时间单位默认为30秒避免并发锁死
)
# 开启行映射,支持按照字段名取值
self.threads.connection.row_factory = sqlite3.Row
# 为当前线程创建游标
self.threads.cursor = self.threads.connection.cursor()
except Exception as exception:
self.threads.connection = None
self.threads.cursor = None
raise RuntimeError(
f"为当前线程创建数据库连接和游标发生异常,{str(exception)}"
) from exception
def _disconnect(self) -> None:
"""为当前线程关闭数据库连接和游标"""
# 检查当前线程有游标,若有则关闭游标
if hasattr(self.threads, "cursor") and self.threads.cursor is not None:
# noinspection PyBroadException
try:
# 为当前线程关闭游标
self.threads.cursor.close()
self.threads.cursor = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭游标发生异常,{str(exception)}"
) from exception
# 检查当前线程有数据库连接,若有则关闭数据库连接
if hasattr(self.threads, "connection") and self.threads.connection is not None:
# noinspection PyBroadException
try:
# 为当前线程提交事务
self.threads.connection.commit()
# 为当前线程关闭数据库连接
self.threads.connection.close()
self.threads.connection = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭数据库连接发生异常,{str(exception)}"
) from exception
def _query_one(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> Optional[Dict[str, Any]]:
"""
为当前线程查询并获取单行数据
:param sql: 查询SQL语句
:param parameters: SQL参数
:return: 单行数据
"""
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
return (
None
if (result := self.threads.cursor.fetchone()) is None
else dict(result)
)
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception
def _query_all(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> List[Dict[str, Any]]:
"""
为当前线程查询并获取多行数据
:param sql: 查询SQL语句
:param parameters: SQL参数
:return: 多行数据
"""
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
result = []
while batch := self.threads.cursor.fetchmany(1000):
result.extend([dict(row) for row in batch])
return result
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception
def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool:
"""
为当前线程执行SQL
:param sql: 新增删除和修改SQL语句
:param parameters: SQL参数
:return: 执行结果
"""
try:
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
# 为当前线程提交事务
self.threads.connection.commit()
return True
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程执行SQL发生异常") from exception
def __enter__(self):
"""进入上下文管理时为当前线程创建数据库连接和游标"""
self._connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文管理时为当前线程关闭数据库连接和游标"""
self._disconnect()
return False
def __del__(self):
"""析构时为当前线程关闭数据库连接和游标"""
self._disconnect()
# 基于令牌桶限流算法的装饰器
def restrict(refill_rate: float = 5.0, max_tokens: int = 5):
@ -357,7 +111,7 @@ def restrict(refill_rate: float = 5.0, max_tokens: int = 5):
return decorator
class HTTPClient:
class Request:
"""请求客户端"""
class RequestException(Exception):
@ -915,233 +669,3 @@ class Authenticator:
)
return token
class FeishuClient:
"""飞书客户端"""
def __init__(self):
self.authenticator = Authenticator()
self.http_client = HTTPClient()
def _headers(self):
"""请求头"""
# 装配飞书访问凭证
return {
"Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}",
}
@staticmethod
def get_verification_code():
try:
# 执行时间戳
execute_timestamp = time.time()
# 超时时间戳
timeout_timestamp = execute_timestamp + 65
# 建立加密IMAP连接
server = IMAP4_SSL("imap.feishu.cn", 993)
# 登录
server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2")
while True:
# 若当前时间戳大于超时时间戳则返回NONE
if time.time() <= timeout_timestamp:
# 等待10秒
time.sleep(10)
# 选择文件夹(邮箱验证码)
server.select("&kK57sZqMi8F4AQ-")
# noinspection PyBroadException
try:
# 获取最后一封邮件索引server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引
index = server.search(None, "ALL")[1][0].split()[-1]
# 获取最后一封邮件内容并解析server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文
# noinspection PyUnresolvedReferences
contents = BytesParser(policy=default).parsebytes(
server.fetch(index, "(RFC822)")[1][0][1]
)
# 遍历邮件内容若正文内容类型为纯文本或HTML则解析发送时间和验证码
for content in contents.walk():
if (
content.get_content_type() == "text/plain"
or content.get_content_type() == "text/html"
):
# 邮件发送时间戳
# noinspection PyUnresolvedReferences
send_timestamp = parsedate_to_datetime(
content["Date"]
).timestamp()
# 若邮件发送时间戳大于执行时间戳则解析验证码并返回
if (
execute_timestamp
> send_timestamp
>= execute_timestamp - 35
):
# 登出
server.logout()
# 解析验证码
return re.search(
r"【普康健康】您的验证码是:(\d+)",
content.get_payload(decode=True).decode(),
).group(1)
# 若文件夹无邮件则继续
except:
pass
# 若超时则登出
else:
server.logout()
return None
except Exception:
raise RuntimeError("获取邮箱验证码发生其它异常")
# 查询多维表格记录单次最多查询500条记录
@restrict(refill_rate=5, max_tokens=5)
def query_bitable_records(
self,
bitable: str,
table_id: str,
field_names: Optional[list[str]] = None,
filter_conditions: Optional[dict] = None,
) -> pandas.DataFrame:
# 先查询多维表格记录,在根据字段解析记录
# 装配多维表格查询记录地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20"
response = self.http_client.post(
url=url,
headers=self._headers(),
json={"field_names": field_names, "filter": filter_conditions},
)
# 响应业务码为0则定义为响应成功
assert response.get("code") == 0, "查询多维表格记录发生异常"
# 多维表格记录
records = response.get("data").get("items")
# 检查响应中是否包含还有下一页标识,若有则继续请求下一页
while response.get("data").get("has_more"):
url_next = url + "&page_token={}".format(
response.get("data").get("page_token")
)
response = self.http_client.post(
url=url_next,
headers=self._headers(),
json={"field_names": field_names, "filter": filter_conditions},
)
assert response.get("code") == 0, "查询多维表格记录发生异常"
# 合并记录
records.append(response.get("data").get("items"))
# 装配多维表格列出字段地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20"
response = self.http_client.get(
url=url,
headers=self._headers(),
)
assert response.get("code") == 0, "列出多维表格字段发生异常"
# 多维表格字段
fields = response.get("data").get("items")
while response.get("data").get("has_more"):
url_next = url + "&page_token={}".format(
response.get("data").get("page_token")
)
response = self.http_client.get(
url=url_next,
headers=self._headers(),
)
assert response.get("code") == 0, "列出多维表格字段发生异常"
fields.append(response.get("data").get("items"))
# 字段映射
field_mappings = {}
for field in fields:
# 字段名
field_name = field["field_name"]
# 根据字段类型匹配
match field["type"]:
case 1005:
field_type = "主键"
case 1:
field_type = "文本"
case 3:
field_type = "单选"
case 2:
# 数字、公式字段的显示格式
match field["property"]["formatter"]:
case "0":
field_type = "整数"
case _:
raise ValueError("未设置数字、公式字段的显示格式")
case _:
raise ValueError("未设置字段类型")
# noinspection PyUnboundLocalVariable
field_mappings.update({field_name: field_type})
# 记录数据体
records_data = []
# 解析记录
for record in records:
# 单条记录数据体
record_data = {}
for field_name, content in record["fields"].items():
match field_mappings[field_name]:
case "主键" | "单选" | "整数":
record_data.update({field_name: content})
case "文本":
# 若存在多行文本则拼接
fragments_content = ""
for fragment_content in content:
fragments_content += fragment_content["text"]
record_data.update({field_name: fragments_content})
case _:
raise ValueError("未设置字段解析方法")
records_data.append(record_data)
return pandas.DataFrame(records_data)

View File

@ -1,9 +1,5 @@
# -*- coding: utf-8 -*-
"""
封装ZenEngine
"""
# 导入模块
from datetime import datetime
@ -14,7 +10,7 @@ from typing import Any, Dict
from zen import ZenDecision, ZenEngine
class RuleEngine:
class RulesEngine:
"""
规则引擎实现打开并读取规则根据规则和输入评估并输出
"""

187
utils/sqlite.py Normal file
View File

@ -0,0 +1,187 @@
# -*- coding: utf-8 -*-
# 导入模块
import sqlite3
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
class SQLite:
"""
SQLite客户端
"""
def __init__(self, database: Union[str, Path]):
"""
初始化
:param database: 数据库地址
"""
self.database = database
# 初始化本地线程存储
self.threads = threading.local()
def _connect(self):
"""为当前线程创建数据库连接和游标"""
# 检查当前线程有数据库连接,若有则继续否则创建数据库连接
if hasattr(self.threads, "connection") and self.threads.connection is not None:
return
# 为当前线程关闭数据库连接和游标
self._disconnect()
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接
self.threads.connection = sqlite3.connect(
database=self.database,
check_same_thread=True,
timeout=30, # 数据库锁超时时间单位默认为30秒避免并发锁死
)
# 开启行映射,支持按照字段名取值
self.threads.connection.row_factory = sqlite3.Row
# 为当前线程创建游标
self.threads.cursor = self.threads.connection.cursor()
except Exception as exception:
self.threads.connection = None
self.threads.cursor = None
raise RuntimeError(
f"为当前线程创建数据库连接和游标发生异常,{str(exception)}"
) from exception
def _disconnect(self) -> None:
"""为当前线程关闭数据库连接和游标"""
# 检查当前线程有游标,若有则关闭游标
if hasattr(self.threads, "cursor") and self.threads.cursor is not None:
# noinspection PyBroadException
try:
# 为当前线程关闭游标
self.threads.cursor.close()
self.threads.cursor = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭游标发生异常,{str(exception)}"
) from exception
# 检查当前线程有数据库连接,若有则关闭数据库连接
if hasattr(self.threads, "connection") and self.threads.connection is not None:
# noinspection PyBroadException
try:
# 为当前线程提交事务
self.threads.connection.commit()
# 为当前线程关闭数据库连接
self.threads.connection.close()
self.threads.connection = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭数据库连接发生异常,{str(exception)}"
) from exception
def _query_one(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> Optional[Dict[str, Any]]:
"""
为当前线程查询并获取单行数据
:param sql: 查询SQL语句
:param parameters: SQL参数
:return: 单行数据
"""
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
return (
None
if (result := self.threads.cursor.fetchone()) is None
else dict(result)
)
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception
def _query_all(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> List[Dict[str, Any]]:
"""
为当前线程查询并获取多行数据
:param sql: 查询SQL语句
:param parameters: SQL参数
:return: 多行数据
"""
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
result = []
while batch := self.threads.cursor.fetchmany(1000):
result.extend([dict(row) for row in batch])
return result
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception
def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool:
"""
为当前线程执行SQL
:param sql: 新增删除和修改SQL语句
:param parameters: SQL参数
:return: 执行结果
"""
try:
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
# 为当前线程提交事务
self.threads.connection.commit()
return True
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程执行SQL发生异常") from exception
def __enter__(self):
"""进入上下文管理时为当前线程创建数据库连接和游标"""
self._connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文管理时为当前线程关闭数据库连接和游标"""
self._disconnect()
return False
def __del__(self):
"""析构时为当前线程关闭数据库连接和游标"""
self._disconnect()

View File

@ -18,7 +18,7 @@ import cv2
import numpy
import pandas
from utils.client import Authenticator, HTTPClient, RequestException, restrict
from utils.mysql import Authenticator, HTTPClient, RequestException, restrict
from utils.pandas_extension import open_csv, save_as_workbook, traverse_directory

View File

@ -22,7 +22,7 @@ from 普康健康发票查验.main import image_compression
from utils.pandas_extension import traverse_directory, save_as_workbook
from utils.client import restrict, HTTPClient, RequestException, Authenticator
from utils.mysql import restrict, HTTPClient, RequestException, Authenticator
if __name__ == "__main__":

View File

@ -38,7 +38,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from utils.logger import Logger
from utils.rule_engine import FeishuMail
from utils.rules_engine import FeishuMail
# 创建日志记录器
logger = Logger(logger_name="pageobject").get_logger()

View File

@ -13,7 +13,7 @@ from pathlib import Path
import pandas
from jinja2 import Environment, FileSystemLoader
from utils.client import Authenticator, HTTPClient
from utils.mysql import Authenticator, HTTPClient
# 创建目录地址对象
directory_path = Path("客服会话记录")

View File

@ -1,2 +0,0 @@
2026-01-05 15:44:22.377 export.py[409] export -> Find Control Timeout(10s): {NameContains: '删除', VisibleOnly: True, ControlType: MenuItemControl}
2026-01-05 15:46:56.070 export.py[409] export -> Find Control Timeout(10s): {NameContains: '删除', VisibleOnly: True, ControlType: MenuItemControl}

View File

@ -10,7 +10,7 @@ if __name__ == "__main__":
# 实例化 JianYingExport
jianying_export = JianYingExport(
materials_folder_path=r"E:\jianying\materials\260104",
draft_counts=2,
draft_counts=1,
)
# 导出草稿

View File

View File

@ -0,0 +1,2 @@
a = 1
print(a)

View File

@ -3,7 +3,7 @@
from pathlib import Path
from masterdata import MasterData
from utils.rule_engine import RuleEngine
# from ..utils import RuleEngine
# 初始化赔案档案保险公司将提供投保公司、保险分公司和报案时间等TPA作业系统签收后生成赔案号
dossier = {

View File

@ -16,12 +16,15 @@ from fuzzywuzzy import fuzz
from jionlp import parse_location
from common import dossier, master_data, rule_engine
from utils.client import Authenticator, HTTPClient
print(1)
exit()
from utils import Authenticator, Request
# 实例化认证器
authenticator = Authenticator()
# 实例化请求客户端
http_client = HTTPClient(timeout=300, cache_enabled=True) # 使用缓存
http_client = Request(timeout=300, cache_enabled=True) # 使用缓存
# noinspection PyShadowingNames

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
"""
票据理赔自动化最小化实现
票据理赔自动化
功能清单
https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink
"""

View File

@ -4,10 +4,10 @@ from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from typing import Any, Dict, List, Optional
from utils.client import SQLiteClient
from utils import SQLite
class MasterData(SQLiteClient):
class MasterData(SQLite):
"""主数据"""
def __init__(self):