Merge pull request #1 from wangdage12/dev

初始版本
This commit is contained in:
wangdage12
2026-01-25 19:58:39 +08:00
committed by GitHub
21 changed files with 1441 additions and 0 deletions

11
.gitignore vendored Normal file
View File

@@ -0,0 +1,11 @@
.vscode/
# 公钥、私钥文件
*.pem
# python缓存文件
__pycache__/
*.pyc
# 配置json文件
config.json

112
README.md
View File

@@ -1,2 +1,114 @@
# Snap.Server # Snap.Server
Snap.Hutao新后端API Snap.Hutao新后端API
## 部署方法
### 在服务器生成RSA密钥
执行以下代码在根目录生成密钥:
```python
from Crypto.PublicKey import RSA
# 生成 2048 位 RSA 密钥对
key = RSA.generate(2048)
private_key = key.export_key()
public_key = key.publickey().export_key()
with open("private.pem", "wb") as f:
f.write(private_key)
with open("public.pem", "wb") as f:
f.write(public_key)
print("Keys generated.")
```
**确保客户端的公钥和生成的相同,否则将无法使用账户功能**
### 创建配置文件
创建`config.json`文件,示例内容如下:
```json
{
"SECRET_KEY": "jwt_secret_key",
"MONGO_URI": "mongodb+srv://wdgwdg889_db_user:xxxxxx@cluster0.eplrcvl.mongodb.net/?appName=Cluster0",
"TIMEZONE": "Asia/Shanghai",
"ISTEST_MODE": false,
"SERVER": {
"HOST": "0.0.0.0",
"PORT": 5222,
"DEBUG": false
},
"JWT": {
"ALGORITHM": "HS256",
"EXPIRATION_HOURS": 24
},
"EMAIL": {
"GMAIL_USER": "wdgwdg889@gmail.com",
"APP_PASSWORD": ""
},
"RSA": {
"PRIVATE_KEY_FILE": "private.pem",
"PUBLIC_KEY_FILE": "public.pem"
},
"LOGGING": {
"LEVEL": "DEBUG",
"FORMAT": ""
}
}
```
参数说明:
| 参数 | 说明 |
|------|------|
| SECRET_KEY | 用于JWT签名的密钥请设置为复杂字符串 |
| MONGO_URI | MongoDB连接字符串 |
| TIMEZONE | 服务器时区 |
| ISTEST_MODE | 是否启用测试模式,测试模式下部分功能将返回默认值,不连接数据库 |
| SERVER.HOST | 服务器监听地址 |
| SERVER.PORT | 服务器监听端口 |
| SERVER.DEBUG | 是否启用Flask的调试模式 |
| JWT.ALGORITHM | JWT签名算法 |
| JWT.EXPIRATION_HOURS | JWT过期时间小时 |
| EMAIL.GMAIL_USER | 用于发送验证邮件的Gmail账号 |
| EMAIL.APP_PASSWORD | Gmail应用专用密码 |
| RSA.PRIVATE_KEY_FILE | RSA私钥文件路径 |
| RSA.PUBLIC_KEY_FILE | RSA公钥文件路径 |
| LOGGING.LEVEL | 日志记录级别生产环境建议设置为INFO |
| LOGGING.FORMAT | 日志记录格式 |
### 开发环境启动方法
确保已安装依赖:
```
pip install -r requirements.txt
```
运行Flask应用
```
python app.py
```
### 生产环境启动方法
建议使用Gunicorn部署
```
pip install -r requirements.txt && python -m gunicorn run:app --bind 0.0.0.0:5222 --workers 4 --threads 2 --access-logfile - --error-logfile -
```
请根据服务器性能调整`--workers``--threads`参数。
### API文档
API文档可以在该地址访问
https://rdgm3wrj7r.apifox.cn/
### 注意事项
在轻量使用的场景下可以直接使用MongoDB Atlas的免费套餐但在高并发场景下建议使用自建MongoDB服务器以获得更好的性能和稳定性。
新MongoDB数据库会在写入数据时自动创建无需手动创建数据库和集合。

32
SendEmailTool.py Normal file
View File

@@ -0,0 +1,32 @@
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from app.config_loader import config_loader
# 从配置文件获取邮件设置
gmail_user = config_loader.EMAIL_GMAIL_USER
app_password = config_loader.EMAIL_APP_PASSWORD
to_email = ""
def send_email(gmail_user, app_password, to_email, subject, body="这是一封测试邮件。"):
msg = MIMEMultipart()
msg["From"] = gmail_user
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
# Gmail SMTP 服务器
server = smtplib.SMTP("smtp.gmail.com", 587)
server.starttls()
server.login(gmail_user, app_password)
server.sendmail(gmail_user, to_email, msg.as_string())
server.quit()
if __name__ == "__main__":
try:
send_email(gmail_user, app_password, to_email, "测试邮件主题", "这是一封测试邮件。")
print("邮件发送成功!")
except Exception as e:
print("发送失败:", e)

