Compare commits

...

4 Commits

Author SHA1 Message Date
fanbook-wangdage
036b64c845 支持测试版本的资源筛选,增强用户列表的查询功能 2026-02-10 20:14:37 +08:00
fanbook-wangdage
cb925f7200 添加下载资源和下载资源管理功能 2026-02-05 21:53:26 +08:00
fanbook-wangdage
6b82806931 添加类型注释、修复抽卡id问题 2026-02-03 12:12:45 +08:00
fanbook-wangdage
aa82a19ac7 更新验证码的存储方式、支持html验证码邮件 2026-01-31 14:25:33 +08:00
16 changed files with 893 additions and 59 deletions

View File

@@ -48,12 +48,18 @@ print("Keys generated.")
},
"EMAIL": {
"GMAIL_USER": "wdgwdg889@gmail.com",
"APP_PASSWORD": ""
"APP_PASSWORD": "",
"APP_NAME": "WDG Snap Hutao",
"OFFICIAL_WEBSITE": "https://htserver.wdg.cloudns.ch/",
"SUBJECT": "WDG Snap Hutao 验证码"
},
"RSA": {
"PRIVATE_KEY_FILE": "private.pem",
"PUBLIC_KEY_FILE": "public.pem"
},
"VERIFICATION_CODE": {
"EXPIRE_MINUTES": 10
},
"LOGGING": {
"LEVEL": "DEBUG",
"FORMAT": ""
@@ -76,8 +82,12 @@ print("Keys generated.")
| JWT.EXPIRATION_HOURS | JWT过期时间小时 |
| EMAIL.GMAIL_USER | 用于发送验证邮件的Gmail账号 |
| EMAIL.APP_PASSWORD | Gmail应用专用密码 |
| EMAIL.APP_NAME | 应用名称,用于邮件显示 |
| EMAIL.OFFICIAL_WEBSITE | 官方网站地址,用于邮件中的链接 |
| EMAIL.SUBJECT | 验证邮件的主题 |
| RSA.PRIVATE_KEY_FILE | RSA私钥文件路径 |
| RSA.PUBLIC_KEY_FILE | RSA公钥文件路径 |
| VERIFICATION_CODE.EXPIRE_MINUTES | 验证码过期时间(分钟) |
| LOGGING.LEVEL | 日志记录级别生产环境建议设置为INFO |
| LOGGING.FORMAT | 日志记录格式 |

View File

@@ -9,12 +9,16 @@ app_password = config_loader.EMAIL_APP_PASSWORD
to_email = ""
def send_email(gmail_user, app_password, to_email, subject, body="这是一封测试邮件。"):
def send_email(gmail_user, app_password, to_email, subject, body="这是一封测试邮件。", app_name=None, body_type="plain"):
msg = MIMEMultipart()
# 如果提供了 app_name使用带显示名称的格式
if app_name:
msg["From"] = f"{app_name} <{gmail_user}>"
else:
msg["From"] = gmail_user
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
msg.attach(MIMEText(body, body_type))
# Gmail SMTP 服务器
server = smtplib.SMTP("smtp.gmail.com", 587)

View File

@@ -6,3 +6,7 @@ class Config:
MONGO_URI = config_loader.MONGO_URI
TIMEZONE = config_loader.TIMEZONE
ISTEST_MODE = config_loader.ISTEST_MODE
EMAIL_APP_NAME = config_loader.EMAIL_APP_NAME
EMAIL_OFFICIAL_WEBSITE = config_loader.EMAIL_OFFICIAL_WEBSITE
EMAIL_SUBJECT = config_loader.EMAIL_SUBJECT
VERIFICATION_CODE_EXPIRE_MINUTES = config_loader.VERIFICATION_CODE_EXPIRE_MINUTES

View File

@@ -22,7 +22,7 @@ class ConfigLoader:
return self._config
def get(self, key: str, default=None):
def get(self, key: str, default=None) -> Any:
"""获取配置值,支持点号分隔的嵌套键"""
config = self.load_config()
keys = key.split('.')
@@ -96,5 +96,21 @@ class ConfigLoader:
def LOGGING_FORMAT(self) -> str:
return self.get('LOGGING.FORMAT', '%(asctime)s %(name)s %(levelname)s %(message)s')
@property
def EMAIL_APP_NAME(self) -> str:
return self.get('EMAIL.APP_NAME', 'WDG Snap Hutao')
@property
def EMAIL_OFFICIAL_WEBSITE(self) -> str:
return self.get('EMAIL.OFFICIAL_WEBSITE', 'https://htserver.wdg.cloudns.ch/')
@property
def EMAIL_SUBJECT(self) -> str:
return self.get('EMAIL.SUBJECT', 'WDG Snap Hutao 验证码')
@property
def VERIFICATION_CODE_EXPIRE_MINUTES(self) -> int:
return self.get('VERIFICATION_CODE.EXPIRE_MINUTES', 10)
# 创建全局配置实例
config_loader = ConfigLoader()

View File

@@ -26,10 +26,10 @@ def init_mongo(uri: str, test_mode=False):
logger.error(f"MongoDB connection failed: {e}")
raise
def generate_code(length=6):
def generate_code(length=6) -> str:
"""生成数字验证码"""
return ''.join(secrets.choice('0123456789') for _ in range(length))
def generate_numeric_id(length=8):
def generate_numeric_id(length=8) -> str:
"""生成数字ID"""
return ''.join(secrets.choice(string.digits) for _ in range(length))

View File

@@ -15,12 +15,14 @@ def create_app():
from routes.gacha_log import gacha_log_bp
from routes.web_api import web_api_bp
from routes.misc import misc_bp
from routes.download_resource import download_resource_bp
app.register_blueprint(announcement_bp, url_prefix="/Announcement")
app.register_blueprint(auth_bp)
app.register_blueprint(gacha_log_bp)
app.register_blueprint(web_api_bp)
app.register_blueprint(misc_bp)
app.register_blueprint(download_resource_bp)
# CORS
@app.after_request

View File

@@ -3,14 +3,42 @@ import datetime
from flask import current_app
from app.config_loader import config_loader
def create_token(user_id):
def create_token(user_id: str) -> str:
"""
创建JWT访问令牌有效期由配置文件中的JWT_EXPIRATION_HOURS决定。
:param user_id: 用户ID
:return: JWT 访问令牌
"""
payload = {
"user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS)
}
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM)
def verify_token(token):
# 创建刷新token有效期是访问token的两倍
def create_refresh_token(user_id: str) -> str:
"""
创建JWT刷新令牌有效期为访问令牌的两倍。
:param user_id: 用户ID
:return: JWT 刷新令牌
"""
payload = {
"user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS * 2)
}
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM)
def verify_token(token: str)-> str | None:
"""
验证JWT令牌并返回用户ID如果无效则返回None。
:param token: JWT令牌字符串
:type token: str
:return: 用户ID或None
:rtype: str | None
"""
try:
data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=[config_loader.JWT_ALGORITHM])
return data["user_id"]

