Compare commits

...

17 Commits

Author SHA1 Message Date
wangdage12
3d2a620820 Update README with API and server details
Added information about API services, server infrastructure, and usage recommendations for metadata repositories.
2026-02-12 23:26:24 +08:00
wangdage12
fb977fdeb5 Merge pull request #9 from wangdage12/dev
支持测试版本的资源筛选,增强用户列表的查询功能
2026-02-10 20:26:23 +08:00
fanbook-wangdage
036b64c845 支持测试版本的资源筛选,增强用户列表的查询功能 2026-02-10 20:14:37 +08:00
wangdage12
01f51d82cd Update README with deployment requirements
Added resource and environment requirements for deployment.
2026-02-07 21:56:24 +08:00
wangdage12
c6004bec96 Merge pull request #8 from wangdage12/dev
添加下载资源和下载资源管理功能
2026-02-05 22:06:13 +08:00
fanbook-wangdage
cb925f7200 添加下载资源和下载资源管理功能 2026-02-05 21:53:26 +08:00
wangdage12
6e09869df0 Merge pull request #7 from wangdage12/dev
添加类型注释、修复抽卡id问题导致的返回异常和刷新token的问题
2026-02-03 12:25:01 +08:00
fanbook-wangdage
6b82806931 添加类型注释、修复抽卡id问题 2026-02-03 12:12:45 +08:00
wangdage12
40bd74c101 Merge pull request #5 from wangdage12/dev
更新验证码的存储方式、支持html验证码邮件
2026-01-31 15:22:35 +08:00
fanbook-wangdage
aa82a19ac7 更新验证码的存储方式、支持html验证码邮件 2026-01-31 14:25:33 +08:00
wangdage12
f2ca58182e Merge pull request #1 from wangdage12/dev
初始版本
2026-01-25 19:58:39 +08:00
fanbook-wangdage
199308fab1 添加文档并修复问题 2026-01-25 19:22:49 +08:00
fanbook-wangdage
c1339a9858 修复问题 2026-01-25 18:49:34 +08:00
fanbook-wangdage
cd31c409d7 添加公告中的发行版字段 2026-01-16 11:38:31 +08:00
fanbook-wangdage
74eea3a4f1 添加获取第三方工具api 2026-01-13 15:47:40 +08:00
fanbook-wangdage
a4d95e4f90 添加sentry、新增公告筛选功能 2026-01-08 23:34:42 +08:00
fanbook-wangdage
d67e42b067 初始提交 2025-12-28 17:01:42 +08:00
24 changed files with 2299 additions and 0 deletions

11
.gitignore vendored Normal file
View File

@@ -0,0 +1,11 @@
.vscode/
# 公钥、私钥文件
*.pem
# python缓存文件
__pycache__/
*.pyc
# 配置json文件
config.json

146
README.md
View File

@@ -1,2 +1,148 @@
# Snap.Server # Snap.Server
Snap.Hutao新后端API Snap.Hutao新后端API
## 部署方法
> **资源和环境要求**
> 服务器硬件:
> 最低1核CPU1GB内存
>
> 运行环境:
> `Windows10`及以上、`Windows Server 2019`及以上、`Linux`
> `Python3.12`及以上
> `MongoDB`
### 在服务器生成RSA密钥
执行以下代码在根目录生成密钥:
```python
from Crypto.PublicKey import RSA
# 生成 2048 位 RSA 密钥对
key = RSA.generate(2048)
private_key = key.export_key()
public_key = key.publickey().export_key()
with open("private.pem", "wb") as f:
f.write(private_key)
with open("public.pem", "wb") as f:
f.write(public_key)
print("Keys generated.")
```
**确保客户端的公钥和生成的相同,否则将无法使用账户功能**
### 创建配置文件
创建`config.json`文件,示例内容如下:
```json
{
"SECRET_KEY": "jwt_secret_key",
"MONGO_URI": "mongodb+srv://wdgwdg889_db_user:xxxxxx@cluster0.eplrcvl.mongodb.net/?appName=Cluster0",
"TIMEZONE": "Asia/Shanghai",
"ISTEST_MODE": false,
"SERVER": {
"HOST": "0.0.0.0",
"PORT": 5222,
"DEBUG": false
},
"JWT": {
"ALGORITHM": "HS256",
"EXPIRATION_HOURS": 24
},
"EMAIL": {
"GMAIL_USER": "wdgwdg889@gmail.com",
"APP_PASSWORD": "",
"APP_NAME": "WDG Snap Hutao",
"OFFICIAL_WEBSITE": "https://htserver.wdg.cloudns.ch/",
"SUBJECT": "WDG Snap Hutao 验证码"
},
"RSA": {
"PRIVATE_KEY_FILE": "private.pem",
"PUBLIC_KEY_FILE": "public.pem"
},
"VERIFICATION_CODE": {
"EXPIRE_MINUTES": 10
},
"LOGGING": {
"LEVEL": "DEBUG",
"FORMAT": ""
}
}
```
参数说明:
| 参数 | 说明 |
|------|------|
| SECRET_KEY | 用于JWT签名的密钥请设置为复杂字符串 |
| MONGO_URI | MongoDB连接字符串 |
| TIMEZONE | 服务器时区 |
| ISTEST_MODE | 是否启用测试模式,测试模式下部分功能将返回默认值,不连接数据库 |
| SERVER.HOST | 服务器监听地址 |
| SERVER.PORT | 服务器监听端口 |
| SERVER.DEBUG | 是否启用Flask的调试模式 |
| JWT.ALGORITHM | JWT签名算法 |
| JWT.EXPIRATION_HOURS | JWT过期时间小时 |
| EMAIL.GMAIL_USER | 用于发送验证邮件的Gmail账号 |
| EMAIL.APP_PASSWORD | Gmail应用专用密码 |
| EMAIL.APP_NAME | 应用名称,用于邮件显示 |
| EMAIL.OFFICIAL_WEBSITE | 官方网站地址,用于邮件中的链接 |
| EMAIL.SUBJECT | 验证邮件的主题 |
| RSA.PRIVATE_KEY_FILE | RSA私钥文件路径 |
| RSA.PUBLIC_KEY_FILE | RSA公钥文件路径 |
| VERIFICATION_CODE.EXPIRE_MINUTES | 验证码过期时间(分钟) |
| LOGGING.LEVEL | 日志记录级别生产环境建议设置为INFO |
| LOGGING.FORMAT | 日志记录格式 |
### 开发环境启动方法
确保已安装依赖:
```
pip install -r requirements.txt
```
运行Flask应用
```
python app.py
```
### 生产环境启动方法
建议使用Gunicorn部署
```
pip install -r requirements.txt && python -m gunicorn run:app --bind 0.0.0.0:5222 --workers 4 --threads 2 --access-logfile - --error-logfile -
```
请根据服务器性能调整`--workers``--threads`参数。
### API文档和官方开放平台
**API文档可以在该地址访问**
https://rdgm3wrj7r.apifox.cn/
> 项目官方API和文件资源服务、仓库镜像为各开源项目免费提供
本项目官方API地址https://htserver.wdg.cloudns.ch/api/
<img width="901" height="522" alt="服务拓补结构" src="https://github.com/user-attachments/assets/9cd2f0d5-372e-46b6-b64b-643df661b445" />
我们的项目官方服务采用多台服务器,其中日本的主服务器用来提供所有基础服务,甘肃的自建服务器提供加速和负载均衡,以确保服务稳定
元数据仓库镜像可以随时调用获取元数据仓库API`/git-repository/all`得到
由于Git仓库几乎无法被CDN缓存频繁拉取镜像仓库会对服务器造成压力请各位在使用量较大的情况下自建仓库镜像
甘肃服务器只能使用ipv6访问同时比较不稳定只能用于加速不要将它直接作为主要服务使用
甘肃服务器2为备用服务器计划在用户量较大时作为API以负载均衡的方式提供不在极端情况下不采用和仓库镜像使用
### 注意事项
在轻量使用的场景下可以直接使用MongoDB Atlas的免费套餐但在高并发场景下建议使用自建MongoDB服务器以获得更好的性能和稳定性。
新MongoDB数据库会在写入数据时自动创建无需手动创建数据库和集合。