21
app.py Normal file
View File

@@ -0,0 +1,21 @@
from app.init import create_app
from app.config_loader import config_loader
import sentry_sdk
sentry_sdk.init(
dsn="https://d1cad1d2b442cf8431df3ee4bab925e0@o4507525750521856.ingest.us.sentry.io/4510623668830208",
# Add data like request headers and IP for users,
# see https://docs.sentry.io/platforms/python/data-management/data-collected/ for more info
send_default_pii=True,
traces_sample_rate=1.0,
)
# 创建应用实例
app = create_app()
if __name__ == '__main__':
app.run(
host=config_loader.SERVER_HOST,
port=config_loader.SERVER_PORT,
debug=config_loader.SERVER_DEBUG
)

0
app/__init__.py Normal file
View File

8
app/config.py Normal file
View File

@@ -0,0 +1,8 @@
from app.config_loader import config_loader
# 使用配置加载器提供兼容的接口
class Config:
SECRET_KEY = config_loader.SECRET_KEY
MONGO_URI = config_loader.MONGO_URI
TIMEZONE = config_loader.TIMEZONE
ISTEST_MODE = config_loader.ISTEST_MODE

100
app/config_loader.py Normal file
View File

@@ -0,0 +1,100 @@
import json
import os
from zoneinfo import ZoneInfo
from typing import Dict, Any
class ConfigLoader:
def __init__(self, config_file: str = 'config.json'):
self.config_file = config_file
self._config = None
def load_config(self) -> Dict[str, Any]:
"""加载 JSON 配置文件"""
if self._config is None:
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), self.config_file)
try:
with open(config_path, 'r', encoding='utf-8') as f:
self._config = json.load(f)
except FileNotFoundError:
raise FileNotFoundError(f"配置文件 {config_path} 不存在")
except json.JSONDecodeError as e:
raise ValueError(f"配置文件格式错误: {e}")
return self._config
def get(self, key: str, default=None):
"""获取配置值,支持点号分隔的嵌套键"""
config = self.load_config()
keys = key.split('.')
value = config
try:
for k in keys:
value = value[k]
return value
except (KeyError, TypeError):
return default
@property
def SECRET_KEY(self) -> str:
return self.get('SECRET_KEY')
@property
def MONGO_URI(self) -> str:
return self.get('MONGO_URI')
@property
def TIMEZONE(self) -> ZoneInfo:
timezone_str = self.get('TIMEZONE', 'Asia/Shanghai')
return ZoneInfo(timezone_str)
@property
def ISTEST_MODE(self) -> bool:
return self.get('ISTEST_MODE', False)
@property
def SERVER_HOST(self) -> str:
return self.get('SERVER.HOST', '0.0.0.0')
@property
def SERVER_PORT(self) -> int:
return self.get('SERVER.PORT', 5222)
@property
def SERVER_DEBUG(self) -> bool:
return self.get('SERVER.DEBUG', False)
@property
def JWT_ALGORITHM(self) -> str:
return self.get('JWT.ALGORITHM', 'HS256')
@property
def JWT_EXPIRATION_HOURS(self) -> int:
return self.get('JWT.EXPIRATION_HOURS', 24)
@property
def EMAIL_GMAIL_USER(self) -> str:
return self.get('EMAIL.GMAIL_USER')
@property
def EMAIL_APP_PASSWORD(self) -> str:
return self.get('EMAIL.APP_PASSWORD')
@property
def RSA_PRIVATE_KEY_FILE(self) -> str:
return self.get('RSA.PRIVATE_KEY_FILE', 'private.pem')
@property
def RSA_PUBLIC_KEY_FILE(self) -> str:
return self.get('RSA.PUBLIC_KEY_FILE', 'public.pem')
@property
def LOGGING_LEVEL(self) -> str:
return self.get('LOGGING.LEVEL', 'DEBUG')
@property
def LOGGING_FORMAT(self) -> str:
return self.get('LOGGING.FORMAT', '%(asctime)s %(name)s %(levelname)s %(message)s')
# 创建全局配置实例
config_loader = ConfigLoader()

22
app/decorators.py Normal file
View File

@@ -0,0 +1,22 @@
from flask import request, jsonify
from bson import ObjectId
from app.extensions import client, logger
from app.utils.jwt_utils import verify_token
def require_maintainer_permission(f):
def wrapper(*args, **kwargs):
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
return jsonify({"code": 1, "message": "Invalid token"}), 401
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
if not user or not user.get("IsMaintainer", False):
return jsonify({"code": 2, "message": "Permission denied"}), 403
request.current_user = user
return f(*args, **kwargs)
wrapper.__name__ = f.__name__
return wrapper

35
app/extensions.py Normal file
View File

