Compare commits

...

13 Commits

Author SHA1 Message Date
wangdage12
3d2a620820 Update README with API and server details
Added information about API services, server infrastructure, and usage recommendations for metadata repositories.
2026-02-12 23:26:24 +08:00
wangdage12
fb977fdeb5 Merge pull request #9 from wangdage12/dev
支持测试版本的资源筛选,增强用户列表的查询功能
2026-02-10 20:26:23 +08:00
fanbook-wangdage
036b64c845 支持测试版本的资源筛选,增强用户列表的查询功能 2026-02-10 20:14:37 +08:00
wangdage12
01f51d82cd Update README with deployment requirements
Added resource and environment requirements for deployment.
2026-02-07 21:56:24 +08:00
wangdage12
c6004bec96 Merge pull request #8 from wangdage12/dev
添加下载资源和下载资源管理功能
2026-02-05 22:06:13 +08:00
fanbook-wangdage
cb925f7200 添加下载资源和下载资源管理功能 2026-02-05 21:53:26 +08:00
wangdage12
6e09869df0 Merge pull request #7 from wangdage12/dev
添加类型注释、修复抽卡id问题导致的返回异常和刷新token的问题
2026-02-03 12:25:01 +08:00
fanbook-wangdage
6b82806931 添加类型注释、修复抽卡id问题 2026-02-03 12:12:45 +08:00
wangdage12
40bd74c101 Merge pull request #5 from wangdage12/dev
更新验证码的存储方式、支持html验证码邮件
2026-01-31 15:22:35 +08:00
fanbook-wangdage
aa82a19ac7 更新验证码的存储方式、支持html验证码邮件 2026-01-31 14:25:33 +08:00
wangdage12
f2ca58182e Merge pull request #1 from wangdage12/dev
初始版本
2026-01-25 19:58:39 +08:00
fanbook-wangdage
199308fab1 添加文档并修复问题 2026-01-25 19:22:49 +08:00
fanbook-wangdage
c1339a9858 修复问题 2026-01-25 18:49:34 +08:00
18 changed files with 1068 additions and 94 deletions

146
README.md
View File

@@ -1,2 +1,148 @@
# Snap.Server # Snap.Server
Snap.Hutao新后端API Snap.Hutao新后端API
## 部署方法
> **资源和环境要求**
> 服务器硬件:
> 最低1核CPU1GB内存
>
> 运行环境:
> `Windows10`及以上、`Windows Server 2019`及以上、`Linux`
> `Python3.12`及以上
> `MongoDB`
### 在服务器生成RSA密钥
执行以下代码在根目录生成密钥:
```python
from Crypto.PublicKey import RSA
# 生成 2048 位 RSA 密钥对
key = RSA.generate(2048)
private_key = key.export_key()
public_key = key.publickey().export_key()
with open("private.pem", "wb") as f:
f.write(private_key)
with open("public.pem", "wb") as f:
f.write(public_key)
print("Keys generated.")
```
**确保客户端的公钥和生成的相同,否则将无法使用账户功能**
### 创建配置文件
创建`config.json`文件,示例内容如下:
```json
{
"SECRET_KEY": "jwt_secret_key",
"MONGO_URI": "mongodb+srv://wdgwdg889_db_user:xxxxxx@cluster0.eplrcvl.mongodb.net/?appName=Cluster0",
"TIMEZONE": "Asia/Shanghai",
"ISTEST_MODE": false,
"SERVER": {
"HOST": "0.0.0.0",
"PORT": 5222,
"DEBUG": false
},
"JWT": {
"ALGORITHM": "HS256",
"EXPIRATION_HOURS": 24
},
"EMAIL": {
"GMAIL_USER": "wdgwdg889@gmail.com",
"APP_PASSWORD": "",
"APP_NAME": "WDG Snap Hutao",
"OFFICIAL_WEBSITE": "https://htserver.wdg.cloudns.ch/",
"SUBJECT": "WDG Snap Hutao 验证码"
},
"RSA": {
"PRIVATE_KEY_FILE": "private.pem",
"PUBLIC_KEY_FILE": "public.pem"
},
"VERIFICATION_CODE": {
"EXPIRE_MINUTES": 10
},
"LOGGING": {
"LEVEL": "DEBUG",
"FORMAT": ""
}
}
```
参数说明:
| 参数 | 说明 |
|------|------|
| SECRET_KEY | 用于JWT签名的密钥请设置为复杂字符串 |
| MONGO_URI | MongoDB连接字符串 |
| TIMEZONE | 服务器时区 |
| ISTEST_MODE | 是否启用测试模式,测试模式下部分功能将返回默认值,不连接数据库 |
| SERVER.HOST | 服务器监听地址 |
| SERVER.PORT | 服务器监听端口 |
| SERVER.DEBUG | 是否启用Flask的调试模式 |
| JWT.ALGORITHM | JWT签名算法 |
| JWT.EXPIRATION_HOURS | JWT过期时间小时 |
| EMAIL.GMAIL_USER | 用于发送验证邮件的Gmail账号 |
| EMAIL.APP_PASSWORD | Gmail应用专用密码 |
| EMAIL.APP_NAME | 应用名称,用于邮件显示 |
| EMAIL.OFFICIAL_WEBSITE | 官方网站地址,用于邮件中的链接 |
| EMAIL.SUBJECT | 验证邮件的主题 |
| RSA.PRIVATE_KEY_FILE | RSA私钥文件路径 |
| RSA.PUBLIC_KEY_FILE | RSA公钥文件路径 |
| VERIFICATION_CODE.EXPIRE_MINUTES | 验证码过期时间(分钟) |
| LOGGING.LEVEL | 日志记录级别生产环境建议设置为INFO |
| LOGGING.FORMAT | 日志记录格式 |
### 开发环境启动方法
确保已安装依赖:
```
pip install -r requirements.txt
```
运行Flask应用
```
python app.py
```
### 生产环境启动方法
建议使用Gunicorn部署
```
pip install -r requirements.txt && python -m gunicorn run:app --bind 0.0.0.0:5222 --workers 4 --threads 2 --access-logfile - --error-logfile -
```
请根据服务器性能调整`--workers``--threads`参数。
### API文档和官方开放平台
**API文档可以在该地址访问**
https://rdgm3wrj7r.apifox.cn/
> 项目官方API和文件资源服务、仓库镜像为各开源项目免费提供
本项目官方API地址https://htserver.wdg.cloudns.ch/api/
<img width="901" height="522" alt="服务拓补结构" src="https://github.com/user-attachments/assets/9cd2f0d5-372e-46b6-b64b-643df661b445" />
我们的项目官方服务采用多台服务器,其中日本的主服务器用来提供所有基础服务,甘肃的自建服务器提供加速和负载均衡,以确保服务稳定
元数据仓库镜像可以随时调用获取元数据仓库API`/git-repository/all`得到
由于Git仓库几乎无法被CDN缓存频繁拉取镜像仓库会对服务器造成压力请各位在使用量较大的情况下自建仓库镜像
甘肃服务器只能使用ipv6访问同时比较不稳定只能用于加速不要将它直接作为主要服务使用
甘肃服务器2为备用服务器计划在用户量较大时作为API以负载均衡的方式提供不在极端情况下不采用和仓库镜像使用
### 注意事项
在轻量使用的场景下可以直接使用MongoDB Atlas的免费套餐但在高并发场景下建议使用自建MongoDB服务器以获得更好的性能和稳定性。
新MongoDB数据库会在写入数据时自动创建无需手动创建数据库和集合。