36
SendEmailTool.py Normal file
View File

@@ -0,0 +1,36 @@
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from app.config_loader import config_loader
# 从配置文件获取邮件设置
gmail_user = config_loader.EMAIL_GMAIL_USER
app_password = config_loader.EMAIL_APP_PASSWORD
to_email = ""
def send_email(gmail_user, app_password, to_email, subject, body="这是一封测试邮件。", app_name=None, body_type="plain"):
msg = MIMEMultipart()
# 如果提供了 app_name使用带显示名称的格式
if app_name:
msg["From"] = f"{app_name} <{gmail_user}>"
else:
msg["From"] = gmail_user
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, body_type))
# Gmail SMTP 服务器
server = smtplib.SMTP("smtp.gmail.com", 587)
server.starttls()
server.login(gmail_user, app_password)
server.sendmail(gmail_user, to_email, msg.as_string())
server.quit()
if __name__ == "__main__":
try:
send_email(gmail_user, app_password, to_email, "测试邮件主题", "这是一封测试邮件。")
print("邮件发送成功!")
except Exception as e:
print("发送失败:", e)

21
app.py Normal file
View File

@@ -0,0 +1,21 @@
from app.init import create_app
from app.config_loader import config_loader
import sentry_sdk
sentry_sdk.init(
dsn="https://d1cad1d2b442cf8431df3ee4bab925e0@o4507525750521856.ingest.us.sentry.io/4510623668830208",
# Add data like request headers and IP for users,
# see https://docs.sentry.io/platforms/python/data-management/data-collected/ for more info
send_default_pii=True,
traces_sample_rate=1.0,
)
# 创建应用实例
app = create_app()
if __name__ == '__main__':
app.run(
host=config_loader.SERVER_HOST,
port=config_loader.SERVER_PORT,
debug=config_loader.SERVER_DEBUG
)

0
app/__init__.py Normal file
View File

12
app/config.py Normal file
View File

@@ -0,0 +1,12 @@
from app.config_loader import config_loader
# 使用配置加载器提供兼容的接口
class Config:
SECRET_KEY = config_loader.SECRET_KEY
MONGO_URI = config_loader.MONGO_URI
TIMEZONE = config_loader.TIMEZONE
ISTEST_MODE = config_loader.ISTEST_MODE
EMAIL_APP_NAME = config_loader.EMAIL_APP_NAME
EMAIL_OFFICIAL_WEBSITE = config_loader.EMAIL_OFFICIAL_WEBSITE
EMAIL_SUBJECT = config_loader.EMAIL_SUBJECT
VERIFICATION_CODE_EXPIRE_MINUTES = config_loader.VERIFICATION_CODE_EXPIRE_MINUTES

116
app/config_loader.py Normal file
View File

@@ -0,0 +1,116 @@
import json
import os
from zoneinfo import ZoneInfo
from typing import Dict, Any
class ConfigLoader:
def __init__(self, config_file: str = 'config.json'):
self.config_file = config_file
self._config = None
def load_config(self) -> Dict[str, Any]:
"""加载 JSON 配置文件"""
if self._config is None:
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), self.config_file)
try:
with open(config_path, 'r', encoding='utf-8') as f:
self._config = json.load(f)
except FileNotFoundError:
raise FileNotFoundError(f"配置文件 {config_path} 不存在")
except json.JSONDecodeError as e:
raise ValueError(f"配置文件格式错误: {e}")
return self._config
def get(self, key: str, default=None) -> Any:
"""获取配置值,支持点号分隔的嵌套键"""
config = self.load_config()
keys = key.split('.')
value = config
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
@property
def SECRET_KEY(self) -> str:
return self.get('SECRET_KEY')
@property
def MONGO_URI(self) -> str:
return self.get('MONGO_URI')
@property
def TIMEZONE(self) -> ZoneInfo:
timezone_str = self.get('TIMEZONE', 'Asia/Shanghai')
return ZoneInfo(timezone_str)
@property
def ISTEST_MODE(self) -> bool:
return self.get('ISTEST_MODE', False)
@property
def SERVER_HOST(self) -> str:
return self.get('SERVER.HOST', '0.0.0.0')
@property
def SERVER_PORT(self) -> int:
return self.get('SERVER.PORT', 5222)
@property
def SERVER_DEBUG(self) -> bool:
return self.get('SERVER.DEBUG', False)
@property
def JWT_ALGORITHM(self) -> str:
return self.get('JWT.ALGORITHM', 'HS256')
@property
def JWT_EXPIRATION_HOURS(self) -> int:
return self.get('JWT.EXPIRATION_HOURS', 24)
@property
def EMAIL_GMAIL_USER(self) -> str:
return self.get('EMAIL.GMAIL_USER')
@property
def EMAIL_APP_PASSWORD(self) -> str:
return self.get('EMAIL.APP_PASSWORD')
@property
def RSA_PRIVATE_KEY_FILE(self) -> str:
return self.get('RSA.PRIVATE_KEY_FILE', 'private.pem')
@property
def RSA_PUBLIC_KEY_FILE(self) -> str:
return self.get('RSA.PUBLIC_KEY_FILE', 'public.pem')
@property
def LOGGING_LEVEL(self) -> str:
return self.get('LOGGING.LEVEL', 'DEBUG')
@property
def LOGGING_FORMAT(self) -> str:
return self.get('LOGGING.FORMAT', '%(asctime)s %(name)s %(levelname)s %(message)s')
@property
def EMAIL_APP_NAME(self) -> str:
return self.get('EMAIL.APP_NAME', 'WDG Snap Hutao')
@property
def EMAIL_OFFICIAL_WEBSITE(self) -> str:
return self.get('EMAIL.OFFICIAL_WEBSITE', 'https://htserver.wdg.cloudns.ch/')
@property
def EMAIL_SUBJECT(self) -> str:
return self.get('EMAIL.SUBJECT', 'WDG Snap Hutao 验证码')
@property
def VERIFICATION_CODE_EXPIRE_MINUTES(self) -> int:
return self.get('VERIFICATION_CODE.EXPIRE_MINUTES', 10)
# 创建全局配置实例
config_loader = ConfigLoader()

22
app/decorators.py Normal file
View File