@@ -0,0 +1,35 @@
import logging
import coloredlogs
import secrets
import string
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from app.config_loader import config_loader
logger = logging.getLogger("app")
coloredlogs.install(level=config_loader.LOGGING_LEVEL, logger=logger, fmt=config_loader.LOGGING_FORMAT)
client = None
def init_mongo(uri: str, test_mode=False):
global client
if test_mode:
logger.info("Running in test mode, skipping MongoDB connection")
return
client = MongoClient(uri, server_api=ServerApi('1'))
try:
client.admin.command('ping')
logger.info("MongoDB connected successfully")
except Exception as e:
logger.error(f"MongoDB connection failed: {e}")
raise
def generate_code(length=6):
"""生成数字验证码"""
return ''.join(secrets.choice('0123456789') for _ in range(length))
def generate_numeric_id(length=8):
"""生成数字ID"""
return ''.join(secrets.choice(string.digits) for _ in range(length))

33
app/init.py Normal file
View File

@@ -0,0 +1,33 @@
from flask import Flask
from app.config import Config
from app.extensions import init_mongo
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
app.secret_key = Config.SECRET_KEY
init_mongo(Config.MONGO_URI, Config.ISTEST_MODE)
# 注册蓝图
from routes.announcement import announcement_bp
from routes.auth import auth_bp
from routes.gacha_log import gacha_log_bp
from routes.web_api import web_api_bp
from routes.misc import misc_bp
app.register_blueprint(announcement_bp, url_prefix="/Announcement")
app.register_blueprint(auth_bp)
app.register_blueprint(gacha_log_bp)
app.register_blueprint(web_api_bp)
app.register_blueprint(misc_bp)
# CORS
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
return response
return app

18
app/utils/jwt_utils.py Normal file
View File

@@ -0,0 +1,18 @@
import jwt
import datetime
from flask import current_app
from app.config_loader import config_loader
def create_token(user_id):
payload = {
"user_id": user_id,
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS)
}
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM)
def verify_token(token):
try:
data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=[config_loader.JWT_ALGORITHM])
return data["user_id"]
except:
return None

8
requirements.txt Normal file
View File

@@ -0,0 +1,8 @@
coloredlogs==15.0.1
Flask==3.1.2
pycryptodome==3.20.0
PyJWT==2.10.1
pymongo==4.15.5
Werkzeug==3.1.4
sentry-sdk[flask]
gunicorn

14
routes/announcement.py Normal file
View File

@@ -0,0 +1,14 @@
from flask import Blueprint, jsonify, request
from services.announcement_service import get_announcements
announcement_bp = Blueprint("announcement", __name__)
@announcement_bp.route("/List", methods=["POST"])
def list_announcements():
# 获取用户已关闭的公告ID列表也可能没有请求体
request_data = request.get_json(silent=True) or []
return jsonify({
"code": 0,
"message": "OK",
"data": get_announcements(request_data)
})

241
routes/auth.py Normal file
View File

