日常更新

明天入职新公司
This commit is contained in:
liubiren 2025-12-06 23:37:20 +08:00
parent 973ca8113c
commit ae932ab487
5 changed files with 301 additions and 463 deletions

View File

@ -1 +1 @@
{"szkt": {"token": "a62c56ace614a6546191d5af8ca8b1513cfaeaea7ce67d0a37de994ab6c2aa4e2a0b058e0da575ff376dd51dc19c5ad353ab2761cb6d9db4d521b83adeee2979b78f7ae70765b26985165b6266d084b75f2f918008966e72a116d8bca5ec4c7cecc5223f78fa47b4d40aa9cf5277a11b0b967ad06e84ef7c4acbc53ccdef936c062b2d037ae0dad8c29d50426b668ec349cc8c0099a0270e16f97d31e4f058bc086334468f88d934c7fd1464ed3800833d2f486dc06f0689b99abbb78a8ebf4a3877bd82d0dd765dc09b7a1594fa8849d51f59282a81048c52e82e8320d1ad042a6c307ca831647cba4356564704780f", "expired_timestamp": 1759201579.393386}}
{"szkt": {"token": "a62c56ace614a6546191d5af8ca8b1513cfaeaea7ce67d0a37de994ab6c2aa4e2a0b058e0da575ff376dd51dc19c5ad353ab2761cb6d9db4d521b83adeee2979b78f7ae70765b26985165b6266d084b75f2f918008966e72a116d8bca5ec4c7cecc5223f78fa47b4d40aa9cf5277a11b0b967ad06e84ef7c4acbc53ccdef936c062b2d037ae0dad8c29d50426b668ec349cc8c0099a0270e16f97d31e4f058bc086334468f88d934c7fd1464ed3800833d2f486dc06f0689b99abbb78a8ebf4a3877bd82d0dd765dc09b7a1594fa8849d51f59282a81048c52e82e8320d1ad042a6c307ca831647cba4356564704780f", "expired_timestamp": 1859201579.393386}}

View File