View File

@@ -9,12 +9,16 @@ app_password = config_loader.EMAIL_APP_PASSWORD
to_email = "" to_email = ""
def send_email(gmail_user, app_password, to_email, subject, body="这是一封测试邮件。"): def send_email(gmail_user, app_password, to_email, subject, body="这是一封测试邮件。", app_name=None, body_type="plain"):
msg = MIMEMultipart() msg = MIMEMultipart()
# 如果提供了 app_name使用带显示名称的格式
if app_name:
msg["From"] = f"{app_name} <{gmail_user}>"
else:
msg["From"] = gmail_user msg["From"] = gmail_user
msg["To"] = to_email msg["To"] = to_email
msg["Subject"] = subject msg["Subject"] = subject
msg.attach(MIMEText(body, "plain")) msg.attach(MIMEText(body, body_type))
# Gmail SMTP 服务器 # Gmail SMTP 服务器
server = smtplib.SMTP("smtp.gmail.com", 587) server = smtplib.SMTP("smtp.gmail.com", 587)

View File

@@ -6,3 +6,7 @@ class Config:
MONGO_URI = config_loader.MONGO_URI MONGO_URI = config_loader.MONGO_URI
TIMEZONE = config_loader.TIMEZONE TIMEZONE = config_loader.TIMEZONE
ISTEST_MODE = config_loader.ISTEST_MODE ISTEST_MODE = config_loader.ISTEST_MODE
EMAIL_APP_NAME = config_loader.EMAIL_APP_NAME
EMAIL_OFFICIAL_WEBSITE = config_loader.EMAIL_OFFICIAL_WEBSITE
EMAIL_SUBJECT = config_loader.EMAIL_SUBJECT
VERIFICATION_CODE_EXPIRE_MINUTES = config_loader.VERIFICATION_CODE_EXPIRE_MINUTES

View File

@@ -22,7 +22,7 @@ class ConfigLoader:
return self._config return self._config
def get(self, key: str, default=None): def get(self, key: str, default=None) -> Any:
"""获取配置值,支持点号分隔的嵌套键""" """获取配置值,支持点号分隔的嵌套键"""
config = self.load_config() config = self.load_config()
keys = key.split('.') keys = key.split('.')
@@ -96,5 +96,21 @@ class ConfigLoader:
def LOGGING_FORMAT(self) -> str: def LOGGING_FORMAT(self) -> str:
return self.get('LOGGING.FORMAT', '%(asctime)s %(name)s %(levelname)s %(message)s') return self.get('LOGGING.FORMAT', '%(asctime)s %(name)s %(levelname)s %(message)s')
@property
def EMAIL_APP_NAME(self) -> str:
return self.get('EMAIL.APP_NAME', 'WDG Snap Hutao')
@property
def EMAIL_OFFICIAL_WEBSITE(self) -> str:
return self.get('EMAIL.OFFICIAL_WEBSITE', 'https://htserver.wdg.cloudns.ch/')
@property
def EMAIL_SUBJECT(self) -> str:
return self.get('EMAIL.SUBJECT', 'WDG Snap Hutao 验证码')
@property
def VERIFICATION_CODE_EXPIRE_MINUTES(self) -> int:
return self.get('VERIFICATION_CODE.EXPIRE_MINUTES', 10)
# 创建全局配置实例 # 创建全局配置实例
config_loader = ConfigLoader() config_loader = ConfigLoader()

View File

@@ -26,10 +26,10 @@ def init_mongo(uri: str, test_mode=False):
logger.error(f"MongoDB connection failed: {e}") logger.error(f"MongoDB connection failed: {e}")
raise raise
def generate_code(length=6): def generate_code(length=6) -> str:
"""生成数字验证码""" """生成数字验证码"""
return ''.join(secrets.choice('0123456789') for _ in range(length)) return ''.join(secrets.choice('0123456789') for _ in range(length))
def generate_numeric_id(length=8): def generate_numeric_id(length=8) -> str:
"""生成数字ID""" """生成数字ID"""
return ''.join(secrets.choice(string.digits) for _ in range(length)) return ''.join(secrets.choice(string.digits) for _ in range(length))