@@ -0,0 +1,241 @@
from flask import Blueprint, request, jsonify, session
from app.utils.jwt_utils import create_token, verify_token
from services.auth_service import (
decrypt_data, send_verification_email, verify_user_credentials,
create_user_account, get_user_by_id
)
from app.extensions import generate_code, logger , config_loader
auth_bp = Blueprint("auth", __name__)
@auth_bp.route('/Passport/v2/Verify', methods=['POST'])
def passport_verify():
"""获取验证码"""
data = request.get_json()
encrypted_email = data.get('UserName', '')
try:
decrypted_email = decrypt_data(encrypted_email)
logger.debug(f"Decrypted email: {decrypted_email}")
except Exception as e:
logger.error(f"Decryption error: {e}")
return jsonify({
"retcode": 1,
"message": f"Invalid encrypted email: {str(e)}",
"data": None
})
# 生成验证码
code = generate_code(6)
session['verification_code'] = code
session['email'] = decrypted_email
# 发送邮件
if send_verification_email(decrypted_email, code):
return jsonify({
"retcode": 0,
"message": "success",
"l10nKey": "ViewDialogUserAccountVerificationEmailCaptchaSent"
})
else:
return jsonify({
"retcode": 1,
"message": "Failed to send email",
"data": None
}), 500
@auth_bp.route('/Passport/v2/Register', methods=['POST'])
def passport_register():
"""用户注册"""
data = request.get_json()
encrypted_email = data.get('UserName', '')
encrypted_password = data.get('Password', '')
encrypted_code = data.get('VerifyCode', '')
try:
decrypted_email = decrypt_data(encrypted_email)
decrypted_password = decrypt_data(encrypted_password)
decrypted_code = decrypt_data(encrypted_code)
logger.debug(f"Decrypted registration data: email={decrypted_email}, code={decrypted_code}")
except Exception as e:
logger.warning(f"Decryption error: {e}")
return jsonify({
"retcode": 1,
"message": f"Invalid encrypted data: {str(e)}",
"data": None
}), 400
# 验证验证码
if (session.get('verification_code') != decrypted_code or
session.get('email') != decrypted_email):
logger.warning("Invalid verification code")
return jsonify({
"retcode": 2,
"message": "Invalid verification code",
"data": None
})
# 创建新用户
new_user = create_user_account(decrypted_email, decrypted_password)
if not new_user:
logger.warning(f"User already exists: {decrypted_email}")
return jsonify({
"retcode": 3,
"message": "User already exists",
"data": None
})
# 删除session中的验证码和邮箱
session.pop('verification_code', None)
session.pop('email', None)
# 创建token
access_token = create_token(str(new_user['_id']))
logger.info(f"User registered: {decrypted_email}")
return jsonify({
"retcode": 0,
"message": "success",
"data": {
"AccessToken": access_token,
"RefreshToken": access_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
@auth_bp.route('/Passport/v2/Login', methods=['POST'])
def passport_login():
"""用户登录"""
data = request.get_json()
encrypted_email = data.get('UserName', '')
encrypted_password = data.get('Password', '')
try:
decrypted_email = decrypt_data(encrypted_email)
decrypted_password = decrypt_data(encrypted_password)
logger.debug(f"Decrypted login data: email={decrypted_email}")
except Exception as e:
logger.warning(f"Decryption error: {e}")
return jsonify({
"retcode": 1,
"message": f"Invalid encrypted data: {str(e)}",
"data": None
}), 400
# 验证用户凭据
user = verify_user_credentials(decrypted_email, decrypted_password)
if not user:
logger.warning(f"Invalid login attempt for email: {decrypted_email}")
return jsonify({
"retcode": 2,
"message": "Invalid email or password",
"data": None
})
# 创建token
access_token = create_token(str(user['_id']))
logger.info(f"User logged in: {decrypted_email}")
return jsonify({
"retcode": 0,
"message": "success",
"l10nKey": "ServerPassportLoginSucceed",
"data": {
"AccessToken": access_token,
"RefreshToken": access_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
@auth_bp.route('/Passport/v2/UserInfo', methods=['GET'])
def passport_userinfo():
"""获取用户信息"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
user = get_user_by_id(user_id)
if not user:
logger.warning(f"User not found: {user_id}")
return jsonify({
"retcode": 2,
"message": "User not found",
"data": None
})
logger.info(f"User info retrieved: {user['email']}")
return jsonify({
"retcode": 0,
"message": "success",
"data": {
"NormalizedUserName": user['NormalizedUserName'],
"UserName": user['UserName'],
"IsLicensedDeveloper": user['IsLicensedDeveloper'],
"IsMaintainer": user['IsMaintainer'],
"GachaLogExpireAt": user['GachaLogExpireAt'],
"CdnExpireAt": user['CdnExpireAt']
}
})
@auth_bp.route('/Passport/v2/RefreshToken', methods=['POST'])
def passport_refresh_token():
"""刷新Token"""
data = request.get_json()
refresh_token = data.get('RefreshToken', '')
try:
decrypted_refresh_token = decrypt_data(refresh_token)
except Exception as e:
logger.error(f"Decryption error: {e}")
return jsonify({
"retcode": 1,
"message": f"Invalid encrypted refresh token: {str(e)}",
"data": None
}), 400
user_id = verify_token(decrypted_refresh_token)
if not user_id:
logger.warning("Invalid or expired refresh token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired refresh token",
"data": None
})
access_token = create_token(user_id)
logger.info(f"Token refreshed for user_id: {user_id}")
return jsonify({
"retcode": 0,
"message": "success",
"data": {
"AccessToken": access_token,
"RefreshToken": access_token,
"ExpiresIn": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
@auth_bp.route('/Passport/v2/RevokeToken', methods=['POST'])
def passport_revoke_token():
"""注销Token"""
logger.info("Token revoked")
return jsonify({
"retcode": 0,
"message": "Token revoked successfully",
"data": None
})

160
routes/gacha_log.py Normal file
View File

@@ -0,0 +1,160 @@
from flask import Blueprint, request, jsonify
from app.utils.jwt_utils import verify_token
from services.gacha_log_service import (
get_gacha_log_entries, get_gacha_log_end_ids, upload_gacha_log,
retrieve_gacha_log, delete_gacha_log
)
from app.extensions import logger
gacha_log_bp = Blueprint("gacha_log", __name__)
@gacha_log_bp.route('/GachaLog/Statistics/Distribution/<distributionType>', methods=['GET'])
def gacha_log_statistics_distribution(distributionType):
"""获取祈愿记录统计分布"""
return jsonify({
"retcode": 0,
"message": "success",
"data": {}
})
@gacha_log_bp.route('/GachaLog/Entries', methods=['GET'])
def gacha_log_entries():
"""获取用户的祈愿记录条目列表"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
entries = get_gacha_log_entries(user_id)
logger.info(f"Gacha log entries retrieved for user_id: {user_id}")
logger.debug(f"Entries: {entries}")
return jsonify({
"retcode": 0,
"message": "success",
"data": entries
})
@gacha_log_bp.route('/GachaLog/EndIds', methods=['GET'])
def gacha_log_end_ids():
"""获取指定 UID 用户的祈愿记录最新 ID"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
uid = request.args.get('Uid', '')
end_ids = get_gacha_log_end_ids(user_id, uid)
logger.info(f"Gacha log end IDs retrieved for user_id: {user_id}, uid: {uid}")
logger.debug(f"End IDs: {end_ids}")
return jsonify({
"retcode": 0,
"message": "success",
"data": end_ids
})
@gacha_log_bp.route('/GachaLog/Upload', methods=['POST'])
def gacha_log_upload():
"""上传祈愿记录"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
data = request.get_json()
uid = data.get('Uid', '')
items = data.get('Items', [])
message = upload_gacha_log(user_id, uid, items)
logger.info(f"Gacha log upload for user_id: {user_id}, uid: {uid}")
return jsonify({
"retcode": 0,
"message": message,
"data": None
})
@gacha_log_bp.route('/GachaLog/Retrieve', methods=['POST'])
def gacha_log_retrieve():
"""从云端检索用户的祈愿记录数据"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
data = request.get_json()
uid = data.get('Uid', '')
end_ids = data.get('EndIds', {})
filtered_items = retrieve_gacha_log(user_id, uid, end_ids)
logger.info(f"Gacha log retrieved for user_id: {user_id}, uid: {uid}, items count: {len(filtered_items)}")
return jsonify({
"retcode": 0,
"message": f"success, retrieved {len(filtered_items)} items",
"data": filtered_items
})
@gacha_log_bp.route('/GachaLog/Delete', methods=['GET'])
def gacha_log_delete():
"""删除用户的祈愿记录"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"retcode": 1,
"message": "Invalid or expired token",
"data": None
}), 401
uid = request.args.get('Uid', '')
success = delete_gacha_log(user_id, uid)
if success:
logger.info(f"Gacha log deleted for user_id: {user_id}, uid: {uid}")
return jsonify({
"retcode": 0,
"message": "success, gacha log deleted",
"data": None
})
else:
logger.info(f"No gacha log found to delete for user_id: {user_id}, uid: {uid}")
return jsonify({
"retcode": 2,
"message": "no gacha log found to delete",
"data": None
})

86
routes/misc.py Normal file
View File

@@ -0,0 +1,86 @@
from flask import Blueprint, request, jsonify, send_file
from app.extensions import logger, client
from app.config import Config
misc_bp = Blueprint("misc", __name__)
@misc_bp.route('/patch/hutao', methods=['GET'])
def patch_hutao():
"""获取新版本信息"""
return {
"code": 0,
"message": "OK",
"data": {
"validation": "",
"version": "1.0.0",
"mirrors": []
}
}
@misc_bp.route('/git-repository/all', methods=['GET'])
def git_repository_all():
"""获取所有Git仓库"""
if Config.ISTEST_MODE:
# 覆盖元数据仓库列表,测试用
repositories = [
{
"name": "test",
"https_url": "http://server.wdg.cloudns.ch:3000/wdg1122/Snap.Metadata.Test.git",
"web_url": "http://server.wdg.cloudns.ch:3000/wdg1122/Snap.Metadata.Test",
"type": "Public"
}
]
return jsonify({
"code": 0,
"message": "OK",
"data": repositories
})
# 从数据库获取 Git 仓库列表
git_repositories = list(client.ht_server.git_repository.find({}))
for repo in git_repositories:
repo.pop('_id', None)
logger.debug(f"Git repositories: {git_repositories}")
return jsonify({
"code": 0,
"message": "OK",
"data": git_repositories
})
@misc_bp.route('/static/raw/<category>/<fileName>', methods=['GET'])
def get_image(category, fileName):
"""获取图片资源,弃用,请使用额外的文件服务器"""
return jsonify({"code": 1, "message": "Image not found"}), 404
@misc_bp.route('/mgnt/am-i-banned', methods=['GET'])
def mgnt_am_i_banned():
"""检查游戏账户是否禁用注入,目前直接返回成功的响应即可"""
return jsonify({
"retcode": 0,
"message": "OK",
"data": {}
})
# 获取额外的第三方注入工具
@misc_bp.route('/tools', methods=['GET'])
def get_tools():
"""获取额外的第三方注入工具列表"""
tools = list(client.ht_server.tools.find({}))
for tool in tools:
tool.pop('_id', None)
logger.debug(f"Tools: {tools}")
return jsonify({
"code": 0,
"message": "OK",
"data": tools
})

250
routes/web_api.py Normal file
View File

@@ -0,0 +1,250 @@
import datetime
from bson import ObjectId
from flask import Blueprint, request, jsonify
from app.utils.jwt_utils import verify_token, create_token
from services.auth_service import verify_user_credentials, get_users_with_search
from app.decorators import require_maintainer_permission
from app.extensions import generate_numeric_id, client, logger, config_loader
web_api_bp = Blueprint("web_api", __name__)
@web_api_bp.route('/web-api/login', methods=['POST'])
def web_api_login():
"""Web管理端登录"""
data = request.get_json()
email = data.get('email', '')
password = data.get('password', '')
# 验证用户凭据
user = verify_user_credentials(email, password)
if not user:
logger.warning(f"Invalid web login attempt for email: {email}")
return jsonify({
"code": 1,
"message": "Invalid email or password",
"data": None
})
# 创建token
access_token = create_token(str(user['_id']))
logger.info(f"Web user logged in: {email}")
return jsonify({
"code": 0,
"message": "success",
"data": {
"access_token": access_token,
"expires_in": config_loader.JWT_EXPIRATION_HOURS * 3600
}
})
# 公告管理API
@web_api_bp.route('/web-api/announcement', methods=['POST'])
@require_maintainer_permission
def web_api_create_announcement():
"""创建公告"""
data = request.get_json()
# 验证必需字段
if not all(k in data for k in ['Title', 'Content', 'Locale']):
return jsonify({
"code": 1,
"message": "Missing required fields: Title, Content, Locale",
"data": None
}), 400
# 生成公告ID
announcement_id = int(generate_numeric_id(8))
# 创建公告对象
announcement = {
"Id": announcement_id,
"Title": data['Title'],
"Content": data['Content'],
"Severity": data.get('Severity', 0), # 默认为Informational
"Link": data.get('Link', ''),
"Locale": data['Locale'],
"LastUpdateTime": int(datetime.datetime.now().timestamp()),
"MaxPresentVersion": data.get('MaxPresentVersion', None),
"CreatedBy": str(request.current_user['_id']),
"CreatedAt": datetime.datetime.utcnow(),
# 发行版名称,用于给不同的发行版显示不同的公告,默认为空字符串,表示所有发行版
"Distribution": data.get('Distribution', '')
}
# 插入数据库
result = client.ht_server.announcement.insert_one(announcement)
if result.inserted_id:
logger.info(f"Announcement created with ID: {announcement_id} by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Announcement created successfully",
"data": {
"Id": announcement_id
}
})
else:
logger.error("Failed to create announcement")
return jsonify({
"code": 2,
"message": "Failed to create announcement",
"data": None
}), 500
@web_api_bp.route('/web-api/announcement/<int:announcement_id>', methods=['PUT'])
@require_maintainer_permission
def web_api_update_announcement(announcement_id):
"""编辑公告"""
data = request.get_json()
# 检查公告是否存在
existing_announcement = client.ht_server.announcement.find_one({"Id": announcement_id})
if not existing_announcement:
return jsonify({
"code": 1,
"message": "Announcement not found",
"data": None
}), 404
# 更新字段
update_data = {
"LastUpdateTime": int(datetime.datetime.now().timestamp()),
"UpdatedBy": str(request.current_user['_id']),
"UpdatedAt": datetime.datetime.utcnow()
}
# 只更新提供的字段
if 'Title' in data:
update_data["Title"] = data['Title']
if 'Content' in data:
update_data["Content"] = data['Content']
if 'Severity' in data:
update_data["Severity"] = data['Severity']
if 'Link' in data:
update_data["Link"] = data['Link']
if 'Locale' in data:
update_data["Locale"] = data['Locale']
if 'MaxPresentVersion' in data:
update_data["MaxPresentVersion"] = data['MaxPresentVersion']
if 'Distribution' in data:
update_data["Distribution"] = data['Distribution']
# 执行更新
result = client.ht_server.announcement.update_one(
{"Id": announcement_id},
{"$set": update_data}
)
if result.modified_count > 0:
logger.info(f"Announcement {announcement_id} updated by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Announcement updated successfully",
"data": None
})
else:
logger.warning(f"No changes made to announcement {announcement_id}")
return jsonify({
"code": 2,
"message": "No changes made",
"data": None
})
@web_api_bp.route('/web-api/announcement/<int:announcement_id>', methods=['DELETE'])
@require_maintainer_permission
def web_api_delete_announcement(announcement_id):
"""删除公告"""
# 检查公告是否存在
existing_announcement = client.ht_server.announcement.find_one({"Id": announcement_id})
if not existing_announcement:
return jsonify({
"code": 1,
"message": "Announcement not found",
"data": None
}), 404
# 执行删除
result = client.ht_server.announcement.delete_one({"Id": announcement_id})
if result.deleted_count > 0:
logger.info(f"Announcement {announcement_id} deleted by user: {request.current_user['email']}")
return jsonify({
"code": 0,
"message": "Announcement deleted successfully",
"data": None
})
else:
logger.error(f"Failed to delete announcement {announcement_id}")
return jsonify({
"code": 2,
"message": "Failed to delete announcement",
"data": None
}), 500
@web_api_bp.route('/web-api/announcement/<int:announcement_id>', methods=['GET'])
@require_maintainer_permission
def web_api_get_announcement(announcement_id):
"""获取单个公告详情"""
# 查询公告
announcement = client.ht_server.announcement.find_one({"Id": announcement_id})
if not announcement:
return jsonify({
"code": 1,
"message": "Announcement not found",
"data": None
}), 404
# 移除MongoDB的_id字段
announcement.pop('_id', None)
return jsonify({
"code": 0,
"message": "success",
"data": announcement
})
@web_api_bp.route('/web-api/users', methods=['GET'])
def web_api_get_users():
"""获取所有用户列表需要验证token并且需要高权限"""
# 获取用户信息
token = request.headers.get('Authorization', '').replace('Bearer ', '')
user_id = verify_token(token)
if not user_id:
logger.warning("Invalid or expired token")
return jsonify({
"code": 1,
"message": "Invalid or expired token",
"data": None
}), 401
# 检查用户是否具有高权限
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
if not user or not (user.get("IsMaintainer", False) and user.get("IsLicensedDeveloper", False)):
logger.warning(f"User {user_id} does not have required permissions")
logger.debug(f"User details: {user}")
return jsonify({
"code": 2,
"message": "Insufficient permissions",
"data": None
}), 403
# 获取搜索参数
q = request.args.get("q", "").strip()
users = get_users_with_search(q)
return jsonify({
"code": 0,
"message": "success",
"data": users
})

21
run.py Normal file
View File

@@ -0,0 +1,21 @@
from app.init import create_app
from app.config_loader import config_loader
import sentry_sdk
sentry_sdk.init(
dsn="https://d1cad1d2b442cf8431df3ee4bab925e0@o4507525750521856.ingest.us.sentry.io/4510623668830208",
# Add data like request headers and IP for users,
# see https://docs.sentry.io/platforms/python/data-management/data-collected/ for more info
send_default_pii=True,
traces_sample_rate=1.0,
)
# 创建应用实例
app = create_app()
if __name__ == '__main__':
app.run(
host=config_loader.SERVER_HOST,
port=config_loader.SERVER_PORT,
debug=config_loader.SERVER_DEBUG
)

View File

@@ -0,0 +1,19 @@
from app.extensions import client, logger
from app.config import Config
def get_announcements(request_data: list):
if Config.ISTEST_MODE:
return []
# 记录请求体到日志请求体中是用户已关闭的公告ID列表
logger.debug("Request body: %s", request_data)
announcements = list(client.ht_server.announcement.find({}))
result = []
for a in announcements:
# 拷贝并移除 _id 字段,避免 ObjectId 无法序列化
a = dict(a)
a.pop('_id', None)
# 如果请求体中包含该公告ID说明用户已关闭该公告不返回该公告
if a.get('Id') not in request_data:
result.append(a)
return result

152
services/auth_service.py Normal file
View File

@@ -0,0 +1,152 @@
from bson import ObjectId
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import client, logger
from app.config import Config
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from app.config_loader import config_loader
from datetime import timezone
from zoneinfo import ZoneInfo
import datetime
import SendEmailTool
import re
import base64
def decrypt_data(encrypted_data):
"""使用RSA私钥解密数据"""
try:
private_key_file = config_loader.RSA_PRIVATE_KEY_FILE
with open(private_key_file, 'r') as f:
private_key = RSA.import_key(f.read())
cipher = PKCS1_OAEP.new(private_key)
decrypted_data = cipher.decrypt(base64.b64decode(encrypted_data))
return decrypted_data.decode()
except Exception as e:
logger.error(f"Decryption error: {e}")
raise
def send_verification_email(email, code):
"""发送验证码邮件"""
try:
subject = "Snap Hutao 验证码"
body = f"您的验证码是: {code}"
SendEmailTool.send_email(
SendEmailTool.gmail_user,
SendEmailTool.app_password,
email,
subject,
body
)
logger.info(f"Verification email sent to {email}")
return True
except Exception as e:
logger.error(f"Failed to send email: {e}")
return False
def verify_user_credentials(email, password):
"""验证用户凭据"""
user = client.ht_server.users.find_one({"email": email})
if not user or not check_password_hash(user['password'], password):
return None
return user
def create_user_account(email, password):
"""创建新用户账户"""
# 检查用户是否已存在
existing_user = client.ht_server.users.find_one({"email": email})
if existing_user:
return None
# 对密码进行哈希处理
hashed_password = generate_password_hash(password)
# 创建新用户
new_user = {
"email": email,
"password": hashed_password,
"NormalizedUserName": email,
"UserName": email,
"CreatedAt": datetime.datetime.utcnow(),
"IsLicensedDeveloper": False,
"IsMaintainer": False,
# 现在默认用户的上传权限不过期
"GachaLogExpireAt": "2099-01-01T00:00:00Z",
"CdnExpireAt": "2099-01-01T00:00:00Z"
}
result = client.ht_server.users.insert_one(new_user)
new_user['_id'] = result.inserted_id
return new_user
def get_user_by_id(user_id):
"""根据ID获取用户信息"""
try:
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
if user:
user['_id'] = str(user['_id'])
return user
except Exception as e:
logger.error(f"Error retrieving user by ID: {e}")
return None
def get_users_with_search(query_text=""):
"""获取用户列表,支持搜索"""
import re
# 构建查询条件
query = {}
or_conditions = []
if query_text:
# 用户名模糊搜索
or_conditions.append({
"UserName": {"$regex": re.escape(query_text), "$options": "i"}
})
# 邮箱模糊搜索
or_conditions.append({
"email": {"$regex": re.escape(query_text), "$options": "i"}
})
# _id 搜索(支持完整或前缀)
if ObjectId.is_valid(query_text):
or_conditions.append({
"_id": ObjectId(query_text)
})
query = {"$or": or_conditions}
# 查询数据库(排除密码)
cursor = client.ht_server.users.find(query, {"password": 0})
# 去重(按 _id
users_map = {}
for u in cursor:
users_map[str(u["_id"])] = u
users = list(users_map.values())
# 数据格式化
CST = ZoneInfo("Asia/Shanghai")
for u in users:
u['_id'] = str(u['_id'])
created_at = u.get("CreatedAt")
if created_at:
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
created_at_cst = created_at.astimezone(CST)
u["CreatedAt"] = created_at_cst.strftime("%Y-%m-%d %H:%M:%S")
return users

View File

@@ -0,0 +1,98 @@
from app.extensions import client, logger
def get_gacha_log_entries(user_id):
"""获取用户的祈愿记录条目列表"""
gacha_logs = list(client.ht_server.GachaLog.find({"user_id": user_id}))
entries = []
for log in gacha_logs:
entry = {
"Uid": log['Uid'],
"Excluded": False,
"ItemCount": len(log['data'])
}
entries.append(entry)
return entries
def get_gacha_log_end_ids(user_id, uid):
"""获取指定 UID 用户的祈愿记录最新 ID"""
gacha_log = client.ht_server.GachaLog.find_one({"user_id": user_id, "Uid": uid})
if not gacha_log:
return {
"100": 0, # NoviceWish
"200": 0, # StandardWish
"301": 0, # AvatarEventWish
"302": 0, # WeaponEventWish
"500": 0 # ChronicledWish
}
# 计算各个祈愿类型的最新ID
end_ids = {
"100": 0,
"200": 0,
"301": 0,
"302": 0,
"500": 0
}
for item in gacha_log['data']:
gacha_type = str(item.get('GachaType', ''))
item_id = item.get('Id', 0)
if gacha_type in end_ids:
end_ids[gacha_type] = max(end_ids[gacha_type], item_id)
return end_ids
def upload_gacha_log(user_id, uid, items):
"""上传祈愿记录"""
# 查找是否已有该用户和UID的祈愿记录
existing_log = client.ht_server.GachaLog.find_one({"user_id": user_id, "Uid": uid})
if existing_log:
# 已有数据合并新旧数据按Id去重
old_items = existing_log.get('data', [])
# 用Id做索引先加旧的再加新的新数据覆盖旧数据
item_dict = {item.get('Id'): item for item in old_items}
for item in items:
item_dict[item.get('Id')] = item
merged_items = list(item_dict.values())
# 更新数据库
client.ht_server.GachaLog.update_one(
{"_id": existing_log['_id']},
{"$set": {"data": merged_items}}
)
return f"success, merged {len(items)} new items, total {len(merged_items)} items"
else:
# 没有数据,直接插入
gacha_log_entry = {
"user_id": user_id,
"Uid": uid,
"data": items
}
client.ht_server.GachaLog.insert_one(gacha_log_entry)
return f"success, uploaded {len(items)} items"
def retrieve_gacha_log(user_id, uid, end_ids):
"""从云端检索用户的祈愿记录数据"""
gacha_log = client.ht_server.GachaLog.find_one({"user_id": user_id, "Uid": uid})
if not gacha_log:
return []
# 筛选出比end_ids更旧的记录
filtered_items = []
for item in gacha_log['data']:
gacha_type = str(item.get('GachaType', ''))
item_id = item.get('Id', 0)
# end_ids有可能是0那么返回全部
if (gacha_type in end_ids and item_id < end_ids[gacha_type]) or end_ids.get(gacha_type, 0) == 0:
filtered_items.append(item)
return filtered_items
def delete_gacha_log(user_id, uid):
"""删除指定用户的祈愿记录"""
result = client.ht_server.GachaLog.delete_one({"user_id": user_id, "Uid": uid})
return result.deleted_count > 0