@@ -0,0 +1,22 @@
from flask import request, jsonify
from bson import ObjectId
from app.extensions import client, logger
from app.utils.jwt_utils import verify_token
def require_maintainer_permission(f):
def wrapper(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
return jsonify({"code": 1, "message": "Invalid token"}), 401
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
if not user or not user.get("IsMaintainer", False):
return jsonify({"code": 2, "message": "Permission denied"}), 403
request.current_user = user
return f(*args, **kwargs)
wrapper.__name__ = f.__name__
return wrapper

35
app/extensions.py Normal file
View File

@@ -0,0 +1,35 @@
import logging
import coloredlogs
import secrets
import string
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from app.config_loader import config_loader
logger = logging.getLogger("app")
coloredlogs.install(level=config_loader.LOGGING_LEVEL, logger=logger, fmt=config_loader.LOGGING_FORMAT)
client = None
def init_mongo(uri: str, test_mode=False):
global client
if test_mode:
logger.info("Running in test mode, skipping MongoDB connection")
return
client = MongoClient(uri, server_api=ServerApi('1'))
try:
client.admin.command('ping')
logger.info("MongoDB connected successfully")
except Exception as e:
logger.error(f"MongoDB connection failed: {e}")
raise
def generate_code(length=6) -> str:
"""生成数字验证码"""
return ''.join(secrets.choice('0123456789') for _ in range(length))
def generate_numeric_id(length=8) -> str:
"""生成数字ID"""
return ''.join(secrets.choice(string.digits) for _ in range(length))

35
app/init.py Normal file
View File

@@ -0,0 +1,35 @@
from flask import Flask
from app.config import Config
from app.extensions import init_mongo
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
app.secret_key = Config.SECRET_KEY
init_mongo(Config.MONGO_URI, Config.ISTEST_MODE)
# 注册蓝图
from routes.announcement import announcement_bp
from routes.auth import auth_bp
from routes.gacha_log import gacha_log_bp
from routes.web_api import web_api_bp
from routes.misc import misc_bp
from routes.download_resource import download_resource_bp
app.register_blueprint(announcement_bp, url_prefix="/Announcement")
app.register_blueprint(auth_bp)
app.register_blueprint(gacha_log_bp)
app.register_blueprint(web_api_bp)
app.register_blueprint(misc_bp)
app.register_blueprint(download_resource_bp)
# CORS
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
return response
return app

46
app/utils/jwt_utils.py Normal file
View File

@@ -0,0 +1,46 @@
import jwt
import datetime
from flask import current_app
from app.config_loader import config_loader
def create_token(user_id: str) -> str:
"""
创建JWT访问令牌有效期由配置文件中的JWT_EXPIRATION_HOURS决定。
:param user_id: 用户ID
:return: JWT 访问令牌
"""
payload = {
"user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS)
}
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM)
# 创建刷新token有效期是访问token的两倍
def create_refresh_token(user_id: str) -> str:
"""
创建JWT刷新令牌有效期为访问令牌的两倍。
:param user_id: 用户ID
:return: JWT 刷新令牌
"""
payload = {
"user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS * 2)
}
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM)
def verify_token(token: str)-> str | None:
"""
验证JWT令牌并返回用户ID如果无效则返回None。
:param token: JWT令牌字符串
:type token: str
:return: 用户ID或None
:rtype: str | None
"""
try:
data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=[config_loader.JWT_ALGORITHM])
return data["user_id"]
except:
return None

8
requirements.txt Normal file
View File

@@ -0,0 +1,8 @@
coloredlogs==15.0.1
Flask==3.1.2
pycryptodome==3.20.0
PyJWT==2.10.1
pymongo==4.15.5
Werkzeug==3.1.4
sentry-sdk[flask]
gunicorn

14
routes/announcement.py Normal file
View File

@@ -0,0 +1,14 @@
from flask import Blueprint, jsonify, request
from services.announcement_service import get_announcements
announcement_bp = Blueprint("announcement", __name__)
@announcement_bp.route("/List", methods=["POST"])
def list_announcements():
# 获取用户已关闭的公告ID列表也可能没有请求体
request_data = request.get_json(silent=True) or []
return jsonify({
"code": 0,
"message": "OK",
"data": get_announcements(request_data)
})

242
routes/auth.py Normal file
View File