View File

@@ -1,10 +1,12 @@
from flask import Blueprint, request, jsonify, session
from app.utils.jwt_utils import create_token, verify_token
from flask import Blueprint, request, jsonify
from app.utils.jwt_utils import create_token, verify_token, create_refresh_token
from services.auth_service import (
decrypt_data, send_verification_email, verify_user_credentials,
create_user_account, get_user_by_id
)
from services.verification_code_service import save_verification_code, verify_code
from app.extensions import generate_code, logger , config_loader
from app.config import Config
auth_bp = Blueprint("auth", __name__)
@@ -28,8 +30,8 @@ def passport_verify():
# 生成验证码
code = generate_code(6)
session['verification_code'] = code
session['email'] = decrypted_email
# 使用 MongoDB TTL 存储验证码
save_verification_code(decrypted_email, code, expire_minutes=Config.VERIFICATION_CODE_EXPIRE_MINUTES)
# 发送邮件
if send_verification_email(decrypted_email, code):
@@ -68,9 +70,8 @@ def passport_register():
"data": None
}), 400
# 验证验证码
if (session.get('verification_code') != decrypted_code or
session.get('email') != decrypted_email):
# 使用 MongoDB 验证验证码
if not verify_code(decrypted_email, decrypted_code):
logger.warning("Invalid verification code")
return jsonify({
"retcode": 2,
@@ -88,12 +89,10 @@ def passport_register():
"data": None
})
# 删除session中的验证码和邮箱
session.pop('verification_code', None)
session.pop('email', None)
# 创建token
access_token = create_token(str(new_user['_id']))
# 刷新token
refresh_token = create_refresh_token(str(new_user['_id']))
logger.info(f"User registered: {decrypted_email}")
return jsonify({
@@ -101,7 +100,7 @@ def passport_register():
"message": "success",
"data": {
"AccessToken": access_token,
"RefreshToken": access_token,
"RefreshToken": refresh_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
@@ -139,6 +138,7 @@ def passport_login():
# 创建token
access_token = create_token(str(user['_id']))
refresh_token = create_refresh_token(str(user['_id']))
logger.info(f"User logged in: {decrypted_email}")
return jsonify({
@@ -147,7 +147,7 @@ def passport_login():
"l10nKey": "ServerPassportLoginSucceed",
"data": {
"AccessToken": access_token,
"RefreshToken": access_token,
"RefreshToken": refresh_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
@@ -217,6 +217,7 @@ def passport_refresh_token():
})
access_token = create_token(user_id)
refresh_token = create_refresh_token(user_id)
logger.info(f"Token refreshed for user_id: {user_id}")
return jsonify({
@@ -224,7 +225,7 @@ def passport_refresh_token():
"message": "success",
"data": {
"AccessToken": access_token,
"RefreshToken": access_token,
"RefreshToken": refresh_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})

270
routes/download_resource.py Normal file
View File

@@ -0,0 +1,270 @@
from flask import Blueprint, request, jsonify
from app.decorators import require_maintainer_permission
from app.extensions import logger
from services.download_resource_service import (
create_download_resource,
get_download_resources,
get_download_resource_by_id,
update_download_resource,
delete_download_resource,
get_latest_version
)
download_resource_bp = Blueprint("download_resource", __name__)
# 公开API - 获取下载资源列表
@download_resource_bp.route('/download-resources', methods=['GET'])
def get_public_download_resources():
"""
获取下载资源列表公开API
可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回所有
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
"""
package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_test参数默认只返回正式版本
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
# 只返回激活的资源
resources = get_download_resources(package_type=package_type, is_active=True, is_test=is_test)
return jsonify({
"code": 0,
"message": "success",
"data": resources
})
@download_resource_bp.route('/download-resources/latest', methods=['GET'])
def get_latest_download_resource():
"""
获取最新版本公开API
可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回最新的任意类型
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
"""
package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_test参数
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resource = get_latest_version(package_type=package_type, is_test=is_test)
if resource:
return jsonify({
"code": 0,
"message": "success",
"data": resource
})
else:
return jsonify({
"code": 1,
"message": "No resource found",
"data": None
}), 404
# Web管理端API - 增删改查
@download_resource_bp.route('/web-api/download-resources', methods=['POST'])
@require_maintainer_permission
def web_api_create_download_resource():
"""创建下载资源"""
data = request.get_json()
# 验证必需字段
required_fields = ['version', 'package_type', 'download_url']
if not all(k in data for k in required_fields):
return jsonify({
"code": 1,
"message": f"Missing required fields: {', '.join(required_fields)}",
"data": None
}), 400
# 验证package_type
if data['package_type'] not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 添加创建者信息
data['created_by'] = str(request.current_user['_id'])
# 创建资源
resource_id = create_download_resource(data)
if resource_id:
logger.info(f"Download resource created with ID: {resource_id} by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource created successfully",
"data": {
"id": str(resource_id)
}
})
else:
logger.error("Failed to create download resource")
return jsonify({
"code": 2,
"message": "Failed to create download resource",
"data": None
}), 500
@download_resource_bp.route('/web-api/download-resources', methods=['GET'])
@require_maintainer_permission
def web_api_get_download_resources():
"""获取下载资源列表(管理端,包含所有资源,包括未激活的)"""
package_type = request.args.get('package_type')
is_active_str = request.args.get('is_active')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_active参数
is_active = None
if is_active_str is not None:
is_active = is_active_str.lower() == 'true'
# 处理is_test参数
is_test = None
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resources = get_download_resources(package_type=package_type, is_active=is_active, is_test=is_test)
return jsonify({
"code": 0,
"message": "success",
"data": resources
})
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['GET'])
@require_maintainer_permission
def web_api_get_download_resource(resource_id):
"""获取单个下载资源详情"""
resource = get_download_resource_by_id(resource_id)
if resource:
return jsonify({
"code": 0,
"message": "success",
"data": resource
})
else:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['PUT'])
@require_maintainer_permission
def web_api_update_download_resource(resource_id):
"""更新下载资源"""
data = request.get_json()
# 检查资源是否存在
existing_resource = get_download_resource_by_id(resource_id)
if not existing_resource:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
# 验证package_type如果提供
if 'package_type' in data and data['package_type'] not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 添加更新者信息
data['updated_by'] = str(request.current_user['_id'])
# 更新资源
success = update_download_resource(resource_id, data)
if success:
logger.info(f"Download resource {resource_id} updated by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource updated successfully",
"data": None
})
else:
logger.error(f"Failed to update download resource {resource_id}")
return jsonify({
"code": 2,
"message": "Failed to update download resource",
"data": None
}), 500
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['DELETE'])
@require_maintainer_permission
def web_api_delete_download_resource(resource_id):
"""删除下载资源"""
# 检查资源是否存在
existing_resource = get_download_resource_by_id(resource_id)
if not existing_resource:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
# 删除资源
success = delete_download_resource(resource_id)
if success:
logger.info(f"Download resource {resource_id} deleted by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource deleted successfully",
"data": None
})
else:
logger.error(f"Failed to delete download resource {resource_id}")
return jsonify({
"code": 2,
"message": "Failed to delete download resource",
"data": None
}), 500