View File

@@ -15,12 +15,14 @@ def create_app():
from routes.gacha_log import gacha_log_bp from routes.gacha_log import gacha_log_bp
from routes.web_api import web_api_bp from routes.web_api import web_api_bp
from routes.misc import misc_bp from routes.misc import misc_bp
from routes.download_resource import download_resource_bp
app.register_blueprint(announcement_bp, url_prefix="/Announcement") app.register_blueprint(announcement_bp, url_prefix="/Announcement")
app.register_blueprint(auth_bp) app.register_blueprint(auth_bp)
app.register_blueprint(gacha_log_bp) app.register_blueprint(gacha_log_bp)
app.register_blueprint(web_api_bp) app.register_blueprint(web_api_bp)
app.register_blueprint(misc_bp) app.register_blueprint(misc_bp)
app.register_blueprint(download_resource_bp)
# CORS # CORS
@app.after_request @app.after_request

View File

@@ -3,14 +3,42 @@ import datetime
from flask import current_app from flask import current_app
from app.config_loader import config_loader from app.config_loader import config_loader
def create_token(user_id): def create_token(user_id: str) -> str:
"""
创建JWT访问令牌有效期由配置文件中的JWT_EXPIRATION_HOURS决定。
:param user_id: 用户ID
:return: JWT 访问令牌
"""
payload = { payload = {
"user_id": user_id, "user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS) "exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS)
} }
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM) return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM)
def verify_token(token): # 创建刷新token有效期是访问token的两倍
def create_refresh_token(user_id: str) -> str:
"""
创建JWT刷新令牌有效期为访问令牌的两倍。
:param user_id: 用户ID
:return: JWT 刷新令牌
"""
payload = {
"user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS * 2)
}
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM)
def verify_token(token: str)-> str | None:
"""
验证JWT令牌并返回用户ID如果无效则返回None。
:param token: JWT令牌字符串
:type token: str
:return: 用户ID或None
:rtype: str | None
"""
try: try:
data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=[config_loader.JWT_ALGORITHM]) data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=[config_loader.JWT_ALGORITHM])
return data["user_id"] return data["user_id"]

View File

@@ -5,3 +5,4 @@ PyJWT==2.10.1
pymongo==4.15.5 pymongo==4.15.5
Werkzeug==3.1.4 Werkzeug==3.1.4
sentry-sdk[flask] sentry-sdk[flask]
gunicorn

View File

@@ -1,10 +1,12 @@
from flask import Blueprint, request, jsonify, session from flask import Blueprint, request, jsonify
from app.utils.jwt_utils import create_token, verify_token from app.utils.jwt_utils import create_token, verify_token, create_refresh_token
from services.auth_service import ( from services.auth_service import (
decrypt_data, send_verification_email, verify_user_credentials, decrypt_data, send_verification_email, verify_user_credentials,
create_user_account, get_user_by_id create_user_account, get_user_by_id
) )
from app.extensions import generate_code, logger from services.verification_code_service import save_verification_code, verify_code
from app.extensions import generate_code, logger , config_loader
from app.config import Config
auth_bp = Blueprint("auth", __name__) auth_bp = Blueprint("auth", __name__)
@@ -28,8 +30,8 @@ def passport_verify():
# 生成验证码 # 生成验证码
code = generate_code(6) code = generate_code(6)
session['verification_code'] = code # 使用 MongoDB TTL 存储验证码
session['email'] = decrypted_email save_verification_code(decrypted_email, code, expire_minutes=Config.VERIFICATION_CODE_EXPIRE_MINUTES)
# 发送邮件 # 发送邮件
if send_verification_email(decrypted_email, code): if send_verification_email(decrypted_email, code):
@@ -68,9 +70,8 @@ def passport_register():
"data": None "data": None
}), 400 }), 400
# 验证验证码 # 使用 MongoDB 验证验证码
if (session.get('verification_code') != decrypted_code or if not verify_code(decrypted_email, decrypted_code):
session.get('email') != decrypted_email):
logger.warning("Invalid verification code") logger.warning("Invalid verification code")
return jsonify({ return jsonify({
"retcode": 2, "retcode": 2,
@@ -88,12 +89,10 @@ def passport_register():
"data": None "data": None
}) })
# 删除session中的验证码和邮箱
session.pop('verification_code', None)
session.pop('email', None)
# 创建token # 创建token
access_token = create_token(str(new_user['_id'])) access_token = create_token(str(new_user['_id']))
# 刷新token
refresh_token = create_refresh_token(str(new_user['_id']))
logger.info(f"User registered: {decrypted_email}") logger.info(f"User registered: {decrypted_email}")
return jsonify({ return jsonify({
@@ -101,8 +100,8 @@ def passport_register():
"message": "success", "message": "success",
"data": { "data": {
"AccessToken": access_token, "AccessToken": access_token,
"RefreshToken": access_token, "RefreshToken": refresh_token,
"ExpiresIn": 3600 "ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
} }
}) })
@@ -139,6 +138,7 @@ def passport_login():
# 创建token # 创建token
access_token = create_token(str(user['_id'])) access_token = create_token(str(user['_id']))
refresh_token = create_refresh_token(str(user['_id']))
logger.info(f"User logged in: {decrypted_email}") logger.info(f"User logged in: {decrypted_email}")
return jsonify({ return jsonify({
@@ -147,8 +147,8 @@ def passport_login():
"l10nKey": "ServerPassportLoginSucceed", "l10nKey": "ServerPassportLoginSucceed",
"data": { "data": {
"AccessToken": access_token, "AccessToken": access_token,
"RefreshToken": access_token, "RefreshToken": refresh_token,
"ExpiresIn": 3600 "ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
} }
}) })
@@ -217,6 +217,7 @@ def passport_refresh_token():
}) })
access_token = create_token(user_id) access_token = create_token(user_id)
refresh_token = create_refresh_token(user_id)
logger.info(f"Token refreshed for user_id: {user_id}") logger.info(f"Token refreshed for user_id: {user_id}")
return jsonify({ return jsonify({
@@ -224,8 +225,8 @@ def passport_refresh_token():
"message": "success", "message": "success",
"data": { "data": {
"AccessToken": access_token, "AccessToken": access_token,
"RefreshToken": access_token, "RefreshToken": refresh_token,
"ExpiresIn": 3600 "ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
} }
}) })

