# -*- coding: utf-8 -*- """基于FastAPI和Uvicorn的RESTfulAPI服务""" from contextlib import asynccontextmanager from datetime import date, datetime from typing import AsyncGenerator, Literal, Optional, Union from urllib.parse import quote_plus import pytz from distance import levenshtein from fastapi import Depends, FastAPI, HTTPException, Header, status from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from lunarcalendar import Lunar from pydantic import BaseModel, Field, field_validator from sqlalchemy import create_engine, select, text from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.ext.automap import automap_base # ------------------------- # 应用初始化配置 # ------------------------- # 创建FastAPI对象 application = FastAPI( title="api.liubiren.cloud", description="刘弼仁工作室的请求服务中心,如需使用请联系liubiren@qq.com", version="0.0.1", swagger_ui_init_oauth={ "clientId": "api-docs-client", "scopes": {}, "usePkceWithAuthorizationCodeGrant": True, }, redoc_url=None, ) # 配置全局安全策略(所有请求均需认证) application.openapi_security = [{"BearerAuth": []}] # ------------------------- # 请求和响应模型 # ------------------------- class Request(BaseModel): """统一请求模型""" service: Literal["divination", "query_institution", "query_drug"] = Field( ..., description="服务标识,数据类型为枚举,必填", json_schema_extra={ "枚举描述": { "divination": "小六壬速断", "query_institution": "根据名称精准查询医药机构信息", } }, ) data: Union["DivinationRequest", "QueryInstitutionRequest", "QueryDrugRequest"] = ( Field( ..., description="请求数据模型,根据服务标识传入相应的请求数据模型", ) ) # 根据服务标识校验请求数据模型 # noinspection PyNestedDecorators @field_validator("data") @classmethod def validate_data(cls, value, values): service = values.data.get("service") if service == "divination" and not isinstance(value, DivinationRequest): raise ValueError("小六壬速断服务需要 DivinationRequest 请求数据模型") if service == "query_institution" and not isinstance( value, QueryInstitutionRequest ): raise ValueError( "根据名称精准查询医药机构信息服务需要 QueryInstitutionRequest 请求数据模型" ) if service == "query_drug" and not isinstance(value, QueryDrugRequest): raise ValueError( "根据类型和名称模糊查询药品信息服务需要 QueryDrugRequest 请求数据模型" ) return value class Config: json_schema_extra = { "example": { "service": "query_institution", "data": {"name": "浙江大学医学院附属第一医院"}, } } class Response(BaseModel): """统一响应模型""" code: int = Field( default=0, description="错误码,0表示成功,其它表示发生错误或异常" ) message: str = Field( default="成功", description="错误描述", ) data: Union[ "DivinationResponse", "QueryInstitutionResponse", "QueryDrugResponse" ] = Field(default=None, description="响应数据模型") # noinspection PyUnusedLocal @application.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, error: RequestValidationError): return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content=Response( code=422, message="校验模型失败", ).model_dump(), ) # noinspection PyUnusedLocal @application.exception_handler(HTTPException) async def http_exception_handler(request: Request, exception: HTTPException): return JSONResponse( status_code=exception.status_code, content=Response( code=exception.status_code, message="请求发生异常", ).model_dump(), ) # noinspection PyUnusedLocal @application.exception_handler(Exception) async def general_exception_handler(request: Request, exception: Exception): return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=Response( code=500, message="服务内部发生异常", ).model_dump(), ) class DivinationRequest(BaseModel): """小六壬速断的请求数据模型""" pass class DivinationResponse(BaseModel): """小六壬速断的响应数据模型""" fallen_palace: str = Field( ..., description="落宫,数据类型为字符串,非空", examples=["小吉"] ) divination_verse: str = Field( ..., description="卦辞,数据类型为字符串,非空", examples=["小吉最吉昌,路上好商量,阴人来报喜,失物在坤方"], ) class QueryInstitutionRequest(BaseModel): """根据名称精准查询医药机构信息的请求数据模型""" name: str = Field( ..., max_length=255, description="医药机构名称,数据类型为字符串,非空", examples=["浙江大学医学院附属第一医院"], ) # noinspection PyNestedDecorators @field_validator("name") @classmethod def validate_name(cls, value: str) -> str: """删除名称前后空格""" return value.strip() class QueryInstitutionResponse(BaseModel): """根据名称精准查询医药机构信息的响应数据模型""" name: str = Field( ..., description="机构名称,数据类型为字符串,非空", examples=["浙江大学医学院附属第一医院"], ) province: str = Field( ..., description="机构所在省,数据类型为字符串,非空", examples=["浙江省"], ) city: str = Field( ..., description="机构所在地,数据类型为字符串,非空", examples=["杭州市"], ) type: str = Field( ..., description="机构类型,数据类型为字符串,非空", examples=["医疗机构"], ) incurred: str = Field( ..., description="是否为医保定点机构,数据类型为字符串,非空", examples=["是"], ) level: Optional[str] = Field( None, description="机构等级,数据类型为字符串,可空", examples=["三级甲等"], ) attribute: Optional[str] = Field( None, description="机构属性,数据类型为字符串,可空", examples=["公立医院、非营利性医院"], ) class QueryDrugRequest(BaseModel): """根据类型和名称模糊查询药品信息的请求数据模型""" type: Literal["西药", "中草药", "中成药"] = Field( ..., description="药品类型,数据类型为枚举,非空", examples=["西药"], ) name: str = Field( ..., max_length=255, description="药品名称,数据类型为字符串,非空", examples=["[达悦宁]盐酸二甲双胍缓释片 0.5*30"], ) # noinspection PyNestedDecorators @field_validator("name") @classmethod def validate_name(cls, value: str) -> str: """删除名称前后空格""" return value.strip() class QueryDrugResponse(BaseModel): """根据类型和名称模糊查询药品信息的响应数据模型""" name: str = Field( ..., description="药品名称,数据类型为字符串,非空", examples=["盐酸二甲双胍缓释片"], ) # ------------------------- # 依赖项与工具函数 # ------------------------- async def authenticate_headers( credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()), content_type: str = Header( default="application/json; charset=utf-8", description="媒体类型", ), ) -> bool: """校验请求头中Content-Type和Authorization Bearer token""" if "application/json" not in content_type: raise HTTPException(status_code=415, detail="只接受JSON格式数据") if not credentials or credentials.credentials != "779E0501265CDF7B8124EB87199994B8": raise HTTPException(status_code=403, detail="认证失败") return True # 创建MySQL连接引擎(默认使用DATABASE数据库) engine = create_async_engine( url=f"mysql+asyncmy://root:{quote_plus('Te198752')}@cdb-7z9lzx4y.cd.tencentcdb.com:10039/database?charset=utf8", pool_size=10, # 连接池常驻连接数 max_overflow=10, # 连接池最大溢出连接数 pool_recycle=3600, # 连接回收时间(秒) pool_pre_ping=True, # 连接前验证有效性 ) # 创建ORM对象 Base = automap_base() Base.prepare( autoload_with=create_engine( f"mysql+pymysql://root:{quote_plus('Te198752')}@cdb-7z9lzx4y.cd.tencentcdb.com:10039/database?charset=utf8" ) ) # 初始化MySQL会话工厂 AsyncSessionLocal = async_sessionmaker(bind=engine, expire_on_commit=False) @asynccontextmanager async def create_session() -> AsyncGenerator[AsyncSession, None]: """数据库会话上下文管理器""" async with AsyncSessionLocal() as session: try: yield session await session.commit() except: await session.rollback() raise finally: await session.close() # ------------------------- # 服务路由 # ------------------------- @application.post( path="/", dependencies=[Depends(authenticate_headers)], response_model=Response, response_description="响应成功", responses={ 200: { "model": Response, "content": { "application/json": { "example": { "code": 200, "message": "医药机构信息不存在", "data": None, } } }, "description": "不存在", }, 422: { "model": Response, "content": { "application/json": { "example": {"code": 422, "message": "校验模型失败", "data": None} } }, "description": "校验模型失败", }, 500: { "model": Response, "content": { "application/json": { "example": { "code": 500, "message": "服务内部发生异常", "data": None, } } }, "description": "服务内部发生异常", }, }, name="服务中心", description="所有请求均由本中心提供响应服务", ) async def service( request: Request, ) -> Response: # 根据服务标识匹配服务 # noinspection PyUnreachableCode match request.service: case "divination": return await divination() case "query_institution": return await query_institution(request) case "query_drug": return await query_drug(request) case _: return Response(code=400, message="无效的服务标识") async def divination() -> Response: """小六壬速断""" # 起算日期时间 starting = datetime.now(tz=pytz.timezone("Asia/Shanghai")) # 起算日期转为农历 lunar = Lunar.from_date(date(starting.year, starting.month, starting.day)) # 根据农历月日和时辰匹配落宫和卦辞 divination = [ { "fallen_palace": "空亡", "divination_verse": "空亡事不详,阴人少乖张,求财无利益,行人有灾殃", }, { "fallen_palace": "大安", "divination_verse": "大安事事昌,求财在坤方,失物去不远,宅舍保安康", }, { "fallen_palace": "留连", "divination_verse": "留连事难成,求谋日未明,官事只宜缓,去者未回程", }, { "fallen_palace": "速喜", "divination_verse": "速喜喜来临,求财向南行,失物申午未,寻人路上寻", }, { "fallen_palace": "赤口", "divination_verse": "赤口主口舌,是非要紧防,失物速速讨,行人有惊慌", }, { "fallen_palace": "小吉", "divination_verse": "小吉最吉昌,路上好商量,阴人来报喜,失物在坤方", }, ][ (lunar.month + lunar.day + ((starting.hour + 3) // 2) % 12 + 4) % 6 ] # 需先将24制小时转为时辰,再根据月、日和时辰数落宫 return Response( data=DivinationResponse( fallen_palace=divination["fallen_palace"], divination_verse=divination["divination_verse"], ) ) async def query_institution(request: Request) -> Response: """根据名称精准查询医药机构信息""" async with create_session() as session: # noinspection PyTypeChecker institution = ( await session.execute( select(Base.classes.institution) .join(Base.classes.institution_alias) .where(Base.classes.institution_alias.name == request.data.name) ) ).scalar_one_or_none() if institution is None: return Response(code=204, message="医药机构信息不存在") return Response( data=QueryInstitutionResponse( name=institution.name, province=institution.province, city=institution.city, type=institution.type, incurred=institution.incurred, level=institution.level, attribute=institution.attribute, ) ) async def query_drug(request: Request) -> Response: """根据类型和名称模型查询药品信息""" async with create_session() as session: # 基于MySQL全文检索能力,召回与检索词高度相关的药品名称 drugs_name = ( ( await session.execute( text( """ SELECT name FROM drug WHERE MATCH(name) AGAINST(:name IN NATURAL LANGUAGE MODE) AND type = :type ORDER BY (name = :name) DESC, MATCH(name) AGAINST(:name) DESC, LENGTH(name) ASC LIMIT 10 """ ).bindparams(type=request.data.type, name=request.data.name) ) ) .scalars() .all() ) result = None for drug_name in drugs_name: # 若检索词包含药品名称则以此作为结果 if drug_name in request.data.name: result = drug_name break # 若 if result is None: round((1 - levenshtein(string1, string2) / (len(string1) + len(string2))) * 100) print(drugs_name) if not drugs_name: return Response(code=204, message="药品信息不存在") return Response( data=QueryDrugResponse( name=drugs_name[0], ) ) """ """