View File

@@ -119,6 +119,7 @@ def gacha_log_retrieve():
filtered_items = retrieve_gacha_log(user_id, uid, end_ids)
logger.info(f"Gacha log retrieved for user_id: {user_id}, uid: {uid}, items count: {len(filtered_items)}")
logger.debug(f"end_ids: {end_ids}")
return jsonify({
"retcode": 0,

View File

@@ -239,9 +239,15 @@ def web_api_get_users():
"data": None
}), 403
# 获取搜索参数
# 获取查询参数
q = request.args.get("q", "").strip()
users = get_users_with_search(q)
role = request.args.get("role", "").strip() if request.args.get("role") else None
email = request.args.get("email", "").strip() if request.args.get("email") else None
username = request.args.get("username", "").strip() if request.args.get("username") else None
id_param = request.args.get("id", "").strip() if request.args.get("id") else None
is_licensed = request.args.get("is", "").strip() if request.args.get("is") else None
users = get_users_with_search(q, role, email, username, id_param, is_licensed)
return jsonify({
"code": 0,

View File

@@ -2,6 +2,12 @@ from app.extensions import client, logger
from app.config import Config
def get_announcements(request_data: list):
"""
获取公告列表,过滤掉用户已关闭的公告
:param request_data: 用户已关闭的公告ID列表
:type request_data: list
"""
if Config.ISTEST_MODE:
return []
# 记录请求体到日志请求体中是用户已关闭的公告ID列表

View File

@@ -2,9 +2,9 @@ from bson import ObjectId
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import client, logger
from app.config import Config
from app.config_loader import config_loader
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from app.config_loader import config_loader
from datetime import timezone
from zoneinfo import ZoneInfo
import datetime
@@ -12,7 +12,7 @@ import SendEmailTool
import re
import base64
def decrypt_data(encrypted_data):
def decrypt_data(encrypted_data: str) -> str:
"""使用RSA私钥解密数据"""
try:
private_key_file = config_loader.RSA_PRIVATE_KEY_FILE
@@ -26,18 +26,123 @@ def decrypt_data(encrypted_data):
raise
def send_verification_email(email, code):
"""发送验证码邮件"""
def send_verification_email(email: str, code: str, ACTION_NAME="注册", EXPIRE_MINUTES=None) -> bool:
"""发送验证码邮件,目前只有注册场景,后续再扩展其他场景"""
try:
subject = "Snap Hutao 验证码"
body = f"您的验证码是: {code}"
subject = Config.EMAIL_SUBJECT
textbody = f"您的验证码是: {code}"
APP_NAME = Config.EMAIL_APP_NAME
OFFICIAL_WEBSITE = Config.EMAIL_OFFICIAL_WEBSITE
if EXPIRE_MINUTES is None:
EXPIRE_MINUTES = Config.VERIFICATION_CODE_EXPIRE_MINUTES
htmlbody = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>验证码邮件</title>
</head>
<body style="margin:0;padding:0;background-color:#f5f6f7;font-family:Arial,Helvetica,sans-serif;">
<table width="100%" cellpadding="0" cellspacing="0">
<tr>
<td align="center" style="padding:40px 0;">
<!-- 主体卡片 -->
<table width="480" cellpadding="0" cellspacing="0" style="background:#ffffff;border-radius:8px;padding:32px;">
<!-- 应用名 -->
<tr>
<td align="center" style="font-size:22px;font-weight:bold;color:#333333;padding-bottom:16px;">
{APP_NAME}
</td>
</tr>
<!-- 操作提示 -->
<tr>
<td style="font-size:14px;color:#555555;padding-bottom:24px;text-align:center;">
你正在进行 <strong>{ACTION_NAME}</strong> 操作,请使用以下验证码完成验证:
</td>
</tr>
<!-- 验证码 -->
<tr>
<td align="center" style="padding:20px 0;">
<div style="
display:inline-block;
font-size:32px;
font-weight:bold;
letter-spacing:6px;
color:#2e86de;
padding:12px 24px;
border:1px dashed #2e86de;
border-radius:6px;
">
{code}
</div>
</td>
</tr>
<!-- 有效期说明 -->
<tr>
<td style="font-size:13px;color:#888888;text-align:center;padding-top:16px;">
验证码有效期 {EXPIRE_MINUTES} 分钟,请勿泄露给他人。
</td>
</tr>
<!-- 分割线 -->
<tr>
<td style="padding:24px 0;">
<hr style="border:none;border-top:1px solid #eeeeee;">
</td>
</tr>
<!-- 底部信息 -->
<tr>
<td style="font-size:12px;color:#999999;text-align:center;line-height:1.6;">
本邮件由 {APP_NAME} 系统自动发送,请勿回复<br>
如非本人操作,请忽略本邮件
</td>
</tr>
<!-- 官网链接 -->
<tr>
<td align="center" style="padding-top:16px;">
<a href="{OFFICIAL_WEBSITE}"
style="font-size:12px;color:#2e86de;text-decoration:none;">
访问官网
</a>
</td>
</tr>
</table>
</td>
</tr>
</table>
</body>
</html>
"""
try:
SendEmailTool.send_email(
SendEmailTool.gmail_user,
SendEmailTool.app_password,
config_loader.EMAIL_GMAIL_USER,
config_loader.EMAIL_APP_PASSWORD,
email,
subject,
body
htmlbody,
app_name=APP_NAME,
body_type="html"
)
logger.info(f"HTML Verification email sent to {email}")
except Exception as e:
logger.error(f"Failed to send HTML verification email to {email}: {e}")
# 如果 HTML 邮件发送失败,尝试发送纯文本邮件
SendEmailTool.send_email(
config_loader.EMAIL_GMAIL_USER,
config_loader.EMAIL_APP_PASSWORD,
email,
subject,
textbody,
app_name=APP_NAME,
body_type="plain"
)
logger.info(f"Verification email sent to {email}")
return True
@@ -46,7 +151,7 @@ def send_verification_email(email, code):
return False
def verify_user_credentials(email, password):
def verify_user_credentials(email: str, password: str) -> dict | None:
"""验证用户凭据"""
user = client.ht_server.users.find_one({"email": email})
@@ -56,7 +161,7 @@ def verify_user_credentials(email, password):
return user
def create_user_account(email, password):
def create_user_account(email: str, password: str) -> dict | None:
"""创建新用户账户"""
# 检查用户是否已存在
existing_user = client.ht_server.users.find_one({"email": email})
@@ -86,7 +191,7 @@ def create_user_account(email, password):
return new_user
def get_user_by_id(user_id):
def get_user_by_id(user_id: str) -> dict | None:
"""根据ID获取用户信息"""
try:
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
@@ -98,32 +203,82 @@ def get_user_by_id(user_id):
return None
def get_users_with_search(query_text=""):
"""获取用户列表,支持搜索"""
def get_users_with_search(query_text="", role=None, email=None, username=None, id=None, is_licensed=None) -> list:
"""获取用户列表,支持多种筛选条件"""
import re
# 构建查询条件
query = {}
or_conditions = []
and_conditions = []
# 通用搜索q 参数)- 匹配用户名、邮箱、ID
if query_text:
or_conditions = []
# 用户名模糊搜索
or_conditions.append({
"UserName": {"$regex": re.escape(query_text), "$options": "i"}
})
# 邮箱模糊搜索
or_conditions.append({
"email": {"$regex": re.escape(query_text), "$options": "i"}
})
# _id 搜索(支持完整或前缀)
if ObjectId.is_valid(query_text):
or_conditions.append({
"_id": ObjectId(query_text)
})
and_conditions.append({"$or": or_conditions})
query = {"$or": or_conditions}
# 按角色筛选
if role:
if role == "maintainer":
and_conditions.append({"IsMaintainer": True})
elif role == "developer":
and_conditions.append({"IsLicensedDeveloper": True})
elif role == "user":
# user 表示既不是 maintainer 也不是 developer
and_conditions.append({
"$and": [
{"IsMaintainer": {"$ne": True}},
{"IsLicensedDeveloper": {"$ne": True}}
]
})
# 按邮箱筛选(支持模糊匹配)
if email:
and_conditions.append({
"email": {"$regex": re.escape(email), "$options": "i"}
})
# 按用户名筛选(支持模糊匹配)
if username:
and_conditions.append({
"UserName": {"$regex": re.escape(username), "$options": "i"}
})
# 按用户ID筛选支持模糊匹配
if id:
if ObjectId.is_valid(id):
and_conditions.append({"_id": ObjectId(id)})
else:
# 如果不是有效的 ObjectId尝试匹配字符串形式的 _id
and_conditions.append({
"_id": {"$regex": re.escape(id), "$options": "i"}
})
# 按状态筛选
if is_licensed:
if is_licensed == "licensed":
and_conditions.append({"IsLicensedDeveloper": True})
elif is_licensed == "not-licensed":
and_conditions.append({"IsLicensedDeveloper": False})
# 构建 AND 查询
if and_conditions:
if len(and_conditions) == 1:
query = and_conditions[0]
else:
query = {"$and": and_conditions}
# 查询数据库(排除密码)
cursor = client.ht_server.users.find(query, {"password": 0})

View File

@@ -0,0 +1,241 @@
import datetime
from app.extensions import client, logger
from app.config import Config
def create_download_resource(data):
"""
创建下载资源
:param data: 包含以下字段的数据
- version: 版本号
- package_type: 包类型 (msi/msix)
- download_url: 下载链接
- features: 新功能描述
- file_size: 文件大小 (可选)
- file_hash: 文件哈希 (可选)
- is_active: 是否激活 (可选默认为True)
- is_test: 是否为测试版本 (可选默认为False)
:return: 创建的资源ID或None
"""
try:
resource_doc = {
"version": data['version'],
"package_type": data['package_type'],
"download_url": data['download_url'],
"features": data.get('features', ''),
"file_size": data.get('file_size'),
"file_hash": data.get('file_hash'),
"is_active": data.get('is_active', True),
"is_test": data.get('is_test', False),
"created_at": datetime.datetime.utcnow(),
"created_by": data.get('created_by')
}
result = client.ht_server.download_resources.insert_one(resource_doc)
logger.info(f"Download resource created with ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Failed to create download resource: {e}")
return None
def get_download_resources(package_type=None, is_active=None, is_test=None):
"""
获取下载资源列表
:param package_type: 包类型过滤 (msi/msix)None表示获取所有
:param is_active: 是否激活过滤None表示获取所有
:param is_test: 是否为测试版本过滤None表示获取所有
:return: 资源列表
"""
try:
query = {}
if package_type:
query['package_type'] = package_type
if is_active is not None:
query['is_active'] = is_active
if is_test is not None:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
if is_test:
query['is_test'] = True
else:
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
resources = list(client.ht_server.download_resources.find(query, sort=[("created_at", -1)]))
# 移除 _id 字段并转换日期
result = []
for r in resources:
r = dict(r)
# 转换_id为字符串并存为id字段
r['id'] = str(r.pop('_id'))
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in r:
r['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in r and isinstance(r['created_at'], datetime.datetime):
dt = r['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
r['created_at'] = dt.isoformat()
if 'updated_at' in r and isinstance(r['updated_at'], datetime.datetime):
dt = r['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
r['updated_at'] = dt.isoformat()
result.append(r)
return result
except Exception as e:
logger.error(f"Failed to get download resources: {e}")
return []
def get_download_resource_by_id(resource_id):
"""
根据ID获取下载资源
:param resource_id: 资源ID
:return: 资源对象或None
"""
try:
from bson import ObjectId
resource = client.ht_server.download_resources.find_one({"_id": ObjectId(resource_id)})
if resource:
resource = dict(resource)
resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['created_at'] = dt.isoformat()
if 'updated_at' in resource and isinstance(resource['updated_at'], datetime.datetime):
dt = resource['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['updated_at'] = dt.isoformat()
return resource
return None
except Exception as e:
logger.error(f"Failed to get download resource by ID: {e}")
return None
def update_download_resource(resource_id, data):
"""
更新下载资源
:param resource_id: 资源ID
:param data: 要更新的字段
:return: 是否成功
"""
try:
from bson import ObjectId
# 构建更新数据
update_data = {"updated_at": datetime.datetime.utcnow()}
if 'version' in data:
update_data['version'] = data['version']
if 'package_type' in data:
update_data['package_type'] = data['package_type']
if 'download_url' in data:
update_data['download_url'] = data['download_url']
if 'features' in data:
update_data['features'] = data['features']
if 'file_size' in data:
update_data['file_size'] = data['file_size']
if 'file_hash' in data:
update_data['file_hash'] = data['file_hash']
if 'is_active' in data:
update_data['is_active'] = data['is_active']
if 'is_test' in data:
update_data['is_test'] = data['is_test']
if 'updated_by' in data:
update_data['updated_by'] = data['updated_by']
result = client.ht_server.download_resources.update_one(
{"_id": ObjectId(resource_id)},
{"$set": update_data}
)
if result.modified_count > 0:
logger.info(f"Download resource {resource_id} updated successfully")
return True
return False
except Exception as e:
logger.error(f"Failed to update download resource: {e}")
return False
def delete_download_resource(resource_id):
"""
删除下载资源
:param resource_id: 资源ID
:return: 是否成功
"""
try:
from bson import ObjectId
result = client.ht_server.download_resources.delete_one({"_id": ObjectId(resource_id)})
if result.deleted_count > 0:
logger.info(f"Download resource {resource_id} deleted successfully")
return True
return False
except Exception as e:
logger.error(f"Failed to delete download resource: {e}")
return False
def get_latest_version(package_type=None, is_test=False):
"""
获取最新版本
:param package_type: 包类型 (msi/msix)None表示获取所有类型的最新版本
:param is_test: 是否包含测试版本默认为False只返回正式版本
:return: 资源对象或None
"""
try:
query = {"is_active": True}
if not is_test:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
else:
query['is_test'] = True
if package_type:
query['package_type'] = package_type
resource = client.ht_server.download_resources.find_one(
query,
sort=[("created_at", -1)]
)
if resource:
resource = dict(resource)
resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['created_at'] = dt.isoformat()
if 'updated_at' in resource and isinstance(resource['updated_at'], datetime.datetime):
dt = resource['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['updated_at'] = dt.isoformat()
return resource
return None
except Exception as e:
logger.error(f"Failed to get latest version: {e}")
return None

View File

@@ -1,5 +1,17 @@
from app.extensions import client, logger
"""
注意记录中有两种类型GachaType和QueryType(uigf_gacha_type)GachaType多了一个400类型其实就是QueryType的301类型客户端传的end_ids是按QueryType来的如果按照GachaType来筛选会多出400类型的记录
映射关系
| `uigf_gacha_type` | `gacha_type` |
|-------------------|----------------|
| `100` | `100` |
| `200` | `200` |
| `301` | `301` or `400` |
| `302` | `302` |
| `500` | `500` |
"""
def get_gacha_log_entries(user_id):
"""获取用户的祈愿记录条目列表"""
@@ -41,6 +53,9 @@ def get_gacha_log_end_ids(user_id, uid):
if gacha_type in end_ids:
end_ids[gacha_type] = max(end_ids[gacha_type], item_id)
# 400类型对应301类型
end_ids["400"] = end_ids["301"]
return end_ids
@@ -82,6 +97,11 @@ def retrieve_gacha_log(user_id, uid, end_ids):
# 筛选出比end_ids更旧的记录
filtered_items = []
# 需要将end_ids的key从QueryType转换为GachaType给400赋值为301的值即可
if "301" in end_ids:
end_ids["400"] = end_ids["301"]
for item in gacha_log['data']:
gacha_type = str(item.get('GachaType', ''))
item_id = item.get('Id', 0)

View File

@@ -0,0 +1,70 @@
import datetime
from app.extensions import client, logger
def init_verification_code_collection():
"""初始化验证码集合并创建 TTL 索引"""
db = client.ht_server
collection = db.verification_codes
indexes = collection.list_indexes()
ttl_index_exists = False
for index in indexes:
if index.get('expireAfterSeconds') is not None:
ttl_index_exists = True
break
# 如果不存在 TTL 索引,则创建
if not ttl_index_exists:
collection.create_index(
[("expire_at", 1)],
expireAfterSeconds=0,
name="expire_at_ttl"
)
logger.info("Created TTL index on verification_codes collection")
def save_verification_code(email: str, code: str, expire_minutes: int = 10):
"""保存验证码到 MongoDB自动过期时间由 TTL 索引控制"""
db = client.ht_server
collection = db.verification_codes
# 确保集合已初始化
init_verification_code_collection()
# 计算过期时间
expire_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=expire_minutes)
# 插入验证码记录
result = collection.insert_one({
"email": email,
"code": code,
"created_at": datetime.datetime.utcnow(),
"expire_at": expire_at,
"used": False
})
logger.debug(f"Saved verification code for email: {email}")
return result.inserted_id
def verify_code(email: str, code: str) -> bool:
"""验证验证码是否正确,验证成功后删除验证码记录"""
db = client.ht_server
collection = db.verification_codes
# 查找未使用的验证码
verification_record = collection.find_one({
"email": email,
"code": code,
"used": False
})
if verification_record:
# 验证成功,删除该验证码记录
collection.delete_one({"_id": verification_record["_id"]})
logger.info(f"Verification code validated and deleted for email: {email}")
return True
logger.warning(f"Invalid or expired verification code for email: {email}")
return False