270
routes/download_resource.py Normal file
View File

@@ -0,0 +1,270 @@
from flask import Blueprint, request, jsonify
from app.decorators import require_maintainer_permission
from app.extensions import logger
from services.download_resource_service import (
create_download_resource,
get_download_resources,
get_download_resource_by_id,
update_download_resource,
delete_download_resource,
get_latest_version
)
download_resource_bp = Blueprint("download_resource", __name__)
# 公开API - 获取下载资源列表
@download_resource_bp.route('/download-resources', methods=['GET'])
def get_public_download_resources():
"""
获取下载资源列表公开API
可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回所有
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
"""
package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_test参数默认只返回正式版本
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
# 只返回激活的资源
resources = get_download_resources(package_type=package_type, is_active=True, is_test=is_test)
return jsonify({
"code": 0,
"message": "success",
"data": resources
})
@download_resource_bp.route('/download-resources/latest', methods=['GET'])
def get_latest_download_resource():
"""
获取最新版本公开API
可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回最新的任意类型
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
"""
package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_test参数
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resource = get_latest_version(package_type=package_type, is_test=is_test)
if resource:
return jsonify({
"code": 0,
"message": "success",
"data": resource
})
else:
return jsonify({
"code": 1,
"message": "No resource found",
"data": None
}), 404
# Web管理端API - 增删改查
@download_resource_bp.route('/web-api/download-resources', methods=['POST'])
@require_maintainer_permission
def web_api_create_download_resource():
"""创建下载资源"""
data = request.get_json()
# 验证必需字段
required_fields = ['version', 'package_type', 'download_url']
if not all(k in data for k in required_fields):
return jsonify({
"code": 1,
"message": f"Missing required fields: {', '.join(required_fields)}",
"data": None
}), 400
# 验证package_type
if data['package_type'] not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 添加创建者信息
data['created_by'] = str(request.current_user['_id'])
# 创建资源
resource_id = create_download_resource(data)
if resource_id:
logger.info(f"Download resource created with ID: {resource_id} by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource created successfully",
"data": {
"id": str(resource_id)
}
})
else:
logger.error("Failed to create download resource")
return jsonify({
"code": 2,
"message": "Failed to create download resource",
"data": None
}), 500
@download_resource_bp.route('/web-api/download-resources', methods=['GET'])
@require_maintainer_permission
def web_api_get_download_resources():
"""获取下载资源列表(管理端,包含所有资源,包括未激活的)"""
package_type = request.args.get('package_type')
is_active_str = request.args.get('is_active')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_active参数
is_active = None
if is_active_str is not None:
is_active = is_active_str.lower() == 'true'
# 处理is_test参数
is_test = None
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resources = get_download_resources(package_type=package_type, is_active=is_active, is_test=is_test)
return jsonify({
"code": 0,
"message": "success",
"data": resources
})
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['GET'])
@require_maintainer_permission
def web_api_get_download_resource(resource_id):
"""获取单个下载资源详情"""
resource = get_download_resource_by_id(resource_id)
if resource:
return jsonify({
"code": 0,
"message": "success",
"data": resource
})
else:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['PUT'])
@require_maintainer_permission
def web_api_update_download_resource(resource_id):
"""更新下载资源"""
data = request.get_json()
# 检查资源是否存在
existing_resource = get_download_resource_by_id(resource_id)
if not existing_resource:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
# 验证package_type如果提供
if 'package_type' in data and data['package_type'] not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 添加更新者信息
data['updated_by'] = str(request.current_user['_id'])
# 更新资源
success = update_download_resource(resource_id, data)
if success:
logger.info(f"Download resource {resource_id} updated by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource updated successfully",
"data": None
})
else:
logger.error(f"Failed to update download resource {resource_id}")
return jsonify({
"code": 2,
"message": "Failed to update download resource",
"data": None
}), 500
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['DELETE'])
@require_maintainer_permission
def web_api_delete_download_resource(resource_id):
"""删除下载资源"""
# 检查资源是否存在
existing_resource = get_download_resource_by_id(resource_id)
if not existing_resource:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
# 删除资源
success = delete_download_resource(resource_id)
if success:
logger.info(f"Download resource {resource_id} deleted by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource deleted successfully",
"data": None
})
else:
logger.error(f"Failed to delete download resource {resource_id}")
return jsonify({
"code": 2,
"message": "Failed to delete download resource",
"data": None
}), 500

View File