@ -103,7 +103,7 @@ class CacheClient:
try:
# 创建缓存数据库连接
self.connection = sqlite3.connect(
database=database,
database=(Path(__file__).parent.resolve() / database), # 当前目录下创建缓存数据库
check_same_thread=False,
timeout=30, # 缓存数据库锁超时时间单位默认为30秒避免并发锁死
)
@ -164,7 +164,6 @@ class CacheClient:
(guid, time.time() - self.cache_ttl * 86400),
)
if result := cursor.fetchone():
print()
return json.loads(result[0])
return None
# 若发生异常则回滚事务并返回None
@ -218,19 +217,10 @@ class CacheClient:
pass
"""
封装urllib.request的相关操作
使用方法
client = HTTPClient()
response = client.post(url)
"""
class TokenBucket:
def __init__(self, refill_rate, max_tokens):
"""令牌桶,基于令牌桶算法限制请求频率"""
# 填充令牌速率(个/秒)
self.refill_rate = refill_rate
# 令牌桶最大令牌数
@ -243,18 +233,15 @@ class TokenBucket:
# 获取令牌
# noinspection PyMissingReturnStatement
def acquire(self) -> tuple[bool, float]:
with threading.Lock():
# 本次填充令牌时间戳
refill_timestamp = time.monotonic()
# 重新计算令牌桶中令牌数
self.tokens = min(
self.max_tokens,
self.tokens
+ self.refill_rate * (refill_timestamp - self.refill_timestamp),
)
self.refill_timestamp = refill_timestamp
# 若令牌桶当前令牌数大于等于1则减少令牌
@ -323,7 +310,6 @@ class Arguments(BaseModel):
:param stream: 是否启用流式传输
:param guid: 全局唯一标识
"""
# 统一资源定位符
url: HttpUrl = Field(default=...)
# 查询参数
@ -365,43 +351,38 @@ class Arguments(BaseModel):
return self
# HTTP客户端
class HTTPClient:
"""请求客户端"""
def __init__(
self,
timeout: int = 60,
default_headers: Optional[Dict[str, str]] = None,
total: int = 3,
backoff_factor: float = 0.5,
timeout: int = 60,
cache_enabled: bool = False,
cache_ttl: int = 360,
):
"""
:param timeout: 超时时间单位为秒
:param default_headers: 默认请求头
:param total: 最大重试次数
:param backoff_factor: 重试间隔退避因子
:param cache_enabled: 是否使用缓存
:param timeout: 超时时间单位为秒
:param cache_enabled: 使用缓存
:param cache_ttl: 缓存生存时间单位为天
"""
# 超时时间
self.timeout = timeout
# 创建HTTP会话并挂载适配器
# 创建请求会话并挂载适配器
self.session = self._create_session(
default_headers=default_headers, total=total, backoff_factor=backoff_factor
)
# 是否使用缓存
# 初始化超时时间
self.timeout = timeout
# 初始化使用缓存
self.cache_enabled = cache_enabled
# 缓存生存时间
self.cache_ttl = cache_ttl
# 若使用缓存,则初始化缓存数据库
if self.cache_enabled:
self._initialize_cache_database()
# 初始化缓存生存时间,单位为秒
self.cache_ttl = cache_ttl * 24 * 60 * 60
# 创建HTTP会话并挂载适配器
# 创建请求会话并挂载适配器
@staticmethod
def _create_session(
total: int,
@ -409,11 +390,11 @@ class HTTPClient:
default_headers: Optional[Dict[str, str]] = None,
) -> Session:
"""
:param default_headers 默认请求头
:param total 最大重试次数
:param backoff_factor 重试间隔退避因子
:param default_headers: 默认请求头
:param total: 最大重试次数
:param backoff_factor: 重试间隔退避因子
:return Session: 会话对象
"""
# 创建会话对象
session = Session()
@ -429,7 +410,7 @@ class HTTPClient:
502,
503,
504,
], # 408:请求超时502:网关错误503:服务不可用504:网关超时
], # 408为请求超时502为网关错误503为服务不可用504为网关超时
total=total,
respect_retry_after_header=True,
backoff_factor=backoff_factor,
@ -444,94 +425,21 @@ class HTTPClient:
return session
def _initialize_cache_database(self):
    """Open the SQLite cache database and bring it into a usable state.

    Connects to ``SQLite.db`` (stored on ``self.cache_connection``), creates the
    ``caches`` table and its timestamp index when they do not exist yet, deletes
    rows whose timestamp is older than ``self.cache_ttl * 86400`` seconds, and
    commits the transaction.
    """
    # check_same_thread=False lets the connection be used from other threads
    connection = sqlite3.connect(database="SQLite.db", check_same_thread=False)
    self.cache_connection = connection

    # Schema: the cache table first, then the index on its timestamp column
    schema_statements = (
        """CREATE TABLE IF NOT EXISTS caches (guid TEXT PRIMARY KEY, response TEXT, timestamp REAL)""",
        """CREATE INDEX IF NOT EXISTS index_timestamp ON caches(timestamp)""",
    )
    for statement in schema_statements:
        connection.execute(statement)

    # Purge expired rows; cache_ttl is multiplied by 86400, i.e. treated as days
    expiry_cutoff = time.time() - self.cache_ttl * 86400
    connection.execute("DELETE FROM caches WHERE timestamp < ?", (expiry_cutoff,))

    # Commit the transaction
    connection.commit()
# 在缓存数据库查询响应
def _query_response(self, guid: str) -> Optional[Dict]:
    """Look up a cached response by its globally unique identifier.

    :param guid: globally unique identifier of the request
    :return: the deserialized cached response, or ``None`` when there is no row
        newer than ``self.cache_ttl * 86400`` seconds or when the lookup fails
    """
    # ``with threading.Lock():`` would create a brand-new lock on every call and
    # therefore never exclude another thread. Keep one lock per client instead;
    # dict.setdefault hands every caller the same stored lock object.
    cache_lock = vars(self).setdefault("_cache_lock", threading.Lock())
    with cache_lock:
        cursor = None
        try:
            cursor = self.cache_connection.cursor()
            # Select the response stored under this guid that has not expired
            cursor.execute(
                "SELECT response FROM caches WHERE guid = ? AND timestamp >= ?",
                (guid, time.time() - self.cache_ttl * 86400),
            )
            row = cursor.fetchone()
            return json.loads(row[0]) if row else None
        # The cache is best-effort: any failure is reported as a cache miss.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt/SystemExit through.
        except Exception:
            self.cache_connection.rollback()
            return None
        finally:
            if cursor is not None:
                cursor.close()
# 将响应保存至缓存数据库
def _save_response(self, guid: str, response: Dict):
    """Insert or overwrite the cached response stored under ``guid``.

    :param guid: globally unique identifier of the request
    :param response: response to cache; serialized with ``json.dumps``
    :return: ``None``; a failed write is rolled back and otherwise ignored
    """
    # ``with threading.Lock():`` would create a brand-new lock on every call and
    # therefore never exclude another thread. Keep one lock per client instead;
    # dict.setdefault hands every caller the same stored lock object.
    cache_lock = vars(self).setdefault("_cache_lock", threading.Lock())
    with cache_lock:
        cursor = None
        try:
            cursor = self.cache_connection.cursor()
            # Insert the row, replacing any existing row with the same guid
            cursor.execute(
                "INSERT OR REPLACE INTO caches (guid, response, timestamp) VALUES (?, ?, ?)",
                (guid, json.dumps(response, ensure_ascii=False), time.time()),
            )
            # Commit the transaction
            self.cache_connection.commit()
        # The cache is best-effort: roll back and carry on when the write fails.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt/SystemExit through.
        except Exception:
            self.cache_connection.rollback()
        finally:
            if cursor is not None:
                cursor.close()
# GET请求
def get(self, **kwargs) -> Union[Dict, str]:
    """Send a GET request; ``kwargs`` are validated by the ``Arguments`` model."""
    request_arguments = Arguments(**kwargs)
    return self._request(method="GET", arguments=request_arguments)
# POST请求
def post(self, **kwargs) -> Union[Dict, str]:
    """Send a POST request; ``kwargs`` are validated by the ``Arguments`` model."""
    request_arguments = Arguments(**kwargs)
    return self._request(method="POST", arguments=request_arguments)
# 文件下载
# DOWNLOAD请求
def download(
self, stream=False, chunk_size=1024, **kwargs
) -> Union[Dict, str, Generator[bytes, None, None]]:
response = self._request(
method="GET", arguments=Arguments(**{"stream": stream, **kwargs})
)
# 若禁用流式传输,则返回响应
if not stream:
return response
@ -539,100 +447,91 @@ class HTTPClient:
return self._process_stream_response(response=response, chunk_size=chunk_size)
def _request(self, method: Literal["GET", "POST"], arguments: Arguments) -> Any:
"""发送请求"""
"""请求"""
# 请求参数模型
arguments = arguments.model_dump(exclude_none=True, by_alias=True)
# URL转为字符串
# URL对象转为字符串
arguments["url"] = str(arguments["url"])
# 重构表单数据
if arguments.get("data") is not None:
arguments["data"] = {
key: value
for key, value in arguments["data"].items()
if value is not None
k: v
for k, v in arguments["data"].items()
if v is not None
}
# 重构JSON格式数据
# 重构JSON数据
if arguments.get("json_data") is not None:
arguments["json_data"] = {
key: value
for key, value in arguments["json_data"].items()
if value is not None
k: v
for k, v in arguments["json_data"].items()
if v is not None
}
# 重构文件数据
if arguments.get("files") is not None:
files_valid = {}
# 遍历文件数据键值对
for key, value in arguments["files"].items():
if isinstance(value, (tuple, list)):
match len(value):
# 若文件数据包括文件名称和文件内容
files = {}
for k, v in arguments["files"].items():
if isinstance(v, (tuple, list)):
match len(v):
# 文件数据包括文件名称和文件内容
case 2:
files_valid[key] = (value[0], value[1], None, None)
# 文件数据包含文件名称、文件内容和内容类型
files[k] = (v[0], v[1], None, None)
# 文件数据包含文件名称、文件内容和内容类型
case 3:
files_valid[key] = (value[0], value[1], value[2], None)
# 文件数据包含文件名称、文件内容、内容类型和请求头
files[k] = (v[0], v[1], v[2], None)
# 文件数据包含文件名称、文件内容、内容类型和请求头
case 4:
files_valid[key] = (value[0], value[1], value[2], value[3])
arguments.update({"files": files_valid})
files[k] = (v[0], v[1], v[2], v[3])
arguments.update({"files": files})
# 全局唯一标识
guid = arguments.pop("guid", None)
# 若使用缓存且本次请求参数包含全局唯一标识,则优先返回缓存数据库中响应
# 若使用缓存且全局唯一标识非空则查询缓存
if self.cache_enabled and guid is not None:
# 在缓存数据库查询响应
response = self._query_response(guid=guid)
# 若缓存响应非空则返回
if response is not None:
return response
with CacheClient(cache_ttl=self.cache_ttl) as cache_client:
cache = cache_client.query(guid)
# 若缓存非空则返回
if cache is not None:
return cache
try:
# 发送请求
response = self.session.request(
method=method, timeout=self.timeout, **arguments
)
# 若返回错误状态码则抛出异常
response.raise_for_status()
# 处理响应
response = self._process_response(response=response)
# 若请求全局唯一标识非NONE则响应保存至缓存数据库
# noinspection PyUnboundLocalVariable
if guid is not None:
# noinspection PyUnboundLocalVariable
self._save_response(guid=guid, response=response)
# 若使用缓存且全局唯一标识非空则更新缓存
if self.cache_enabled and guid is not None:
with CacheClient(cache_ttl=self.cache_ttl) as cache_client:
cache_client.update(guid, response)
return response
except Exception as exception:
# 尝试根据响应解析响应状态码和错误信息,否则进行构造
# 尝试根据响应解析错误状态码和错误信息,否则进行构造
# noinspection PyBroadException
try:
# JSON反序列化
# noinspection PyUnboundLocalVariable
# 响应反序列化
response_decoded = response.json()
# 响应状态码
# 错误状态码
status = response_decoded["status"]
# 错误信息
message = response_decoded["message"]
except:
status = getattr(getattr(exception, "response", None), "status", None)
url = arguments["url"]
message = str(exception).split("\n")[0]
# 重新构建错误信息
message = f"{method} {url} failed: {message}"
# 重构错误信息
message = f"{method} {arguments["url"]} failed: {str(exception).split("\n")[0]}"
raise RequestException(status=status, message=message)
# 处理响应
@staticmethod
def _process_response(response: Response) -> Any:
# 响应内容
content = response.content
# 若响应内容为空则返回NONE
@ -643,12 +542,11 @@ class HTTPClient:
content_type = (
response.headers.get("Content-Type", "").split(";")[0].strip().lower()
)
# 根据内容类型匹配解析返回内容方法
# noinspection PyUnreachableCode
match content_type:
case "application/json" | "text/json":
# JSON反序列化
# 响应反序列化
return response.json()
case "application/xml" | "text/xml":
# 解析为XMLELEMENT对象
@ -667,7 +565,6 @@ class HTTPClient:
def _process_stream_response(
response: Response, chunk_size: int
) -> Generator[bytes, None, None]: # 生成器不接受发SEND发送至、结束时返回NONE
# 检查数据分块
if not isinstance(chunk_size, int) and isinstance(chunk_size, bool):
raise ValueError("chunk_size must type=int")
@ -689,14 +586,12 @@ class Authenticator:
self,
):
"""认证器(用于获取访问令牌)"""
# 初始化
self._initialize()
def _initialize(self):
"""初始化访问凭证"""
# 创建访问凭证地址对象
"""初始化"""
# 初始化访问凭证地址对象
self.certifications_path = (
Path(__file__).parent.resolve() / "certifications.json"
)
@ -709,40 +604,34 @@ class Authenticator:
ensure_ascii=False,
)
# 初始化HTTP客户端
# 初始化请求客户端
self.http_client = HTTPClient()
def _szkt_get_certification(self) -> tuple[str, float]:
"""获取深圳快瞳访问凭证"""
# 请求深圳快瞳访问凭证获取接口
response = self.http_client.get(
url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d"
)
# 若响应成功则抛出异常
# 若响应成功则抛出异常
if not (response["status"] == 200 and response["code"] == 0):
raise RuntimeError("获取深圳快瞳访问凭证发生异常")
# 返回令牌、失效时间戳
# 返回访问令牌、失效时间戳
# noinspection PyTypeChecker
return (
response["data"]["access_token"],
time.time() + response["data"]["expires_in"],
)
def _hlyj_get_certification(self) -> tuple[str, float]:
def _hlyj_get_certification(self) -> Tuple[str, float]:
"""获取合力亿捷访问凭证"""
# 企业访问标识
access_key_id = "25938f1c190448829dbdb5d344231e42"
# 签名秘钥
secret_access_key = "44dc0299aff84d68ae27712f8784f173"
# 时间戳(秒级)
timestamp = int(time.time())
# 签名企业访问标识、签名秘钥和时间戳拼接后计算的十六进制的HMAC-SHA256
signature = hmac.new(
secret_access_key.encode("utf-8"),
@ -750,26 +639,23 @@ class Authenticator:
hashlib.sha256,
).hexdigest()
# 请求合力亿捷访问凭证获取接口
response = self.http_client.get(
url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}&timestamp={timestamp}&signature={signature}"
)
# 若响应成功则抛出异常
# 若响应成功则抛出异常
if not response["success"]:
raise RuntimeError("获取合力亿捷访问凭证发生异常")
# 返回令牌、失效时间戳
# 返回访问令牌、失效时间戳
# noinspection PyTypeChecker
return (
response["data"],
time.time() + 3600, # 访问令牌有效期为1小时
time.time() + 1 * 60 * 60, # 访问令牌有效期为1小时
)
def _feishu_get_certification(self) -> tuple[str, float]:
"""获取飞书访问凭证"""
# 请求飞书访问凭证获取接口
response = self.http_client.post(
url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
data={
@ -778,11 +664,11 @@ class Authenticator:
},
)
# 若响应成功则抛出异常
# 若响应成功则抛出异常
if not response["code"] == 0:
raise RuntimeError("获取飞书访问凭证发生异常")
# 返回令牌、失效时间戳
# 返回访问令牌、失效时间戳
# noinspection PyTypeChecker
return (
response["tenant_access_token"],
@ -790,28 +676,28 @@ class Authenticator:
)
def get_token(self, servicer: str) -> str | None:
"""获取访问令牌"""
"""
:param servicer: 服务商数据类型为字符串
获取访问令牌
:param servicer: 服务商暂仅支持深圳快瞳合力亿捷和飞书
:return token: 访问令牌
"""
with threading.Lock():
# 初始化令牌和失效时间戳
# 初始化访问令牌和失效时间戳
token, expired_timestamp = None, 0
try:
with open(self.certifications_path, "r", encoding="utf-8") as file:
# 读取所有服务商访问凭证
# 本地打开并读取所有服务商访问凭证
certifications = json.load(file)
# 获取服务商访问凭证
# 获取指定服务商访问凭证
certification = certifications.get(servicer, None)
# 若服务商访问凭证非NONE则解析令牌和失效时间戳
# 若指定服务商的访问凭证非空则解析访问令牌和失效时间戳
if certification is not None:
# 解析服务商访问令牌
# 访问令牌
token = certification["token"]
# 解析服务商访问令牌失效时间戳
# 失效时间戳
expired_timestamp = certification["expired_timestamp"]
# 若JSON反序列化发生异常则重置访问凭证
# 若反序列化发生异常则重置访问凭证储存文件
except json.decoder.JSONDecodeError:
with open(self.certifications_path, "w", encoding="utf-8") as file:
json.dump(
@ -823,21 +709,17 @@ class Authenticator:
except Exception:
raise RuntimeError("获取访问令牌发生异常")
# 若当前时间戳大于失效时间戳,则请求服务商获取访问凭证接口
if time.time() > expired_timestamp:
# noinspection PyUnreachableCode
match servicer:
# 获取深圳快瞳访问凭证
case "szkt":
token, expired_timestamp = self._szkt_get_certification()
case "feishu":
token, expired_timestamp = self._feishu_get_certification()
# 获取合力亿捷访问凭证
case "hlyj":
token, expired_timestamp = self._hlyj_get_certification()
case "feishu":
token, expired_timestamp = self._feishu_get_certification()
case _:
raise RuntimeError(f"未设置服务商:({servicer})")
raise RuntimeError(f"未设置服务商:{servicer}获取访问凭证方法")
# 更新服务商访问凭证
certifications[servicer] = {
"token": token,
@ -855,22 +737,15 @@ class Authenticator:
return token
"""
封装飞书客户端实现获取验证码操作多维表格等
"""
class FeishuClinet:
class FeishuClient:
"""飞书客户端"""
def __init__(self):
self.authenticator = Authenticator()
self.http_client = HTTPClient()
def _headers(self):
"""请求头"""
# 装配飞书访问凭证
return {
"Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}",
@ -878,67 +753,51 @@ class FeishuClinet:
@staticmethod
def get_verification_code():
try:
# 执行时间戳
execute_timestamp = time.time()
# 超时时间戳
timeout_timestamp = execute_timestamp + 65
# 建立加密IMAP连接
server = IMAP4_SSL("imap.feishu.cn", 993)
# 登录
server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2")
while True:
# 若当前时间戳大于超时时间戳则返回NONE
if time.time() <= timeout_timestamp:
# 等待10秒
time.sleep(10)
# 选择文件夹(邮箱验证码)
server.select("&kK57sZqMi8F4AQ-")
# noinspection PyBroadException
try:
# 获取最后一封邮件索引server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引
index = server.search(None, "ALL")[1][0].split()[-1]
# 获取最后一封邮件内容并解析server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文
# noinspection PyUnresolvedReferences
contents = BytesParser(policy=default).parsebytes(
server.fetch(index, "(RFC822)")[1][0][1]
)
# 遍历邮件内容若正文内容类型为纯文本或HTML则解析发送时间和验证码
for content in contents.walk():
if (
content.get_content_type() == "text/plain"
or content.get_content_type() == "text/html"
):
# 邮件发送时间戳
# noinspection PyUnresolvedReferences
send_timestamp = parsedate_to_datetime(
content["Date"]
).timestamp()
# 若邮件发送时间戳大于执行时间戳则解析验证码并返回
if (
execute_timestamp
> send_timestamp
>= execute_timestamp - 35
):
# 登出
server.logout()
# 解析验证码
return re.search(
r"【普康健康】您的验证码是:(\d+)",
@ -947,18 +806,14 @@ class FeishuClinet:
# 若文件夹无邮件则继续
except:
pass
# 若超时则登出
else:
server.logout()
return None
except Exception:
raise RuntimeError("获取邮箱验证码发生其它异常")
# 查询多维表格记录单次最多查询500条记录
@ -970,9 +825,7 @@ class FeishuClinet:
field_names: Optional[list[str]] = None,
filter_conditions: Optional[dict] = None,
) -> pandas.DataFrame:
# 先查询多维表格记录,在根据字段解析记录
# 装配多维表格查询记录地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20"

Binary file not shown.

View File

@ -27,175 +27,6 @@ from utils.client import Authenticator, HTTPClient, CacheClient
# from utils.ocr import fuzzy_match
# -------------------------
# 封装方法
# -------------------------
# noinspection PyShadowingNames
def image_read(
image_path: Path,
) -> Tuple[numpy.ndarray | None, str | None, bytes | None]:
"""
本地打开并读取影像件
:param image_path: 影像件路径对象
:return: 影像件数组影像件格式和影像件字节流
"""
# noinspection PyBroadException
try:
# 影像件打开并读取(默认转为单通道灰度图)
image_ndarray = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE)
if image_ndarray is None:
raise RuntimeError("影像件打开并读取发生异常")
# 影像件格式
image_format = image_path.suffix.lower()
# 按照影像件格式将影像件数组编码
success, image_ndarray_encoded = cv2.imencode(image_format, image_ndarray)
if not success or image_ndarray_encoded is None:
raise RuntimeError("编码为图像字节数组发生异常")
# 将编码后图像数组转为字节流
image_bytes = image_ndarray_encoded.tobytes()
return image_ndarray, image_format, image_bytes
except Exception:
return None, None, None
# noinspection PyShadowingNames
def images_compress(
    image_ndarray, image_format, image_bytes, image_size_specified=2
) -> Optional[str]:
    """
    Compress an image until its BASE64 encoding fits the size limit.

    The already-encoded ``image_bytes`` are tried first. Otherwise the array is
    re-encoded at decreasing quality (outer loop) and progressively shrunk by 10%
    per step (inner loop) until the BASE64 text is small enough.

    :param image_ndarray: image array
    :param image_format: image format as passed to ``cv2.imencode`` (e.g. ``".jpg"``)
    :param image_bytes: encoded image bytes
    :param image_size_specified: size limit in megabytes (MB)
    :return: BASE64 text of the compressed image, or ``None`` when no attempt fits
    """
    # Convert the limit from megabytes to bytes
    image_size_specified = image_size_specified * 1024 * 1024

    # Fast path: the original bytes already fit
    image_base64 = b64encode(image_bytes).decode("utf-8")
    if len(image_base64) <= image_size_specified:
        return image_base64

    # The format normally carries a leading dot (".png"), so strip it before comparing;
    # comparing against "png" directly never matched.
    is_png = image_format.lower().lstrip(".") == "png"

    # Outer loop: lower the encoding quality
    for quality in range(90, 50, -10):
        image_ndarray_copy = image_ndarray.copy()
        encode_params = (
            [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10]
            if is_png
            else [cv2.IMWRITE_JPEG_QUALITY, quality]
        )

        # Inner loop: shrink the image
        for _ in range(25):
            success, image_ndarray_encoded = cv2.imencode(
                image_format,
                image_ndarray_copy,
                params=encode_params,
            )
            if not success or image_ndarray_encoded is None:
                break

            image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode("utf-8")
            if len(image_base64) <= image_size_specified:
                return image_base64

            # cv2.resize expects dsize as (width, height) while shape is (height, width, ...);
            # passing shape[0], shape[1] in that order swapped the two dimensions.
            height, width = image_ndarray_copy.shape[:2]
            image_ndarray_copy = cv2.resize(
                image_ndarray_copy,
                (int(width * 0.9), int(height * 0.9)),
                interpolation=cv2.INTER_AREA,
            )

            # Stop shrinking once either side drops below 350 pixels
            if min(image_ndarray_copy.shape[:2]) < 350:
                break

    return None
def images_classify(
image, image_ndarray, image_format, image_bytes
) -> tuple[str | None, str | None]:
"""
影像件分类并旋正
:param image: 影像件数据
:param image_ndarray: 影像件数组
:param image_format: 影像件格式
:param image_bytes: 影像件字节流
:return: 压缩后影像件BASE64编码
"""
# 影像件唯一标识
image_uuid = image["影像件唯一标识"]
with CacheClient() as client:
# 根据作业环节和影像件唯一标识生成缓存唯一标识
cache_guid = md5(("初审" + image_uuid).encode("utf-8")).hexdigest().upper()
cache = client.query(cache_guid)
if cache is not None:
# 影像件类型
image_type = cache["image_type"]
# 影像件方向
image_o
# 请求深圳快瞳影像件分类接口
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/genalClassify"),
# 用于和深圳快瞳联查定位
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(
servicer="szkt"
), # 使用全局变量
"imgBase64": f"data:image/{image_format};base64,{image_base64}", # 深圳快瞳要求修饰影像件BASE64编码的DATAURI
},
guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# 若响应非成功则返回NONE
if not (response.get("status") == 200 and response.get("code") == 0):
return None, None
# 根据票据类型和票据二级分类映射影像件类型
match (response["data"]["flag"], response["data"]["type"]):
case (7, "idcard-front-back"):
image_type = "居民身份证(正背面)"
case (7, "idcard-front"):
image_type = "居民身份证(正面)" # 包含国徽一面
case (7, "idcard-back"):
image_type = "居民身份证(背面)" # 包含头像一面
case (8, _):
image_type = "银行卡"
case (4, _):
image_type = "增值税发票"
case (5, _):
image_type = "门诊收费票据"
case (3, _):
image_type = "住院收费票据"
case (18, _):
image_type = "理赔申请书"
case _:
return None, None
# 影像件方向
image_orientation = {
"0": "0度",
"90": "顺时针90度",
"180": "180度",
"270": "逆时针90度",
}.get(response["data"]["angle"], "0度")
return image_type, image_orientation
def idcard_extraction(**kwargs) -> dict | None:
"""居民身份证数据提取"""
@ -1140,29 +971,25 @@ def disease_diagnosis(**kwargs) -> str | None:
# -------------------------
# 主逻辑部分
# 主逻辑
# -------------------------
if __name__ == "__main__":
# 初始化HTTP客户端
http_client = HTTPClient(timeout=300, cache_enabled=True)
# 初始化认证器
# 实例认证器
authenticator = Authenticator()
# 实例请求客户端
http_client = HTTPClient(timeout=300, cache_enabled=True) # 使用缓存
# 初始化影像件识别规则引擎
recognize_decision = decision(Path("rules/影像件是否需要数据提取.json"))
# 初始化工作目录地址对象
directory_path = Path("directory")
# 若不存在则创建
directory_path.mkdir(parents=True, exist_ok=True)
# 初始化影像件识别规则引擎
recognize_decision = decision(Path("rules/影像件是否需要数据提取.json"))
# 初始化JINJA2环境
environment = Environment(loader=FileSystemLoader("."))
# 添加DATE过滤器
environment.filters["date"] = lambda date: (
date.strftime("%Y-%m-%d") if date else "长期"
@ -1170,21 +997,212 @@ if __name__ == "__main__":
# 加载赔案档案模版
template = environment.get_template("template.html")
# 遍历工作目录中赔案目录,根据赔案创建赔案档案(模拟自动化域就待自动化任务创建理赔档案)
# -------------------------
# 自定义方法
# -------------------------
# noinspection PyShadowingNames
def image_read(
    image_path: Path,
) -> numpy.ndarray:
    """
    Open and read an image from the local file system.

    :param image_path: path object of the image
    :return: image array (single-channel grayscale)
    :raises RuntimeError: when the image cannot be opened or read
    """
    # noinspection PyBroadException
    try:
        # Read the image as a single-channel grayscale array
        image_ndarray = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE)
    except Exception as exception:
        # Keep the original error as the cause instead of discarding it
        raise RuntimeError("影像件打开并读取发生异常") from exception

    # A ``None`` result is treated as a read failure; this function never returns None
    if image_ndarray is None:
        raise RuntimeError("影像件打开并读取发生异常")

    return image_ndarray
# noinspection PyShadowingNames
def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str:
    """
    Encode an image array and derive the image's unique identifier from the bytes.

    :param image_format: image format passed to ``cv2.imencode``
    :param image_ndarray: image array
    :return: unique identifier of the image (upper-case MD5 hex digest of the encoded bytes)
    :raises RuntimeError: when encoding fails
    """
    # Encode the array in the given format
    encoded_ok, encoded = cv2.imencode(image_format, image_ndarray)
    if not (encoded_ok and encoded is not None):
        raise RuntimeError("编码为图像字节数组发生异常")

    # Hash the encoded byte stream to obtain the identifier
    digest = md5(encoded.tobytes())
    return digest.hexdigest().upper()
# noinspection PyShadowingNames
def images_classify(
image_guid: str, image_format: str, image_ndarray: numpy.ndarray
) -> Optional[Tuple[str, str, str]]:
"""
影像件分类并旋正
:param image_guid: 影像件唯一标识
:param image_format: 影像件格式
:param image_ndarray: 影像件数据
:return: 压缩后影像件BASE64编码影像件类型和影像件方向
"""
# noinspection PyShadowingNames
def images_compress(
image_format, image_ndarray, image_size_specified=2
) -> Optional[str]:
"""
影像件压缩
:param image_ndarray: 影像件数组
:param image_format: 影像件格式
:param image_size_specified: 指定影像件大小单位为兆字节MB
:return: 压缩后影像件BASE64编码
"""
# 将指定影像件大小单位由兆字节转为字节
image_size_specified = image_size_specified * 1024 * 1024
# 通过调整影像件质量和尺寸达到压缩影像件目的
# 外循环压缩:通过调整影像件质量实现压缩影像件大小
for quality in range(100, 50, -10):
image_ndarray_copy = image_ndarray.copy()
# 内循环压缩:通过调整影像件尺寸实现压缩影像件大小
for i in range(10):
# 按照影像件格式和影像件质量将影像件数组编码
success, image_ndarray_encoded = cv2.imencode(
image_format,
image_ndarray_copy,
params=(
[cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10]
if image_format == "png"
else [cv2.IMWRITE_JPEG_QUALITY, quality]
),
)
# 若编码发生异常则停止循环
if not success or image_ndarray_encoded is None:
break
# 影像件BASE64编码
image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode("utf-8")
if len(image_base64) <= image_size_specified:
return image_base64
# 调整影像件尺寸
image_ndarray_copy = cv2.resize(
image_ndarray_copy,
(
int(image_ndarray_copy.shape[0] * 0.95),
int(image_ndarray_copy.shape[1] * 0.95),
),
interpolation=cv2.INTER_AREA,
)
# 若调整后影像件尺寸中长或宽小于350像素则停止调整影像件尺寸
if min(image_ndarray_copy.shape[:2]) < 350:
break
return None
# 影像件压缩
image_base64 = images_compress(image_format, image_ndarray, image_size_specified=2) # 深圳快瞳要求为2兆字节
# TODO: 若影像件压缩发生异常则流转至人工处理
if image_base64 is None:
raise RuntimeError("影像件压缩发生异常")
# 请求深圳快瞳影像件分类接口
response = http_client.post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/genalClassify"),
headers={"X-RequestId-Header": image_guid}, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={
"token": authenticator.get_token(
servicer="szkt"
), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", # 影像件BASE64编码嵌入数据统一资源标识符
},
guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# 若响应非成功则抛出异常
# TODO: 若响应非成功则流转至人工处理
if not (response.get("status") == 200 and response.get("code") == 0):
raise RuntimeError("请求深圳快瞳影像件分类接口发生异常")
# 解析影像件类型
# TODO: 后续完成居民户口簿、中国港澳台地区及境外护照和医疗费用清单
# noinspection PyTypeChecker
match (response["data"]["flag"], response["data"]["type"]):
case (14, _):
image_type = "居民户口簿"
case (7, "idcard-front-back"):
image_type = "居民身份证(国徽、头像面)"
case (7, "idcard-front"):
image_type = "居民身份证(国徽面)"
case (7, "idcard-back"):
image_type = "居民身份证(头像面)"
case (11, _):
image_type = "中国港澳台地区及境外护照"
case (8, _):
image_type = "银行卡"
case (4, _):
image_type = "增值税发票"
case (1, _):
image_type = "医疗费用清单"
case (5, _):
image_type = "门诊收费票据"
case (3, _):
image_type = "住院收费票据"
case (18, _):
image_type = "理赔申请书"
case _:
image_type = "其它"
# 解析影像件方向
# noinspection PyTypeChecker
image_orientation = {
"0": "0度",
"90": "顺时针90度",
"180": "180度",
"270": "逆时针90度",
}.get(response["data"]["angle"], "0度")
# 若影像件方向非0度则旋正
if image_orientation != "0度":
image_ndarray = cv2.rotate(
image_ndarray,
{
"顺时针90度": cv2.ROTATE_90_COUNTERCLOCKWISE, # 逆时针旋转90度
"180度": cv2.ROTATE_180, # 旋转180度
"逆时针90度": cv2.ROTATE_90_CLOCKWISE, # 顺时针旋转90度
}[image_orientation],
)
# 旋正后影像件再次压缩
image_base64 = images_compress(image_format, image_ndarray, image_size_specified=2)
# TODO: 若旋正后影像件再次压缩发生异常则流转至人工处理
if image_base64 is None:
raise RuntimeError("旋正后影像件再次压缩发生异常")
return image_base64, image_type, image_orientation
# 遍历工作目录中赔案目录并创建赔案档案(模拟自动化域就待自动化任务创建理赔档案)
for case_path in [
case_path for case_path in directory_path.iterdir() if case_path.is_dir()
x for x in directory_path.iterdir() if x.is_dir()
]:
# 初始化赔案档案(实际报案层包括保险分公司名称、报案渠道、批次号、报案号和报案时间等)
# 报案渠道包括:保司定义,例如中银项目包括总行和各地分行驻场报案和普康宝自助报案等
dossier = {
"报案层": {
"保险分公司": "中银保险有限公司广东分公司", # 设定:保险分公司
"保险分公司": "中银保险有限公司广东分公司", # 设定:保险分公司为中银保险有限公司广东分公司
"赔案号": (case_number := case_path.stem), # 设定:赔案目录名称为赔案号
},
"影像件层": [],
}
# 遍历赔案目录中影像件地址
# 遍历赔案目录中影像件路径对象
for image_index, image_path in enumerate(
sorted(
[
@ -1201,64 +1219,31 @@ if __name__ == "__main__":
"原始影像件": {
"影像件地址": image_path.as_posix(), # 将影像件路径对象转为字符串
"影像件名称": (image_name := image_path.stem),
"影像件格式": (image_format := image_path.suffix.lower()),
},
}
# 本地打开并读取影像件(实际作业为从作业系统的影像件服务器下载并读取影像件,因赔案签收时会转存影像件至影像件服务器,若下载并读取影像件发生异常则技术支持排查)
image_ndarray, image_format, image_bytes = image_read(image_path)
# 若本地打开并读取影像件发生异常则跳过该影像件
if image_format is None or image_bytes is None:
raise RuntimeError("本地打开并读取影像件发生异常")
# 本地打开并读取影像件
image_ndarray = image_read(image_path)
image["原始影像件"]["影像件格式"] = image_format
# 生成影像件唯一标识
# 影像件序列化
# noinspection PyTypeChecker
image["影像件唯一标识"] = md5(image_bytes).hexdigest().upper()
image["影像件唯一标识"] = (image_guid := image_serialize(image_format, image_ndarray))
# 影像件分类
# 影像件分类并旋正(较初审自动化,无使能检查)
image_base64, image_type, image_orientation = images_classify(image_guid, image_format, image_ndarray)
image["影像件类型"] = image_type
image["影像件方向"] = image_orientation
# 将影像件数据添加至影像件层
dossier["影像件层"].append(image)
print(image)
exit()
# 影像件识别
print(dossier)
exit()
"""
# 影像件压缩输出BASE64编码
image_guid, image_base64 = images_compression()
# 若发生异常则跳过该影像件
if image_guid is None or image_base64 is None:
dossier["影像件层"][-1]["已分类"] = "否,压缩异常"
continue
# 通过请求深圳快瞳影像件分类接口获取影像件类型和方向
image_type, image_orientation = images_classification()
# 若发生异常则跳过该影像件
if image_type is None or image_orientation is None:
dossier["影像件层"][-1]["已分类"] = "否,影像件分类异常"
continue
#
dossier["影像件层"].append(
{
"影像件序号": (image_index := f"{image_index:02d}"),
"影像件名称": (image_name := image_path.name),
}
)
# 若影像件方向非0度则影像件旋正并在此压缩
if image_orientation != "0度":
# 影像件旋正
image = cv2.rotate(
image,
{
"顺时针90度": cv2.ROTATE_90_COUNTERCLOCKWISE, # 逆时针旋转90度
"180度": cv2.ROTATE_180, # 旋转180度
"逆时针90度": cv2.ROTATE_90_CLOCKWISE, # 顺时针旋转90度
}[image_orientation],
)
# 影像件再次压缩
image_guid, image_base64 = images_compression()
if image_guid is None or image_base64 is None:
dossier["影像件层"][-1]["已分类"] = "否,压缩异常"
continue
dossier["影像件层"][-1].update({"已分类": "", "影像件类型": image_type})
# 根据保险总公司和影像件类型评估影像件是否需要数据提取,若无需数据提取则跳过该影像件(例如,中银保险有限公司理赔申请书包含户名、开户银行和银行账号,无需识别银行卡)
if not recognize_decision.evaluate(