diff --git a/任务调度服务器/Dockerfile b/任务调度服务器/Dockerfile deleted file mode 100644 index ecb824e..0000000 --- a/任务调度服务器/Dockerfile +++ /dev/null @@ -1,33 +0,0 @@ -#指定基础镜像 -FROM python:3.11.4 - -#指定镜像标签 -LABEL maintainer="marslbr" \ -description="building dagster based on docker" - -#创建目录 -RUN mkdir -p /opt/dagster/dagster_home /opt/dagster/app - -#指定时区 -RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ -&& echo "Asia/Shanghai" > /etc/timezone - -#指定PIP下载源 -RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ - -#复制依赖列表 -COPY ./requirements.txt /requirements.txt - -#更新PIP并安装依赖 -RUN pip install --upgrade pip \ -&& pip install --requirement /requirements.txt - -#复制程序 -COPY ./scripts /scripts - -#指定工作目录 -WORKDIR /scripts/ - -EXPOSE 5102 - -CMD ["uvicorn", "main:service", "--host", "0.0.0.0"] \ No newline at end of file diff --git a/任务调度服务器/dagster.yaml b/任务调度服务器/dagster.yaml deleted file mode 100644 index ae945e1..0000000 --- a/任务调度服务器/dagster.yaml +++ /dev/null @@ -1,16 +0,0 @@ -storage: - sqlite: - base_dir: - env: SQLITE_STORAGE_BASE_DIR - -compute_logs: - module: dagster.core.storage.local_compute_log_manager - class: LocalComputeLogManager - config: - env: LOCAL_COMPUTE_LOG_MANAGER_DIRECTORY - -local_artifact_storage: - module: dagster.core.storage.root - class: LocalArtifactStorage - config: - env: DAGSTER_LOCAL_ARTIFACT_STORAGE_DIR \ No newline at end of file diff --git a/任务调度服务器/requirements.txt b/任务调度服务器/requirements.txt deleted file mode 100644 index db5d302..0000000 --- a/任务调度服务器/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -dagster -dagster-webserver -json -time \ No newline at end of file diff --git a/任务调度服务器/scheduled_tasks.py b/任务调度服务器/scheduled_tasks.py deleted file mode 100644 index c10fd30..0000000 --- a/任务调度服务器/scheduled_tasks.py +++ /dev/null @@ -1,361 +0,0 @@ -# -*- coding: utf-8 -*- - -''' - -脚本说明: - -本脚本用于Docker中Python执行定时任务 - -''' - -#使用APScheduler -from apscheduler.schedulers.background import BackgroundScheduler - -#初始化调度器 -scheduler = BackgroundScheduler() - -import pytz - -#定义时区 -timezone = pytz.timezone('Asia/Shanghai') - -from datetime import datetime - -import time - -from sqlalchemy import create_engine, types - -from urllib import parse, request, error - -import json - -import pandas - -import smtplib - -from email.mime.text import MIMEText - -from email.header import Header - -''' - - - -''' - -#定义类:读取数据结构 -class Response(): - - def __init__(self, code, messsage, data): - - self.code = code - - self.messsage = messsage - - self.data = data - -#定义函数:POST方式通过请求接口获取数据 -def read_request_post(url, headers, data): - - #dict转为字符串 - data = bytes(json.dumps(obj = data), encoding = 'utf8') - - try : - - #访问接口并发送请求 - api_request = request.Request(url = url, headers = headers, data = data, method = 'POST') - - #获取响应并解码 - api_response = request.urlopen(url = api_request).read().decode('utf8') - - #json转为dict - data = json.loads(api_response) - - return Response(100, '读取成功', data) - - except : - - return Response(900, '读取失败', '') - -#定义函数:GET方式通过请求接口获取数据 -def read_request_get(url, headers, data): - - try : - - #访问接口并发送请求 - api_request = request.Request(url = url, headers = headers, data = data, method = 'GET') - - #获取响应并解码 - api_response = request.urlopen(url = api_request).read().decode('utf8') - - #json转为dict - data = json.loads(api_response) - - return Response(100, '读取成功', data) - - except : - - return Response(900, '读取失败', '') - -#定义函数:基于SQLAlchemy格式化数据库连接信息 -def database_connect(username, password, host, port, database): - - return 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(username, password, host, port, database) - -#定义函数:连接数据库并读取数据 -def read_mysql(database_connect, query): - - try : - - #初始化数据库连接 - database = create_engine(database_connect) - - #创建连接 - connection = database.connect() - - data = pandas.read_sql_query(con = connection, sql = query) - - #关闭连接 - connection.close() - - database.dispose() - - return Response(100, '读取成功', data) - - except : - - return Response(900, '读取失败', '') - -#定义函数:连接数据库并写入数据 -def write_mysql(database_connect, table_name, dataset, data_types): - - try : - - #初始化数据库连接 - database = create_engine(database_connect) - - #创建连接 - connection = database.connect() - - dataset.to_sql(con = connection, name = table_name, if_exists = 'replace', index = False, dtype = data_types) - - #关闭连接 - connection.close() - - database.dispose() - - return Response(100, '写入成功', '') - - except : - - return Response(900, '写入失败', '') - -#定义函数:基础定时任务,支持连接出库,查询数据,写入入库 -def task_basic(task_id, database_connect_read, query, database_connect_write, table_name, data_types): - - #查询数据 - response = read_mysql(database_connect_read, query) - - if response.code == 100: - - dataset = response.data - - #写入数据 - response = write_mysql(database_connect_write, table_name, dataset, data_types) - - if response.code == 100: - - code = 100 - - messsage = '执行成功' - - else: - - code = 902 - - messsage = '执行失败(连接或写入异常)' - - else: - - code = 901 - - messsage = '执行失败(连接或查询异常)' - - messsage = '[定时任务]-[{}]-[{}]-[{}]'.format(task_id, messsage, datetime.now(timezone).strftime('%y-%m-%d %H:%M:%S')) - - return Response(code, messsage, '') - -#定义函数:发送邮件 -def send_mail(messsage): - - mail_message = MIMEText(messsage, 'plain', 'utf-8') - - mail_message['From'] = Header('我', 'utf-8') - - mail_message['To'] = Header('刘弼仁', 'utf-8') - - mail_message['Subject'] = Header('Scheduled Task Report', 'utf-8') - - try : - - #连接SMTP服务器 - smtp_server = smtplib.SMTP_SSL('smtp.feishu.cn', 465) - - #登录 - smtp_server.login('mars@liubiren.cloud', '8AI2kUKn5x5Le4Hg') - - smtp_server.sendmail('mars@liubiren.cloud', ['mars@liubiren.cloud'], mail_message.as_string()) - - return Response(100, '发送成功', '') - - except : - - return Response(900, '发送失败', '') - -''' - - - -''' - -#刘弼仁私人数据库连接信息 -my_database_connect = database_connect('root', 'Te198752', 'cdb-7z9lzx4y.cd.tencentcdb.com', '10039', 'hzdd') - -#杭州刀豆数据库链接信息 -hzdd_database_connect = database_connect('liubiren', 'G18HamnVf96frq0K', 'daodou-prod-proxy-public.rwlb.rds.aliyuncs.com', '3306', 'whaleip') - -''' - - - -''' - -''' - -定时任务001 - -从杭州刀豆数据库抽取租户注册数据并每日同步至刘弼仁私人数据库 - -''' - -def scheduled_task_001(): - - #定义查询语句 - query = "select date_format(table1.insert_time, '%%y-%%m-%%d') as 'register_date', table1.customer_id as 'tenant_id', (case table1.customer_type when 1 then '企业' when 2 then '个人' else null end) as 'tenant_type', table1.full_name as 'tenant_name', table1.company_USCI as 'tenant_uuid', s_customer_register_channel.channel_name as 'register_channel' from ( select insert_time, id as 'customer_id', customer_type, full_name, company_USCI, source_type from s_customer_info where status = 0 and test_status = 0 and full_name not regexp '花豆|刀豆|鲸版权|维权骑士|测试|test') as table1 left join s_customer_register_channel on table1.source_type = s_customer_register_channel.channel_key order by tenant_id desc" - - #定义数据类型 - data_types = {'register_date': types.Date(), 'tenant_id': types.Integer(), 'tenant_type': types.Text(), 'tenant_name': types.Text(), 'tenant_uuid': types.Text(), 'register_channel': types.Text()} - - return task_basic('001', hzdd_database_connect, query, my_database_connect, 'hzdd_tenant', data_types) - -''' - -定时任务002 - -从杭州刀豆数据库抽取作品权利证明数据并每日同步至刘弼仁私人数据库 - -''' - -def scheduled_task_002(): - - #定义查询语句 - query = "select date_format(s_customer_works_base.insert_time, '%%y-%%m-%%d') as 'works_enter_date', s_customer_works_base.id as 'works_id', s_customer_works_base.customer_id as 'tenant_id', ( case s_customer_works_base.source_type when 0 then '用户添加' when 1 then '天网添加' when 2 then '骑士添加' when 3 then '它方推送' when 4 then '前台添加' else '其它' end) as 'works_enter_type', date_format(s_customer_file_tsa.right_time, '%%y-%%m-%%d') as 'works_tsa_date', ( case s_customer_file_tsa.source_type when 0 then '用户申请' when 1 then '它方推送' when 2 then '用户上传' else null end) as 'tsa_enter_type', date_format(s_customer_copyright_task.insert_time, '%%y-%%m-%%d') as 'works_copyright_date', ( case s_customer_copyright_task.source_type when 1 then '用户申请' when 2 then '用户上传' else null end) as 'copyright_enter_type', ( case s_customer_copyright_task.works_type when 1 then '文字' when 2 then '口述' when 3 then '音乐' when 4 then '戏剧' when 5 then '曲艺' when 6 then '舞蹈' when 7 then '杂技' when 8 then '美术' when 9 then '建筑' when 10 then '摄影' when 11 then '电影' when 13 then '影视' when 14 then '设计图' when 15 then '地图' when 16 then '模型' when 18 then '录音' when 19 then '录像' when 20 then '视听' else null end) as 'copyright_works_type', date_format(s_customer_other_confirm_right.insert_time, '%%y-%%m-%%d') as 'works_tpa_date' from s_customer_works_base left join s_customer_file_tsa on s_customer_works_base.id = s_customer_file_tsa.works_id left join s_customer_copyright_task on s_customer_works_base.id = s_customer_copyright_task.works_id left join s_customer_other_confirm_right on s_customer_works_base.id = s_customer_other_confirm_right.work_id where s_customer_works_base.status = 0 having tenant_id in ( select id from s_customer_info where status = 0 and test_status = 0 and full_name not regexp '花豆|刀豆|鲸版权|维权骑士|测试|test' )" - - #定义数据类型 - data_types = {'works_enter_date': types.Date(), 'works_id': types.Integer(), 'tenant_id': types.Integer(), 'works_enter_type': types.Text(), 'works_tsa_date': types.Date(), 'tsa_enter_type': types.Text(), 'works_copyright_date': types.Date(), 'copyright_enter_type': types.Text(), 'copyright_works_type': types.Text(), 'works_tpa_date': types.Date()} - - return task_basic('002', hzdd_database_connect, query, my_database_connect, 'hzdd_works_rights', data_types) - -''' - -定时任务003 - -从杭州刀豆数据库抽取作品版权检测保护数据并每日同步至刘弼仁私人数据库 - -''' - -def scheduled_task_003(): - - #定义查询语句 - query = "select date_format(s_customer_works_base.insert_time, '%%y-%%m-%%d') as 'works_enter_date', s_customer_works_base.id as 'works_id', s_customer_works_base.customer_id as 'tenant_id', ( case s_customer_works_base.source_type when 0 then '用户添加' when 1 then '天网添加' when 2 then '骑士添加' when 3 then '它方推送' when 4 then '前台添加' else '其它' end) as 'works_enter_type', date_format(s_customer_monitor_work.start_time, '%%y-%%m-%%d') as 'monitoring_start_date', date_format(s_customer_monitor_work.end_time, '%%y-%%m-%%d') as 'monitoring_end_date', date_format(s_customer_protect_works.start_time, '%%y-%%m-%%d') as 'protection_start_date', date_format(s_customer_protect_works.end_time, '%%y-%%m-%%d') as 'protection_end_date' from s_customer_works_base left join s_customer_monitor_work on s_customer_works_base.id = s_customer_monitor_work.work_id left join s_customer_protect_works on s_customer_works_base.id = s_customer_protect_works.works_id" - - #定义数据类型 - data_types = {'works_enter_date': types.Date(), 'works_id': types.Integer(), 'tenant_id': types.Integer(), 'works_enter_type': types.Text(), 'monitoring_start_date': types.Date(), 'monitoring_end_date': types.Date(), 'protection_start_date': types.Date(), 'protection_end_date': types.Date()} - - return task_basic('003', hzdd_database_connect, query, my_database_connect, 'hzdd_works_protection', data_types) - -''' - -定时任务004 - -从杭州刀豆数据库抽取订单数据并每日同步至刘弼仁私人数据库 - -''' - -def scheduled_task_004(): - - #定义查询语句 - query = "select table_temporary.tenant_id, table_temporary.order_id, table_temporary.service_name, ( case when service_name = '监控定制版' then if(service_price = 0, 2000 * order_count, order_price) when service_name = '风控定制版' then if(service_price = 0, 2000 * order_count, order_price) when service_name = '敏感词定制版' then if(service_price = 0, 2000 * order_count, order_price) when service_name = '保护定制版' then if(service_price = 0, if(order_count >= 12, 350000, reference_price * order_count), order_price) else if(service_price = 0, reference_price * order_count, order_price) end) as 'order_amount', table_temporary.order_creat_date from ( select tenant_id, order_id, service.name as 'service_name', service.price as 'reference_price', service_price, order_count, order_price, order_creat_date from ( select customer_id as 'tenant_id', order_number as 'order_id', unit_price as 'service_price', number as 'order_count', price as 'order_price', date_format(order_detail.insert_time, '%%y-%%m-%%d') as 'order_creat_date', service_id from order_detail left join ( select fk_refund_order_id from order_detail where fk_refund_order_id != 0 ) as table_temporary on id = table_temporary.fk_refund_order_id where order_detail.fk_refund_order_id = 0 and table_temporary.fk_refund_order_id is null ) as table_temporary left join service on table_temporary.service_id = service.id where service.name != '监控试用版' and tenant_id in ( select id from s_customer_info where status = 0 and test_status = 0 and full_name not regexp '花豆|刀豆|鲸版权|维权骑士|测试|test' ) ) as table_temporary left join order_info on table_temporary.order_id = order_info.order_number where order_info.status = 2 order by order_creat_date desc" - - #定义数据类型 - data_types = {'tenant_id': types.Integer(), 'order_id': types.Text(), 'service_name': types.Text(), 'order_amount': types.Integer(), 'order_creat_date': types.Date()} - - return task_basic('004', hzdd_database_connect, query, my_database_connect, 'hzdd_orders', data_types) - -''' - -每日定时项目 - -''' - -def scheduled_tasks(): - - ''' - - 依次执行定时任务 - - ''' - - response1 = scheduled_task_001() - - time.sleep(10) - - response2 = scheduled_task_002() - - time.sleep(10) - - response3 = scheduled_task_003() - - time.sleep(10) - - response4 = scheduled_task_004() - - time.sleep(10) - - messsage = response1.messsage + '\n' + response2.messsage + '\n' + response3.messsage + '\n' + response4.messsage - - response = send_mail(messsage) - - return response.code - -def scheduled_tasks(): - - print('[定时任务]-[{}]'.format(datetime.now(timezone).strftime('%y-%m-%d %H:%M:%S'))) - - -try: - - #scheduler.add_job(func = scheduled_tasks, trigger = 'cron', day = '*/1', hour = 8, minute = 30, second = 0, timezone = timezone, id = 'daily') - - scheduler.add_job(func = scheduled_tasks, trigger = 'cron', minute = '*/1', second = 0, timezone = timezone, id = 'daily') - - scheduler.start() - - print('定时任务脚本已启动') - -except: - - scheduler.shutdown(wait = False) - - print('定时任务脚本启动失败') - diff --git a/任务调度服务器/workspace.yaml b/任务调度服务器/workspace.yaml deleted file mode 100644 index f0d30c9..0000000 --- a/任务调度服务器/workspace.yaml +++ /dev/null @@ -1,2 +0,0 @@ -load_from: - - python_file: dagster.py \ No newline at end of file