@@ -0,0 +1,242 @@
from flask import Blueprint, request, jsonify
from app.utils.jwt_utils import create_token, verify_token, create_refresh_token
from services.auth_service import (
decrypt_data, send_verification_email, verify_user_credentials,
create_user_account, get_user_by_id
)
from services.verification_code_service import save_verification_code, verify_code
from app.extensions import generate_code, logger , config_loader
from app.config import Config
auth_bp = Blueprint("auth", __name__)
@auth_bp.route('/Passport/v2/Verify', methods=['POST'])
def passport_verify():
"""获取验证码"""
data = request.get_json()
encrypted_email = data.get('UserName', '')
try:
decrypted_email = decrypt_data(encrypted_email)
logger.debug(f"Decrypted email: {decrypted_email}")
except Exception as e:
logger.error(f"Decryption error: {e}")
return jsonify({
"retcode": 1,
"message": f"Invalid encrypted email: {str(e)}",
"data": None
})
# 生成验证码
code = generate_code(6)
# 使用 MongoDB TTL 存储验证码
save_verification_code(decrypted_email, code, expire_minutes=Config.VERIFICATION_CODE_EXPIRE_MINUTES)
# 发送邮件
if send_verification_email(decrypted_email, code):
return jsonify({
"retcode": 0,
"message": "success",
"l10nKey": "ViewDialogUserAccountVerificationEmailCaptchaSent"
})
else:
return jsonify({
"retcode": 1,
"message": "Failed to send email",
"data": None
}), 500
@auth_bp.route('/Passport/v2/Register', methods=['POST'])
def passport_register():
"""用户注册"""
data = request.get_json()
encrypted_email = data.get('UserName', '')
encrypted_password = data.get('Password', '')
encrypted_code = data.get('VerifyCode', '')
try:
decrypted_email = decrypt_data(encrypted_email)
decrypted_password = decrypt_data(encrypted_password)
decrypted_code = decrypt_data(encrypted_code)
logger.debug(f"Decrypted registration data: email={decrypted_email}, code={decrypted_code}")
except Exception as e:
logger.warning(f"Decryption error: {e}")
return jsonify({
"retcode": 1,
"message": f"Invalid encrypted data: {str(e)}",
"data": None
}), 400
# 使用 MongoDB 验证验证码
if not verify_code(decrypted_email, decrypted_code):
logger.warning("Invalid verification code")
return jsonify({
"retcode": 2,
"message": "Invalid verification code",
"data": None
})
# 创建新用户
new_user = create_user_account(decrypted_email, decrypted_password)
if not new_user:
logger.warning(f"User already exists: {decrypted_email}")
return jsonify({
"retcode": 3,
"message": "User already exists",
"data": None
})
# 创建token
access_token = create_token(str(new_user['_id']))
# 刷新token
refresh_token = create_refresh_token(str(new_user['_id']))
logger.info(f"User registered: {decrypted_email}")
return jsonify({
"retcode": 0,
"message": "success",
"data": {
"AccessToken": access_token,
"RefreshToken": refresh_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
@auth_bp.route('/Passport/v2/Login', methods=['POST'])
def passport_login():
"""用户登录"""
data = request.get_json()
encrypted_email = data.get('UserName', '')
encrypted_password = data.get('Password', '')
try:
decrypted_email = decrypt_data(encrypted_email)
decrypted_password = decrypt_data(encrypted_password)
logger.debug(f"Decrypted login data: email={decrypted_email}")
except Exception as e:
logger.warning(f"Decryption error: {e}")
return jsonify({
"retcode": 1,
"message": f"Invalid encrypted data: {str(e)}",
"data": None
}), 400
# 验证用户凭据
user = verify_user_credentials(decrypted_email, decrypted_password)
if not user:
logger.warning(f"Invalid login attempt for email: {decrypted_email}")
return jsonify({
"retcode": 2,
"message": "Invalid email or password",
"data": None
})
# 创建token
access_token = create_token(str(user['_id']))
refresh_token = create_refresh_token(str(user['_id']))
logger.info(f"User logged in: {decrypted_email}")
return jsonify({
"retcode": 0,
"message": "success",
"l10nKey": "ServerPassportLoginSucceed",
"data": {
"AccessToken": access_token,
"RefreshToken": refresh_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
@auth_bp.route('/Passport/v2/UserInfo', methods=['GET'])
def passport_userinfo():
"""获取用户信息"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
user = get_user_by_id(user_id)
if not user:
logger.warning(f"User not found: {user_id}")
return jsonify({
"retcode": 2,
"message": "User not found",
"data": None
})
logger.info(f"User info retrieved: {user['email']}")
return jsonify({
"retcode": 0,
"message": "success",
"data": {
"NormalizedUserName": user['NormalizedUserName'],
"UserName": user['UserName'],
"IsLicensedDeveloper": user['IsLicensedDeveloper'],
"IsMaintainer": user['IsMaintainer'],
"GachaLogExpireAt": user['GachaLogExpireAt'],
"CdnExpireAt": user['CdnExpireAt']
}
})
@auth_bp.route('/Passport/v2/RefreshToken', methods=['POST'])
def passport_refresh_token():
"""刷新Token"""
data = request.get_json()
refresh_token = data.get('RefreshToken', '')
try:
decrypted_refresh_token = decrypt_data(refresh_token)
except Exception as e:
logger.error(f"Decryption error: {e}")
return jsonify({
"retcode": 1,
"message": f"Invalid encrypted refresh token: {str(e)}",
"data": None
}), 400
user_id = verify_token(decrypted_refresh_token)
if not user_id:
logger.warning("Invalid or expired refresh token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired refresh token",
"data": None
})
access_token = create_token(user_id)
refresh_token = create_refresh_token(user_id)
logger.info(f"Token refreshed for user_id: {user_id}")
return jsonify({
"retcode": 0,
"message": "success",
"data": {
"AccessToken": access_token,
"RefreshToken": refresh_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
@auth_bp.route('/Passport/v2/RevokeToken', methods=['POST'])
def passport_revoke_token():
"""注销Token"""
logger.info("Token revoked")
return jsonify({
"retcode": 0,
"message": "Token revoked successfully",
"data": None
})

270
routes/download_resource.py Normal file
View File

@@ -0,0 +1,270 @@
from flask import Blueprint, request, jsonify
from app.decorators import require_maintainer_permission
from app.extensions import logger
from services.download_resource_service import (
create_download_resource,
get_download_resources,
get_download_resource_by_id,
update_download_resource,
delete_download_resource,
get_latest_version
)
download_resource_bp = Blueprint("download_resource", __name__)
# 公开API - 获取下载资源列表
@download_resource_bp.route('/download-resources', methods=['GET'])
def get_public_download_resources():
"""
获取下载资源列表公开API
可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回所有
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
"""
package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_test参数默认只返回正式版本
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
# 只返回激活的资源
resources = get_download_resources(package_type=package_type, is_active=True, is_test=is_test)
return jsonify({
"code": 0,
"message": "success",
"data": resources
})
@download_resource_bp.route('/download-resources/latest', methods=['GET'])
def get_latest_download_resource():
"""
获取最新版本公开API
可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回最新的任意类型
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
"""
package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_test参数
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resource = get_latest_version(package_type=package_type, is_test=is_test)
if resource:
return jsonify({
"code": 0,
"message": "success",
"data": resource
})
else:
return jsonify({
"code": 1,
"message": "No resource found",
"data": None
}), 404
# Web管理端API - 增删改查
@download_resource_bp.route('/web-api/download-resources', methods=['POST'])
@require_maintainer_permission
def web_api_create_download_resource():
"""创建下载资源"""
data = request.get_json()
# 验证必需字段
required_fields = ['version', 'package_type', 'download_url']
if not all(k in data for k in required_fields):
return jsonify({
"code": 1,
"message": f"Missing required fields: {', '.join(required_fields)}",
"data": None
}), 400
# 验证package_type
if data['package_type'] not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 添加创建者信息
data['created_by'] = str(request.current_user['_id'])
# 创建资源
resource_id = create_download_resource(data)
if resource_id:
logger.info(f"Download resource created with ID: {resource_id} by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource created successfully",
"data": {
"id": str(resource_id)
}
})
else:
logger.error("Failed to create download resource")
return jsonify({
"code": 2,
"message": "Failed to create download resource",
"data": None
}), 500
@download_resource_bp.route('/web-api/download-resources', methods=['GET'])
@require_maintainer_permission
def web_api_get_download_resources():
"""获取下载资源列表(管理端,包含所有资源,包括未激活的)"""
package_type = request.args.get('package_type')
is_active_str = request.args.get('is_active')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 处理is_active参数
is_active = None
if is_active_str is not None:
is_active = is_active_str.lower() == 'true'
# 处理is_test参数
is_test = None
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resources = get_download_resources(package_type=package_type, is_active=is_active, is_test=is_test)
return jsonify({
"code": 0,
"message": "success",
"data": resources
})
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['GET'])
@require_maintainer_permission
def web_api_get_download_resource(resource_id):
"""获取单个下载资源详情"""
resource = get_download_resource_by_id(resource_id)
if resource:
return jsonify({
"code": 0,
"message": "success",
"data": resource
})
else:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['PUT'])
@require_maintainer_permission
def web_api_update_download_resource(resource_id):
"""更新下载资源"""
data = request.get_json()
# 检查资源是否存在
existing_resource = get_download_resource_by_id(resource_id)
if not existing_resource:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
# 验证package_type如果提供
if 'package_type' in data and data['package_type'] not in ['msi', 'msix']:
return jsonify({
"code": 1,
"message": "Invalid package_type, must be 'msi' or 'msix'",
"data": None
}), 400
# 添加更新者信息
data['updated_by'] = str(request.current_user['_id'])
# 更新资源
success = update_download_resource(resource_id, data)
if success:
logger.info(f"Download resource {resource_id} updated by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource updated successfully",
"data": None
})
else:
logger.error(f"Failed to update download resource {resource_id}")
return jsonify({
"code": 2,
"message": "Failed to update download resource",
"data": None
}), 500
@download_resource_bp.route('/web-api/download-resources/<resource_id>', methods=['DELETE'])
@require_maintainer_permission
def web_api_delete_download_resource(resource_id):
"""删除下载资源"""
# 检查资源是否存在
existing_resource = get_download_resource_by_id(resource_id)
if not existing_resource:
return jsonify({
"code": 1,
"message": "Resource not found",
"data": None
}), 404
# 删除资源
success = delete_download_resource(resource_id)
if success:
logger.info(f"Download resource {resource_id} deleted by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Download resource deleted successfully",
"data": None
})
else:
logger.error(f"Failed to delete download resource {resource_id}")
return jsonify({
"code": 2,
"message": "Failed to delete download resource",
"data": None
}), 500

161
routes/gacha_log.py Normal file
View File

@@ -0,0 +1,161 @@
from flask import Blueprint, request, jsonify
from app.utils.jwt_utils import verify_token
from services.gacha_log_service import (
get_gacha_log_entries, get_gacha_log_end_ids, upload_gacha_log,
retrieve_gacha_log, delete_gacha_log
)
from app.extensions import logger
gacha_log_bp = Blueprint("gacha_log", __name__)
@gacha_log_bp.route('/GachaLog/Statistics/Distribution/<distributionType>', methods=['GET'])
def gacha_log_statistics_distribution(distributionType):
"""获取祈愿记录统计分布"""
return jsonify({
"retcode": 0,
"message": "success",
"data": {}
})
@gacha_log_bp.route('/GachaLog/Entries', methods=['GET'])
def gacha_log_entries():
"""获取用户的祈愿记录条目列表"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
entries = get_gacha_log_entries(user_id)
logger.info(f"Gacha log entries retrieved for user_id: {user_id}")
logger.debug(f"Entries: {entries}")
return jsonify({
"retcode": 0,
"message": "success",
"data": entries
})
@gacha_log_bp.route('/GachaLog/EndIds', methods=['GET'])
def gacha_log_end_ids():
"""获取指定 UID 用户的祈愿记录最新 ID"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
uid = request.args.get('Uid', '')
end_ids = get_gacha_log_end_ids(user_id, uid)
logger.info(f"Gacha log end IDs retrieved for user_id: {user_id}, uid: {uid}")
logger.debug(f"End IDs: {end_ids}")
return jsonify({
"retcode": 0,
"message": "success",
"data": end_ids
})
@gacha_log_bp.route('/GachaLog/Upload', methods=['POST'])
def gacha_log_upload():
"""上传祈愿记录"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
data = request.get_json()
uid = data.get('Uid', '')
items = data.get('Items', [])
message = upload_gacha_log(user_id, uid, items)
logger.info(f"Gacha log upload for user_id: {user_id}, uid: {uid}")
return jsonify({
"retcode": 0,
"message": message,
"data": None
})
@gacha_log_bp.route('/GachaLog/Retrieve', methods=['POST'])
def gacha_log_retrieve():
"""从云端检索用户的祈愿记录数据"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
data = request.get_json()
uid = data.get('Uid', '')
end_ids = data.get('EndIds', {})
filtered_items = retrieve_gacha_log(user_id, uid, end_ids)
logger.info(f"Gacha log retrieved for user_id: {user_id}, uid: {uid}, items count: {len(filtered_items)}")
logger.debug(f"end_ids: {end_ids}")
return jsonify({
"retcode": 0,
"message": f"success, retrieved {len(filtered_items)} items",
"data": filtered_items
})
@gacha_log_bp.route('/GachaLog/Delete', methods=['GET'])
def gacha_log_delete():
"""删除用户的祈愿记录"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
uid = request.args.get('Uid', '')
success = delete_gacha_log(user_id, uid)
if success:
logger.info(f"Gacha log deleted for user_id: {user_id}, uid: {uid}")
return jsonify({
"retcode": 0,
"message": "success, gacha log deleted",
"data": None
})
else:
logger.info(f"No gacha log found to delete for user_id: {user_id}, uid: {uid}")
return jsonify({
"retcode": 2,
"message": "no gacha log found to delete",
"data": None
})

86
routes/misc.py Normal file
View File

@@ -0,0 +1,86 @@
from flask import Blueprint, request, jsonify, send_file
from app.extensions import logger, client
from app.config import Config
misc_bp = Blueprint("misc", __name__)
@misc_bp.route('/patch/hutao', methods=['GET'])
def patch_hutao():
"""获取新版本信息"""
return {
"code": 0,
"message": "OK",
"data": {
"validation": "",
"version": "1.0.0",
"mirrors": []
}
}
@misc_bp.route('/git-repository/all', methods=['GET'])
def git_repository_all():
"""获取所有Git仓库"""
if Config.ISTEST_MODE:
# 覆盖元数据仓库列表,测试用
repositories = [
{
"name": "test",
"https_url": "http://server.wdg.cloudns.ch:3000/wdg1122/Snap.Metadata.Test.git",
"web_url": "http://server.wdg.cloudns.ch:3000/wdg1122/Snap.Metadata.Test",
"type": "Public"
}
]
return jsonify({
"code": 0,
"message": "OK",
"data": repositories
})
# 从数据库获取 Git 仓库列表
git_repositories = list(client.ht_server.git_repository.find({}))
for repo in git_repositories:
repo.pop('_id', None)
logger.debug(f"Git repositories: {git_repositories}")
return jsonify({
"code": 0,
"message": "OK",
"data": git_repositories
})
@misc_bp.route('/static/raw/<category>/<fileName>', methods=['GET'])
def get_image(category, fileName):
"""获取图片资源,弃用,请使用额外的文件服务器"""
return jsonify({"code": 1, "message": "Image not found"}), 404
@misc_bp.route('/mgnt/am-i-banned', methods=['GET'])
def mgnt_am_i_banned():
"""检查游戏账户是否禁用注入,目前直接返回成功的响应即可"""
return jsonify({
"retcode": 0,
"message": "OK",
"data": {}
})
# 获取额外的第三方注入工具
@misc_bp.route('/tools', methods=['GET'])
def get_tools():
"""获取额外的第三方注入工具列表"""
tools = list(client.ht_server.tools.find({}))
for tool in tools:
tool.pop('_id', None)
logger.debug(f"Tools: {tools}")
return jsonify({
"code": 0,
"message": "OK",
"data": tools
})

256
routes/web_api.py Normal file
View File

@@ -0,0 +1,256 @@
import datetime
from bson import ObjectId
from flask import Blueprint, request, jsonify
from app.utils.jwt_utils import verify_token, create_token
from services.auth_service import verify_user_credentials, get_users_with_search
from app.decorators import require_maintainer_permission
from app.extensions import generate_numeric_id, client, logger, config_loader
web_api_bp = Blueprint("web_api", __name__)
@web_api_bp.route('/web-api/login', methods=['POST'])
def web_api_login():
"""Web管理端登录"""
data = request.get_json()
email = data.get('email', '')
password = data.get('password', '')
# 验证用户凭据
user = verify_user_credentials(email, password)
if not user:
logger.warning(f"Invalid web login attempt for email: {email}")
return jsonify({
"code": 1,
"message": "Invalid email or password",
"data": None
})
# 创建token
access_token = create_token(str(user['_id']))
logger.info(f"Web user logged in: {email}")
return jsonify({
"code": 0,
"message": "success",
"data": {
"access_token": access_token,
"expires_in": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
# 公告管理API
@web_api_bp.route('/web-api/announcement', methods=['POST'])
@require_maintainer_permission
def web_api_create_announcement():
"""创建公告"""
data = request.get_json()
# 验证必需字段
if not all(k in data for k in ['Title', 'Content', 'Locale']):
return jsonify({
"code": 1,
"message": "Missing required fields: Title, Content, Locale",
"data": None
}), 400
# 生成公告ID
announcement_id = int(generate_numeric_id(8))
# 创建公告对象
announcement = {
"Id": announcement_id,
"Title": data['Title'],
"Content": data['Content'],
"Severity": data.get('Severity', 0), # 默认为Informational
"Link": data.get('Link', ''),
"Locale": data['Locale'],
"LastUpdateTime": int(datetime.datetime.now().timestamp()),
"MaxPresentVersion": data.get('MaxPresentVersion', None),
"CreatedBy": str(request.current_user['_id']),
"CreatedAt": datetime.datetime.utcnow(),
# 发行版名称,用于给不同的发行版显示不同的公告,默认为空字符串,表示所有发行版
"Distribution": data.get('Distribution', '')
}
# 插入数据库
result = client.ht_server.announcement.insert_one(announcement)
if result.inserted_id:
logger.info(f"Announcement created with ID: {announcement_id} by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Announcement created successfully",
"data": {
"Id": announcement_id
}
})
else:
logger.error("Failed to create announcement")
return jsonify({
"code": 2,
"message": "Failed to create announcement",
"data": None
}), 500
@web_api_bp.route('/web-api/announcement/<int:announcement_id>', methods=['PUT'])
@require_maintainer_permission
def web_api_update_announcement(announcement_id):
"""编辑公告"""
data = request.get_json()
# 检查公告是否存在
existing_announcement = client.ht_server.announcement.find_one({"Id": announcement_id})
if not existing_announcement:
return jsonify({
"code": 1,
"message": "Announcement not found",
"data": None
}), 404
# 更新字段
update_data = {
"LastUpdateTime": int(datetime.datetime.now().timestamp()),
"UpdatedBy": str(request.current_user['_id']),
"UpdatedAt": datetime.datetime.utcnow()
}
# 只更新提供的字段
if 'Title' in data:
update_data["Title"] = data['Title']
if 'Content' in data:
update_data["Content"] = data['Content']
if 'Severity' in data:
update_data["Severity"] = data['Severity']
if 'Link' in data:
update_data["Link"] = data['Link']
if 'Locale' in data:
update_data["Locale"] = data['Locale']
if 'MaxPresentVersion' in data:
update_data["MaxPresentVersion"] = data['MaxPresentVersion']
if 'Distribution' in data:
update_data["Distribution"] = data['Distribution']
# 执行更新
result = client.ht_server.announcement.update_one(
{"Id": announcement_id},
{"$set": update_data}
)
if result.modified_count > 0:
logger.info(f"Announcement {announcement_id} updated by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Announcement updated successfully",
"data": None
})
else:
logger.warning(f"No changes made to announcement {announcement_id}")
return jsonify({
"code": 2,
"message": "No changes made",
"data": None
})
@web_api_bp.route('/web-api/announcement/<int:announcement_id>', methods=['DELETE'])
@require_maintainer_permission
def web_api_delete_announcement(announcement_id):
"""删除公告"""
# 检查公告是否存在
existing_announcement = client.ht_server.announcement.find_one({"Id": announcement_id})
if not existing_announcement:
return jsonify({
"code": 1,
"message": "Announcement not found",
"data": None
}), 404
# 执行删除
result = client.ht_server.announcement.delete_one({"Id": announcement_id})
if result.deleted_count > 0:
logger.info(f"Announcement {announcement_id} deleted by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Announcement deleted successfully",
"data": None
})
else:
logger.error(f"Failed to delete announcement {announcement_id}")
return jsonify({
"code": 2,
"message": "Failed to delete announcement",
"data": None
}), 500
@web_api_bp.route('/web-api/announcement/<int:announcement_id>', methods=['GET'])
@require_maintainer_permission
def web_api_get_announcement(announcement_id):
"""获取单个公告详情"""
# 查询公告
announcement = client.ht_server.announcement.find_one({"Id": announcement_id})
if not announcement:
return jsonify({
"code": 1,
"message": "Announcement not found",
"data": None
}), 404
# 移除MongoDB的_id字段
announcement.pop('_id', None)
return jsonify({
"code": 0,
"message": "success",
"data": announcement
})
@web_api_bp.route('/web-api/users', methods=['GET'])
def web_api_get_users():
"""获取所有用户列表需要验证token并且需要高权限"""
# 获取用户信息
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"code": 1,
"message": "Invalid or expired token",
"data": None
}), 401
# 检查用户是否具有高权限
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
if not user or not (user.get("IsMaintainer", False) and user.get("IsLicensedDeveloper", False)):
logger.warning(f"User {user_id} does not have required permissions")
logger.debug(f"User details: {user}")
return jsonify({
"code": 2,
"message": "Insufficient permissions",
"data": None
}), 403
# 获取查询参数
q = request.args.get("q", "").strip()
role = request.args.get("role", "").strip() if request.args.get("role") else None
email = request.args.get("email", "").strip() if request.args.get("email") else None
username = request.args.get("username", "").strip() if request.args.get("username") else None
id_param = request.args.get("id", "").strip() if request.args.get("id") else None
is_licensed = request.args.get("is", "").strip() if request.args.get("is") else None
users = get_users_with_search(q, role, email, username, id_param, is_licensed)
return jsonify({
"code": 0,
"message": "success",
"data": users
})

21
run.py Normal file
View File

@@ -0,0 +1,21 @@
from app.init import create_app
from app.config_loader import config_loader
import sentry_sdk
sentry_sdk.init(
dsn="https://d1cad1d2b442cf8431df3ee4bab925e0@o4507525750521856.ingest.us.sentry.io/4510623668830208",
# Add data like request headers and IP for users,
# see https://docs.sentry.io/platforms/python/data-management/data-collected/ for more info
send_default_pii=True,
traces_sample_rate=1.0,
)
# 创建应用实例
app = create_app()
if __name__ == '__main__':
app.run(
host=config_loader.SERVER_HOST,
port=config_loader.SERVER_PORT,
debug=config_loader.SERVER_DEBUG
)

View File

@@ -0,0 +1,25 @@
from app.extensions import client, logger
from app.config import Config
def get_announcements(request_data: list):
"""
获取公告列表,过滤掉用户已关闭的公告
:param request_data: 用户已关闭的公告ID列表
:type request_data: list
"""
if Config.ISTEST_MODE:
return []
# 记录请求体到日志请求体中是用户已关闭的公告ID列表
logger.debug("Request body: %s", request_data)
announcements = list(client.ht_server.announcement.find({}))
result = []
for a in announcements:
# 拷贝并移除 _id 字段,避免 ObjectId 无法序列化
a = dict(a)
a.pop('_id', None)
# 如果请求体中包含该公告ID说明用户已关闭该公告不返回该公告
if a.get('Id') not in request_data:
result.append(a)
return result

307
services/auth_service.py Normal file
View File

@@ -0,0 +1,307 @@
from bson import ObjectId
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import client, logger
from app.config import Config
from app.config_loader import config_loader
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from datetime import timezone
from zoneinfo import ZoneInfo
import datetime
import SendEmailTool
import re
import base64
def decrypt_data(encrypted_data: str) -> str:
"""使用RSA私钥解密数据"""
try:
private_key_file = config_loader.RSA_PRIVATE_KEY_FILE
with open(private_key_file, 'r') as f:
private_key = RSA.import_key(f.read())
cipher = PKCS1_OAEP.new(private_key)
decrypted_data = cipher.decrypt(base64.b64decode(encrypted_data))
return decrypted_data.decode()
except Exception as e:
logger.error(f"Decryption error: {e}")
raise
def send_verification_email(email: str, code: str, ACTION_NAME="注册", EXPIRE_MINUTES=None) -> bool:
"""发送验证码邮件,目前只有注册场景,后续再扩展其他场景"""
try:
subject = Config.EMAIL_SUBJECT
textbody = f"您的验证码是: {code}"
APP_NAME = Config.EMAIL_APP_NAME
OFFICIAL_WEBSITE = Config.EMAIL_OFFICIAL_WEBSITE
if EXPIRE_MINUTES is None:
EXPIRE_MINUTES = Config.VERIFICATION_CODE_EXPIRE_MINUTES
htmlbody = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>验证码邮件</title>
</head>
<body style="margin:0;padding:0;background-color:#f5f6f7;font-family:Arial,Helvetica,sans-serif;">
<table width="100%" cellpadding="0" cellspacing="0">
<tr>
<td align="center" style="padding:40px 0;">
<!-- 主体卡片 -->
<table width="480" cellpadding="0" cellspacing="0" style="background:#ffffff;border-radius:8px;padding:32px;">
<!-- 应用名 -->
<tr>
<td align="center" style="font-size:22px;font-weight:bold;color:#333333;padding-bottom:16px;">
{APP_NAME}
</td>
</tr>
<!-- 操作提示 -->
<tr>
<td style="font-size:14px;color:#555555;padding-bottom:24px;text-align:center;">
你正在进行 <strong>{ACTION_NAME}</strong> 操作,请使用以下验证码完成验证:
</td>
</tr>
<!-- 验证码 -->
<tr>
<td align="center" style="padding:20px 0;">
<div style="
display:inline-block;
font-size:32px;
font-weight:bold;
letter-spacing:6px;
color:#2e86de;
padding:12px 24px;
border:1px dashed #2e86de;
border-radius:6px;
">
{code}
</div>
</td>
</tr>
<!-- 有效期说明 -->
<tr>
<td style="font-size:13px;color:#888888;text-align:center;padding-top:16px;">
验证码有效期 {EXPIRE_MINUTES} 分钟,请勿泄露给他人。
</td>
</tr>
<!-- 分割线 -->
<tr>
<td style="padding:24px 0;">
<hr style="border:none;border-top:1px solid #eeeeee;">
</td>
</tr>
<!-- 底部信息 -->
<tr>
<td style="font-size:12px;color:#999999;text-align:center;line-height:1.6;">
本邮件由 {APP_NAME} 系统自动发送,请勿回复<br>
如非本人操作,请忽略本邮件
</td>
</tr>
<!-- 官网链接 -->
<tr>
<td align="center" style="padding-top:16px;">
<a href="{OFFICIAL_WEBSITE}"
style="font-size:12px;color:#2e86de;text-decoration:none;">
访问官网
</a>
</td>
</tr>
</table>
</td>
</tr>
</table>
</body>
</html>
"""
try:
SendEmailTool.send_email(
config_loader.EMAIL_GMAIL_USER,
config_loader.EMAIL_APP_PASSWORD,
email,
subject,
htmlbody,
app_name=APP_NAME,
body_type="html"
)
logger.info(f"HTML Verification email sent to {email}")
except Exception as e:
logger.error(f"Failed to send HTML verification email to {email}: {e}")
# 如果 HTML 邮件发送失败,尝试发送纯文本邮件
SendEmailTool.send_email(
config_loader.EMAIL_GMAIL_USER,
config_loader.EMAIL_APP_PASSWORD,
email,
subject,
textbody,
app_name=APP_NAME,
body_type="plain"
)
logger.info(f"Verification email sent to {email}")
return True
except Exception as e:
logger.error(f"Failed to send email: {e}")
return False
def verify_user_credentials(email: str, password: str) -> dict | None:
"""验证用户凭据"""
user = client.ht_server.users.find_one({"email": email})
if not user or not check_password_hash(user['password'], password):
return None
return user
def create_user_account(email: str, password: str) -> dict | None:
"""创建新用户账户"""
# 检查用户是否已存在
existing_user = client.ht_server.users.find_one({"email": email})
if existing_user:
return None
# 对密码进行哈希处理
hashed_password = generate_password_hash(password)
# 创建新用户
new_user = {
"email": email,
"password": hashed_password,
"NormalizedUserName": email,
"UserName": email,
"CreatedAt": datetime.datetime.utcnow(),
"IsLicensedDeveloper": False,
"IsMaintainer": False,
# 现在默认用户的上传权限不过期
"GachaLogExpireAt": "2099-01-01T00:00:00Z",
"CdnExpireAt": "2099-01-01T00:00:00Z"
}
result = client.ht_server.users.insert_one(new_user)
new_user['_id'] = result.inserted_id
return new_user
def get_user_by_id(user_id: str) -> dict | None:
"""根据ID获取用户信息"""
try:
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
if user:
user['_id'] = str(user['_id'])
return user
except Exception as e:
logger.error(f"Error retrieving user by ID: {e}")
return None
def get_users_with_search(query_text="", role=None, email=None, username=None, id=None, is_licensed=None) -> list:
"""获取用户列表,支持多种筛选条件"""
import re
# 构建查询条件
query = {}
and_conditions = []
# 通用搜索q 参数)- 匹配用户名、邮箱、ID
if query_text:
or_conditions = []
# 用户名模糊搜索
or_conditions.append({
"UserName": {"$regex": re.escape(query_text), "$options": "i"}
})
# 邮箱模糊搜索
or_conditions.append({
"email": {"$regex": re.escape(query_text), "$options": "i"}
})
# _id 搜索(支持完整或前缀)
if ObjectId.is_valid(query_text):
or_conditions.append({
"_id": ObjectId(query_text)
})
and_conditions.append({"$or": or_conditions})
# 按角色筛选
if role:
if role == "maintainer":
and_conditions.append({"IsMaintainer": True})
elif role == "developer":
and_conditions.append({"IsLicensedDeveloper": True})
elif role == "user":
# user 表示既不是 maintainer 也不是 developer
and_conditions.append({
"$and": [
{"IsMaintainer": {"$ne": True}},
{"IsLicensedDeveloper": {"$ne": True}}
]
})
# 按邮箱筛选(支持模糊匹配)
if email:
and_conditions.append({
"email": {"$regex": re.escape(email), "$options": "i"}
})
# 按用户名筛选(支持模糊匹配)
if username:
and_conditions.append({
"UserName": {"$regex": re.escape(username), "$options": "i"}
})
# 按用户ID筛选支持模糊匹配
if id:
if ObjectId.is_valid(id):
and_conditions.append({"_id": ObjectId(id)})
else:
# 如果不是有效的 ObjectId尝试匹配字符串形式的 _id
and_conditions.append({
"_id": {"$regex": re.escape(id), "$options": "i"}
})
# 按状态筛选
if is_licensed:
if is_licensed == "licensed":
and_conditions.append({"IsLicensedDeveloper": True})
elif is_licensed == "not-licensed":
and_conditions.append({"IsLicensedDeveloper": False})
# 构建 AND 查询
if and_conditions:
if len(and_conditions) == 1:
query = and_conditions[0]
else:
query = {"$and": and_conditions}
# 查询数据库(排除密码)
cursor = client.ht_server.users.find(query, {"password": 0})
# 去重(按 _id
users_map = {}
for u in cursor:
users_map[str(u["_id"])] = u
users = list(users_map.values())
# 数据格式化
CST = ZoneInfo("Asia/Shanghai")
for u in users:
u['_id'] = str(u['_id'])
created_at = u.get("CreatedAt")
if created_at:
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
created_at_cst = created_at.astimezone(CST)
u["CreatedAt"] = created_at_cst.strftime("%Y-%m-%d %H:%M:%S")
return users

View File

@@ -0,0 +1,241 @@
import datetime
from app.extensions import client, logger
from app.config import Config
def create_download_resource(data):
"""
创建下载资源
:param data: 包含以下字段的数据
- version: 版本号
- package_type: 包类型 (msi/msix)
- download_url: 下载链接
- features: 新功能描述
- file_size: 文件大小 (可选)
- file_hash: 文件哈希 (可选)
- is_active: 是否激活 (可选默认为True)
- is_test: 是否为测试版本 (可选默认为False)
:return: 创建的资源ID或None
"""
try:
resource_doc = {
"version": data['version'],
"package_type": data['package_type'],
"download_url": data['download_url'],
"features": data.get('features', ''),
"file_size": data.get('file_size'),
"file_hash": data.get('file_hash'),
"is_active": data.get('is_active', True),
"is_test": data.get('is_test', False),
"created_at": datetime.datetime.utcnow(),
"created_by": data.get('created_by')
}
result = client.ht_server.download_resources.insert_one(resource_doc)
logger.info(f"Download resource created with ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Failed to create download resource: {e}")
return None
def get_download_resources(package_type=None, is_active=None, is_test=None):
"""
获取下载资源列表
:param package_type: 包类型过滤 (msi/msix)None表示获取所有
:param is_active: 是否激活过滤None表示获取所有
:param is_test: 是否为测试版本过滤None表示获取所有
:return: 资源列表
"""
try:
query = {}
if package_type:
query['package_type'] = package_type
if is_active is not None:
query['is_active'] = is_active
if is_test is not None:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
if is_test:
query['is_test'] = True
else:
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
resources = list(client.ht_server.download_resources.find(query, sort=[("created_at", -1)]))
# 移除 _id 字段并转换日期
result = []
for r in resources:
r = dict(r)
# 转换_id为字符串并存为id字段
r['id'] = str(r.pop('_id'))
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in r:
r['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in r and isinstance(r['created_at'], datetime.datetime):
dt = r['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
r['created_at'] = dt.isoformat()
if 'updated_at' in r and isinstance(r['updated_at'], datetime.datetime):
dt = r['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
r['updated_at'] = dt.isoformat()
result.append(r)
return result
except Exception as e:
logger.error(f"Failed to get download resources: {e}")
return []
def get_download_resource_by_id(resource_id):
"""
根据ID获取下载资源
:param resource_id: 资源ID
:return: 资源对象或None
"""
try:
from bson import ObjectId
resource = client.ht_server.download_resources.find_one({"_id": ObjectId(resource_id)})
if resource:
resource = dict(resource)
resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['created_at'] = dt.isoformat()
if 'updated_at' in resource and isinstance(resource['updated_at'], datetime.datetime):
dt = resource['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['updated_at'] = dt.isoformat()
return resource
return None
except Exception as e:
logger.error(f"Failed to get download resource by ID: {e}")
return None
def update_download_resource(resource_id, data):
"""
更新下载资源
:param resource_id: 资源ID
:param data: 要更新的字段
:return: 是否成功
"""
try:
from bson import ObjectId
# 构建更新数据
update_data = {"updated_at": datetime.datetime.utcnow()}
if 'version' in data:
update_data['version'] = data['version']
if 'package_type' in data:
update_data['package_type'] = data['package_type']
if 'download_url' in data:
update_data['download_url'] = data['download_url']
if 'features' in data:
update_data['features'] = data['features']
if 'file_size' in data:
update_data['file_size'] = data['file_size']
if 'file_hash' in data:
update_data['file_hash'] = data['file_hash']
if 'is_active' in data:
update_data['is_active'] = data['is_active']
if 'is_test' in data:
update_data['is_test'] = data['is_test']
if 'updated_by' in data:
update_data['updated_by'] = data['updated_by']
result = client.ht_server.download_resources.update_one(
{"_id": ObjectId(resource_id)},
{"$set": update_data}
)
if result.modified_count > 0:
logger.info(f"Download resource {resource_id} updated successfully")
return True
return False
except Exception as e:
logger.error(f"Failed to update download resource: {e}")
return False
def delete_download_resource(resource_id):
"""
删除下载资源
:param resource_id: 资源ID
:return: 是否成功
"""
try:
from bson import ObjectId
result = client.ht_server.download_resources.delete_one({"_id": ObjectId(resource_id)})
if result.deleted_count > 0:
logger.info(f"Download resource {resource_id} deleted successfully")
return True
return False
except Exception as e:
logger.error(f"Failed to delete download resource: {e}")
return False
def get_latest_version(package_type=None, is_test=False):
"""
获取最新版本
:param package_type: 包类型 (msi/msix)None表示获取所有类型的最新版本
:param is_test: 是否包含测试版本默认为False只返回正式版本
:return: 资源对象或None
"""
try:
query = {"is_active": True}
if not is_test:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
else:
query['is_test'] = True
if package_type:
query['package_type'] = package_type
resource = client.ht_server.download_resources.find_one(
query,
sort=[("created_at", -1)]
)
if resource:
resource = dict(resource)
resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['created_at'] = dt.isoformat()
if 'updated_at' in resource and isinstance(resource['updated_at'], datetime.datetime):
dt = resource['updated_at'].replace(tzinfo=datetime.timezone.utc)
dt = dt.astimezone(Config.TIMEZONE)
resource['updated_at'] = dt.isoformat()
return resource
return None
except Exception as e:
logger.error(f"Failed to get latest version: {e}")
return None

View File

@@ -0,0 +1,118 @@
from app.extensions import client, logger
"""
注意记录中有两种类型GachaType和QueryType(uigf_gacha_type)GachaType多了一个400类型其实就是QueryType的301类型客户端传的end_ids是按QueryType来的如果按照GachaType来筛选会多出400类型的记录
映射关系
| `uigf_gacha_type` | `gacha_type` |
|-------------------|----------------|
| `100` | `100` |
| `200` | `200` |
| `301` | `301` or `400` |
| `302` | `302` |
| `500` | `500` |
"""
def get_gacha_log_entries(user_id):
"""获取用户的祈愿记录条目列表"""
gacha_logs = list(client.ht_server.GachaLog.find({"user_id": user_id}))
entries = []
for log in gacha_logs:
entry = {
"Uid": log['Uid'],
"Excluded": False,
"ItemCount": len(log['data'])
}
entries.append(entry)
return entries
def get_gacha_log_end_ids(user_id, uid):
"""获取指定 UID 用户的祈愿记录最新 ID"""
gacha_log = client.ht_server.GachaLog.find_one({"user_id": user_id, "Uid": uid})
if not gacha_log:
return {
"100": 0, # NoviceWish
"200": 0, # StandardWish
"301": 0, # AvatarEventWish
"302": 0, # WeaponEventWish
"500": 0 # ChronicledWish
}
# 计算各个祈愿类型的最新ID
end_ids = {
"100": 0,
"200": 0,
"301": 0,
"302": 0,
"500": 0
}
for item in gacha_log['data']:
gacha_type = str(item.get('GachaType', ''))
item_id = item.get('Id', 0)
if gacha_type in end_ids:
end_ids[gacha_type] = max(end_ids[gacha_type], item_id)
# 400类型对应301类型
end_ids["400"] = end_ids["301"]
return end_ids
def upload_gacha_log(user_id, uid, items):
"""上传祈愿记录"""
# 查找是否已有该用户和UID的祈愿记录
existing_log = client.ht_server.GachaLog.find_one({"user_id": user_id, "Uid": uid})
if existing_log:
# 已有数据合并新旧数据按Id去重
old_items = existing_log.get('data', [])
# 用Id做索引先加旧的再加新的新数据覆盖旧数据
item_dict = {item.get('Id'): item for item in old_items}
for item in items:
item_dict[item.get('Id')] = item
merged_items = list(item_dict.values())
# 更新数据库
client.ht_server.GachaLog.update_one(
{"_id": existing_log['_id']},
{"$set": {"data": merged_items}}
)
return f"success, merged {len(items)} new items, total {len(merged_items)} items"
else:
# 没有数据,直接插入
gacha_log_entry = {
"user_id": user_id,
"Uid": uid,
"data": items
}
client.ht_server.GachaLog.insert_one(gacha_log_entry)
return f"success, uploaded {len(items)} items"
def retrieve_gacha_log(user_id, uid, end_ids):
"""从云端检索用户的祈愿记录数据"""
gacha_log = client.ht_server.GachaLog.find_one({"user_id": user_id, "Uid": uid})
if not gacha_log:
return []
# 筛选出比end_ids更旧的记录
filtered_items = []
# 需要将end_ids的key从QueryType转换为GachaType给400赋值为301的值即可
if "301" in end_ids:
end_ids["400"] = end_ids["301"]
for item in gacha_log['data']:
gacha_type = str(item.get('GachaType', ''))
item_id = item.get('Id', 0)
# end_ids有可能是0那么返回全部
if (gacha_type in end_ids and item_id < end_ids[gacha_type]) or end_ids.get(gacha_type, 0) == 0:
filtered_items.append(item)
return filtered_items
def delete_gacha_log(user_id, uid):
"""删除指定用户的祈愿记录"""
result = client.ht_server.GachaLog.delete_one({"user_id": user_id, "Uid": uid})
return result.deleted_count > 0

View File

@@ -0,0 +1,70 @@
import datetime
from app.extensions import client, logger
def init_verification_code_collection():
"""初始化验证码集合并创建 TTL 索引"""
db = client.ht_server
collection = db.verification_codes
indexes = collection.list_indexes()
ttl_index_exists = False
for index in indexes:
if index.get('expireAfterSeconds') is not None:
ttl_index_exists = True
break
# 如果不存在 TTL 索引,则创建
if not ttl_index_exists:
collection.create_index(
[("expire_at", 1)],
expireAfterSeconds=0,
name="expire_at_ttl"
)
logger.info("Created TTL index on verification_codes collection")
def save_verification_code(email: str, code: str, expire_minutes: int = 10):
"""保存验证码到 MongoDB自动过期时间由 TTL 索引控制"""
db = client.ht_server
collection = db.verification_codes
# 确保集合已初始化
init_verification_code_collection()
# 计算过期时间
expire_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=expire_minutes)
# 插入验证码记录
result = collection.insert_one({
"email": email,
"code": code,
"created_at": datetime.datetime.utcnow(),
"expire_at": expire_at,
"used": False
})
logger.debug(f"Saved verification code for email: {email}")
return result.inserted_id
def verify_code(email: str, code: str) -> bool:
"""验证验证码是否正确,验证成功后删除验证码记录"""
db = client.ht_server
collection = db.verification_codes
# 查找未使用的验证码
verification_record = collection.find_one({
"email": email,
"code": code,
"used": False
})
if verification_record:
# 验证成功,删除该验证码记录
collection.delete_one({"_id": verification_record["_id"]})
logger.info(f"Verification code validated and deleted for email: {email}")
return True
logger.warning(f"Invalid or expired verification code for email: {email}")
return False