@@ -119,6 +119,7 @@ def gacha_log_retrieve():
filtered_items = retrieve_gacha_log(user_id, uid, end_ids) filtered_items = retrieve_gacha_log(user_id, uid, end_ids)
logger.info(f"Gacha log retrieved for user_id: {user_id}, uid: {uid}, items count: {len(filtered_items)}") logger.info(f"Gacha log retrieved for user_id: {user_id}, uid: {uid}, items count: {len(filtered_items)}")
logger.debug(f"end_ids: {end_ids}")
return jsonify({ return jsonify({
"retcode": 0, "retcode": 0,

View File

@@ -4,7 +4,7 @@ from flask import Blueprint, request, jsonify
from app.utils.jwt_utils import verify_token, create_token from app.utils.jwt_utils import verify_token, create_token
from services.auth_service import verify_user_credentials, get_users_with_search from services.auth_service import verify_user_credentials, get_users_with_search
from app.decorators import require_maintainer_permission from app.decorators import require_maintainer_permission
from app.extensions import generate_numeric_id, client, logger from app.extensions import generate_numeric_id, client, logger, config_loader
web_api_bp = Blueprint("web_api", __name__) web_api_bp = Blueprint("web_api", __name__)
@@ -36,7 +36,7 @@ def web_api_login():
"message": "success", "message": "success",
"data": { "data": {
"access_token": access_token, "access_token": access_token,
"expires_in": 3600 "expires_in": config_loader.JWT_EXPIRATION_HOURS * 3600
} }
}) })
@@ -239,9 +239,15 @@ def web_api_get_users():
"data": None "data": None
}), 403 }), 403
# 获取搜索参数 # 获取查询参数
q = request.args.get("q", "").strip() q = request.args.get("q", "").strip()
users = get_users_with_search(q) role = request.args.get("role", "").strip() if request.args.get("role") else None
email = request.args.get("email", "").strip() if request.args.get("email") else None
username = request.args.get("username", "").strip() if request.args.get("username") else None
id_param = request.args.get("id", "").strip() if request.args.get("id") else None
is_licensed = request.args.get("is", "").strip() if request.args.get("is") else None
users = get_users_with_search(q, role, email, username, id_param, is_licensed)
return jsonify({ return jsonify({
"code": 0, "code": 0,

21
run.py
View File

@@ -1,6 +1,21 @@
from app import create_app from app.init import create_app
from app.config_loader import config_loader
import sentry_sdk
sentry_sdk.init(
dsn="https://d1cad1d2b442cf8431df3ee4bab925e0@o4507525750521856.ingest.us.sentry.io/4510623668830208",
# Add data like request headers and IP for users,
# see https://docs.sentry.io/platforms/python/data-management/data-collected/ for more info
send_default_pii=True,
traces_sample_rate=1.0,
)
# 创建应用实例
app = create_app() app = create_app()
if __name__ == "__main__": if __name__ == '__main__':
app.run(debug=True) app.run(
host=config_loader.SERVER_HOST,
port=config_loader.SERVER_PORT,
debug=config_loader.SERVER_DEBUG
)

View File

@@ -2,6 +2,12 @@ from app.extensions import client, logger
from app.config import Config from app.config import Config
def get_announcements(request_data: list): def get_announcements(request_data: list):
"""
获取公告列表,过滤掉用户已关闭的公告
:param request_data: 用户已关闭的公告ID列表
:type request_data: list
"""
if Config.ISTEST_MODE: if Config.ISTEST_MODE:
return [] return []
# 记录请求体到日志请求体中是用户已关闭的公告ID列表 # 记录请求体到日志请求体中是用户已关闭的公告ID列表

View File

@@ -1,20 +1,23 @@
import datetime
from bson import ObjectId from bson import ObjectId
from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import client, logger from app.extensions import client, logger
from app.config import Config from app.config import Config
from app.config_loader import config_loader
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from datetime import timezone
from zoneinfo import ZoneInfo
import datetime
import SendEmailTool
import re
import base64
def decrypt_data(encrypted_data: str) -> str:
def decrypt_data(encrypted_data):
"""使用RSA私钥解密数据""" """使用RSA私钥解密数据"""
try: try:
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
import base64
from app.config_loader import config_loader
private_key_file = config_loader.RSA_PRIVATE_KEY_FILE private_key_file = config_loader.RSA_PRIVATE_KEY_FILE
private_key = RSA.import_key(open(private_key_file).read()) with open(private_key_file, 'r') as f:
private_key = RSA.import_key(f.read())
cipher = PKCS1_OAEP.new(private_key) cipher = PKCS1_OAEP.new(private_key)
decrypted_data = cipher.decrypt(base64.b64decode(encrypted_data)) decrypted_data = cipher.decrypt(base64.b64decode(encrypted_data))
return decrypted_data.decode() return decrypted_data.decode()
@@ -23,20 +26,123 @@ def decrypt_data(encrypted_data):
raise raise
def send_verification_email(email, code): def send_verification_email(email: str, code: str, ACTION_NAME="注册", EXPIRE_MINUTES=None) -> bool:
"""发送验证码邮件""" """发送验证码邮件,目前只有注册场景,后续再扩展其他场景"""
try: try:
import SendEmailTool subject = Config.EMAIL_SUBJECT
textbody = f"您的验证码是: {code}"
APP_NAME = Config.EMAIL_APP_NAME
OFFICIAL_WEBSITE = Config.EMAIL_OFFICIAL_WEBSITE
if EXPIRE_MINUTES is None:
EXPIRE_MINUTES = Config.VERIFICATION_CODE_EXPIRE_MINUTES
htmlbody = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>验证码邮件</title>
</head>
<body style="margin:0;padding:0;background-color:#f5f6f7;font-family:Arial,Helvetica,sans-serif;">
<table width="100%" cellpadding="0" cellspacing="0">
<tr>
<td align="center" style="padding:40px 0;">
<!-- 主体卡片 -->
<table width="480" cellpadding="0" cellspacing="0" style="background:#ffffff;border-radius:8px;padding:32px;">
subject = "Snap Hutao 验证码" <!-- 应用名 -->
body = f"您的验证码是: {code}" <tr>
<td align="center" style="font-size:22px;font-weight:bold;color:#333333;padding-bottom:16px;">
{APP_NAME}
</td>
</tr>
<!-- 操作提示 -->
<tr>
<td style="font-size:14px;color:#555555;padding-bottom:24px;text-align:center;">
你正在进行 <strong>{ACTION_NAME}</strong> 操作,请使用以下验证码完成验证:
</td>
</tr>
<!-- 验证码 -->
<tr>
<td align="center" style="padding:20px 0;">
<div style="
display:inline-block;
font-size:32px;
font-weight:bold;
letter-spacing:6px;
color:#2e86de;
padding:12px 24px;
border:1px dashed #2e86de;
border-radius:6px;
">
{code}
</div>
</td>
</tr>
<!-- 有效期说明 -->
<tr>
<td style="font-size:13px;color:#888888;text-align:center;padding-top:16px;">
验证码有效期 {EXPIRE_MINUTES} 分钟,请勿泄露给他人。
</td>
</tr>
<!-- 分割线 -->
<tr>
<td style="padding:24px 0;">
<hr style="border:none;border-top:1px solid #eeeeee;">
</td>
</tr>
<!-- 底部信息 -->
<tr>
<td style="font-size:12px;color:#999999;text-align:center;line-height:1.6;">
本邮件由 {APP_NAME} 系统自动发送,请勿回复<br>
如非本人操作,请忽略本邮件
</td>
</tr>
<!-- 官网链接 -->
<tr>
<td align="center" style="padding-top:16px;">
<a href="{OFFICIAL_WEBSITE}"
style="font-size:12px;color:#2e86de;text-decoration:none;">
访问官网
</a>
</td>
</tr>
</table>
</td>
</tr>
</table>
</body>
</html>
"""
try:
SendEmailTool.send_email( SendEmailTool.send_email(
SendEmailTool.gmail_user, config_loader.EMAIL_GMAIL_USER,
SendEmailTool.app_password, config_loader.EMAIL_APP_PASSWORD,
email, email,
subject, subject,
body htmlbody,
app_name=APP_NAME,
body_type="html"
)
logger.info(f"HTML Verification email sent to {email}")
except Exception as e:
logger.error(f"Failed to send HTML verification email to {email}: {e}")
# 如果 HTML 邮件发送失败,尝试发送纯文本邮件
SendEmailTool.send_email(
config_loader.EMAIL_GMAIL_USER,
config_loader.EMAIL_APP_PASSWORD,
email,
subject,
textbody,
app_name=APP_NAME,
body_type="plain"
) )
logger.info(f"Verification email sent to {email}") logger.info(f"Verification email sent to {email}")
return True return True
@@ -45,7 +151,7 @@ def send_verification_email(email, code):
return False return False
def verify_user_credentials(email, password): def verify_user_credentials(email: str, password: str) -> dict | None:
"""验证用户凭据""" """验证用户凭据"""
user = client.ht_server.users.find_one({"email": email}) user = client.ht_server.users.find_one({"email": email})
@@ -55,7 +161,7 @@ def verify_user_credentials(email, password):
return user return user
def create_user_account(email, password): def create_user_account(email: str, password: str) -> dict | None:
"""创建新用户账户""" """创建新用户账户"""
# 检查用户是否已存在 # 检查用户是否已存在
existing_user = client.ht_server.users.find_one({"email": email}) existing_user = client.ht_server.users.find_one({"email": email})
@@ -74,8 +180,9 @@ def create_user_account(email, password):
"CreatedAt": datetime.datetime.utcnow(), "CreatedAt": datetime.datetime.utcnow(),
"IsLicensedDeveloper": False, "IsLicensedDeveloper": False,
"IsMaintainer": False, "IsMaintainer": False,
"GachaLogExpireAt": "2026-01-01T00:00:00Z", # 现在默认用户的上传权限不过期
"CdnExpireAt": "2026-01-01T00:00:00Z" "GachaLogExpireAt": "2099-01-01T00:00:00Z",
"CdnExpireAt": "2099-01-01T00:00:00Z"
} }
result = client.ht_server.users.insert_one(new_user) result = client.ht_server.users.insert_one(new_user)
@@ -84,55 +191,94 @@ def create_user_account(email, password):
return new_user return new_user
def get_user_by_id(user_id): def get_user_by_id(user_id: str) -> dict | None:
"""根据ID获取用户信息""" """根据ID获取用户信息"""
try: try:
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)}) user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
if user: if user:
user['_id'] = str(user['_id']) user['_id'] = str(user['_id'])
return user return user
except: except Exception as e:
logger.error(f"Error retrieving user by ID: {e}")
return None return None
def get_users_with_search(query_text=""): def get_users_with_search(query_text="", role=None, email=None, username=None, id=None, is_licensed=None) -> list:
"""获取用户列表,支持搜索""" """获取用户列表,支持多种筛选条件"""
import re import re
# 构建查询条件 # 构建查询条件
query = {} query = {}
or_conditions = [] and_conditions = []
# 通用搜索q 参数)- 匹配用户名、邮箱、ID
if query_text: if query_text:
or_conditions = []
# 用户名模糊搜索 # 用户名模糊搜索
or_conditions.append({ or_conditions.append({
"UserName": {"$regex": re.escape(query_text), "$options": "i"} "UserName": {"$regex": re.escape(query_text), "$options": "i"}
}) })
# 邮箱模糊搜索 # 邮箱模糊搜索
or_conditions.append({ or_conditions.append({
"email": {"$regex": re.escape(query_text), "$options": "i"} "email": {"$regex": re.escape(query_text), "$options": "i"}
}) })
# _id 搜索(支持完整或前缀) # _id 搜索(支持完整或前缀)
if ObjectId.is_valid(query_text): if ObjectId.is_valid(query_text):
or_conditions.append({ or_conditions.append({
"_id": ObjectId(query_text) "_id": ObjectId(query_text)
}) })
else: and_conditions.append({"$or": or_conditions})
# 允许部分 ObjectId 搜索(转字符串后匹配)
or_conditions.append({ # 按角色筛选
"_id": { if role:
"$in": [ if role == "maintainer":
u["_id"] for u in client.ht_server.users.find( and_conditions.append({"IsMaintainer": True})
{}, elif role == "developer":
{"_id": 1} and_conditions.append({"IsLicensedDeveloper": True})
) if query_text.lower() in str(u["_id"]).lower() elif role == "user":
# user 表示既不是 maintainer 也不是 developer
and_conditions.append({
"$and": [
{"IsMaintainer": {"$ne": True}},
{"IsLicensedDeveloper": {"$ne": True}}
] ]
}
}) })
query = {"$or": or_conditions} # 按邮箱筛选(支持模糊匹配)
if email:
and_conditions.append({
"email": {"$regex": re.escape(email), "$options": "i"}
})
# 按用户名筛选(支持模糊匹配)
if username:
and_conditions.append({
"UserName": {"$regex": re.escape(username), "$options": "i"}
})
# 按用户ID筛选支持模糊匹配
if id:
if ObjectId.is_valid(id):
and_conditions.append({"_id": ObjectId(id)})
else:
# 如果不是有效的 ObjectId尝试匹配字符串形式的 _id
and_conditions.append({
"_id": {"$regex": re.escape(id), "$options": "i"}
})
# 按状态筛选
if is_licensed:
if is_licensed == "licensed":
and_conditions.append({"IsLicensedDeveloper": True})
elif is_licensed == "not-licensed":
and_conditions.append({"IsLicensedDeveloper": False})
# 构建 AND 查询
if and_conditions:
if len(and_conditions) == 1:
query = and_conditions[0]
else:
query = {"$and": and_conditions}
# 查询数据库(排除密码) # 查询数据库(排除密码)
cursor = client.ht_server.users.find(query, {"password": 0}) cursor = client.ht_server.users.find(query, {"password": 0})
@@ -145,9 +291,6 @@ def get_users_with_search(query_text=""):
users = list(users_map.values()) users = list(users_map.values())
# 数据格式化 # 数据格式化
from datetime import timezone
from zoneinfo import ZoneInfo
CST = ZoneInfo("Asia/Shanghai") CST = ZoneInfo("Asia/Shanghai")
for u in users: for u in users:

View File

@@ -0,0 +1,241 @@
import datetime
from app.extensions import client, logger
from app.config import Config
def create_download_resource(data):
"""
创建下载资源
:param data: 包含以下字段的数据
- version: 版本号
- package_type: 包类型 (msi/msix)
- download_url: 下载链接
- features: 新功能描述
- file_size: 文件大小 (可选)
- file_hash: 文件哈希 (可选)
- is_active: 是否激活 (可选默认为True)
- is_test: 是否为测试版本 (可选默认为False)
:return: 创建的资源ID或None
"""
try:
resource_doc = {
"version": data['version'],
"package_type": data['package_type'],
"download_url": data['download_url'],
"features": data.get('features', ''),
"file_size": data.get('file_size'),
"file_hash": data.get('file_hash'),
"is_active": data.get('is_active', True),
"is_test": data.get('is_test', False),
"created_at": datetime.datetime.utcnow(),
"created_by": data.get('created_by')
}
result = client.ht_server.download_resources.insert_one(resource_doc)
logger.info(f"Download resource created with ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Failed to create download resource: {e}")
return None
def get_download_resources(package_type=None, is_active=None, is_test=None):
"""
获取下载资源列表
:param package_type: 包类型过滤 (msi/msix)None表示获取所有
:param is_active: 是否激活过滤None表示获取所有
:param is_test: 是否为测试版本过滤None表示获取所有
:return: 资源列表
"""
try:
query = {}
if package_type:
query['package_type'] = package_type
if is_active is not None:
query['is_active'] = is_active
if is_test is not None:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
if is_test:
query['is_test'] = True
else:
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
resources = list(client.ht_server.download_resources.find(query, sort=[("created_at", -1)]))
# 移除 _id 字段并转换日期
result = []
for r in resources:
r = dict(r)
# 转换_id为字符串并存为id字段
r['id'] = str(r.pop('_id'))
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in r:
r['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in r and isinstance(r['created_at'], datetime.datetime):
dt = r['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
r['created_at'] = dt.isoformat()
if 'updated_at' in r and isinstance(r['updated_at'], datetime.datetime):
dt = r['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
r['updated_at'] = dt.isoformat()
result.append(r)
return result
except Exception as e:
logger.error(f"Failed to get download resources: {e}")
return []
def get_download_resource_by_id(resource_id):
"""
根据ID获取下载资源
:param resource_id: 资源ID
:return: 资源对象或None
"""
try:
from bson import ObjectId
resource = client.ht_server.download_resources.find_one({"_id": ObjectId(resource_id)})
if resource:
resource = dict(resource)
resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['created_at'] = dt.isoformat()
if 'updated_at' in resource and isinstance(resource['updated_at'], datetime.datetime):
dt = resource['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['updated_at'] = dt.isoformat()
return resource
return None
except Exception as e:
logger.error(f"Failed to get download resource by ID: {e}")
return None
def update_download_resource(resource_id, data):
"""
更新下载资源
:param resource_id: 资源ID
:param data: 要更新的字段
:return: 是否成功
"""
try:
from bson import ObjectId
# 构建更新数据
update_data = {"updated_at": datetime.datetime.utcnow()}
if 'version' in data:
update_data['version'] = data['version']
if 'package_type' in data:
update_data['package_type'] = data['package_type']
if 'download_url' in data:
update_data['download_url'] = data['download_url']
if 'features' in data:
update_data['features'] = data['features']
if 'file_size' in data:
update_data['file_size'] = data['file_size']
if 'file_hash' in data:
update_data['file_hash'] = data['file_hash']
if 'is_active' in data:
update_data['is_active'] = data['is_active']
if 'is_test' in data:
update_data['is_test'] = data['is_test']
if 'updated_by' in data:
update_data['updated_by'] = data['updated_by']
result = client.ht_server.download_resources.update_one(
{"_id": ObjectId(resource_id)},
{"$set": update_data}
)
if result.modified_count > 0:
logger.info(f"Download resource {resource_id} updated successfully")
return True
return False
except Exception as e:
logger.error(f"Failed to update download resource: {e}")
return False
def delete_download_resource(resource_id):
"""
删除下载资源
:param resource_id: 资源ID
:return: 是否成功
"""
try:
from bson import ObjectId
result = client.ht_server.download_resources.delete_one({"_id": ObjectId(resource_id)})
if result.deleted_count > 0:
logger.info(f"Download resource {resource_id} deleted successfully")
return True
return False
except Exception as e:
logger.error(f"Failed to delete download resource: {e}")
return False
def get_latest_version(package_type=None, is_test=False):
"""
获取最新版本
:param package_type: 包类型 (msi/msix)None表示获取所有类型的最新版本
:param is_test: 是否包含测试版本默认为False只返回正式版本
:return: 资源对象或None
"""
try:
query = {"is_active": True}
if not is_test:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
else:
query['is_test'] = True
if package_type:
query['package_type'] = package_type
resource = client.ht_server.download_resources.find_one(
query,
sort=[("created_at", -1)]
)
if resource:
resource = dict(resource)
resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['created_at'] = dt.isoformat()
if 'updated_at' in resource and isinstance(resource['updated_at'], datetime.datetime):
dt = resource['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['updated_at'] = dt.isoformat()
return resource
return None
except Exception as e:
logger.error(f"Failed to get latest version: {e}")
return None

View File

@@ -1,5 +1,17 @@
from app.extensions import client, logger from app.extensions import client, logger
"""
注意记录中有两种类型GachaType和QueryType(uigf_gacha_type)GachaType多了一个400类型其实就是QueryType的301类型客户端传的end_ids是按QueryType来的如果按照GachaType来筛选会多出400类型的记录
映射关系
| `uigf_gacha_type` | `gacha_type` |
|-------------------|----------------|
| `100` | `100` |
| `200` | `200` |
| `301` | `301` or `400` |
| `302` | `302` |
| `500` | `500` |
"""
def get_gacha_log_entries(user_id): def get_gacha_log_entries(user_id):
"""获取用户的祈愿记录条目列表""" """获取用户的祈愿记录条目列表"""
@@ -41,6 +53,9 @@ def get_gacha_log_end_ids(user_id, uid):
if gacha_type in end_ids: if gacha_type in end_ids:
end_ids[gacha_type] = max(end_ids[gacha_type], item_id) end_ids[gacha_type] = max(end_ids[gacha_type], item_id)
# 400类型对应301类型
end_ids["400"] = end_ids["301"]
return end_ids return end_ids
@@ -82,6 +97,11 @@ def retrieve_gacha_log(user_id, uid, end_ids):
# 筛选出比end_ids更旧的记录 # 筛选出比end_ids更旧的记录
filtered_items = [] filtered_items = []
# 需要将end_ids的key从QueryType转换为GachaType给400赋值为301的值即可
if "301" in end_ids:
end_ids["400"] = end_ids["301"]
for item in gacha_log['data']: for item in gacha_log['data']:
gacha_type = str(item.get('GachaType', '')) gacha_type = str(item.get('GachaType', ''))
item_id = item.get('Id', 0) item_id = item.get('Id', 0)

View File

@@ -0,0 +1,70 @@
import datetime
from app.extensions import client, logger
def init_verification_code_collection():
"""初始化验证码集合并创建 TTL 索引"""
db = client.ht_server
collection = db.verification_codes
indexes = collection.list_indexes()
ttl_index_exists = False
for index in indexes:
if index.get('expireAfterSeconds') is not None:
ttl_index_exists = True
break
# 如果不存在 TTL 索引,则创建
if not ttl_index_exists:
collection.create_index(
[("expire_at", 1)],
expireAfterSeconds=0,
name="expire_at_ttl"
)
logger.info("Created TTL index on verification_codes collection")
def save_verification_code(email: str, code: str, expire_minutes: int = 10):
"""保存验证码到 MongoDB自动过期时间由 TTL 索引控制"""
db = client.ht_server
collection = db.verification_codes
# 确保集合已初始化
init_verification_code_collection()
# 计算过期时间
expire_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=expire_minutes)
# 插入验证码记录
result = collection.insert_one({
"email": email,
"code": code,
"created_at": datetime.datetime.utcnow(),
"expire_at": expire_at,
"used": False
})
logger.debug(f"Saved verification code for email: {email}")
return result.inserted_id
def verify_code(email: str, code: str) -> bool:
"""验证验证码是否正确,验证成功后删除验证码记录"""
db = client.ht_server
collection = db.verification_codes
# 查找未使用的验证码
verification_record = collection.find_one({
"email": email,
"code": code,
"used": False
})
if verification_record:
# 验证成功,删除该验证码记录
collection.delete_one({"_id": verification_record["_id"]})
logger.info(f"Verification code validated and deleted for email: {email}")
return True
logger.warning(f"Invalid or expired verification code for email: {email}")
return False