mirror of
https://github.com/wangdage12/Snap.Server.git
synced 2026-02-17 08:52:10 +08:00
初始提交
This commit is contained in:
0
app/__init__.py
Normal file
0
app/__init__.py
Normal file
8
app/config.py
Normal file
8
app/config.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from app.config_loader import config_loader
|
||||
|
||||
# 使用配置加载器提供兼容的接口
|
||||
class Config:
|
||||
SECRET_KEY = config_loader.SECRET_KEY
|
||||
MONGO_URI = config_loader.MONGO_URI
|
||||
TIMEZONE = config_loader.TIMEZONE
|
||||
ISTEST_MODE = config_loader.ISTEST_MODE
|
||||
100
app/config_loader.py
Normal file
100
app/config_loader.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import json
|
||||
import os
|
||||
from zoneinfo import ZoneInfo
|
||||
from typing import Dict, Any
|
||||
|
||||
class ConfigLoader:
|
||||
def __init__(self, config_file: str = 'config.json'):
|
||||
self.config_file = config_file
|
||||
self._config = None
|
||||
|
||||
def load_config(self) -> Dict[str, Any]:
|
||||
"""加载 JSON 配置文件"""
|
||||
if self._config is None:
|
||||
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), self.config_file)
|
||||
try:
|
||||
with open(config_path, 'r', encoding='utf-8') as f:
|
||||
self._config = json.load(f)
|
||||
except FileNotFoundError:
|
||||
raise FileNotFoundError(f"配置文件 {config_path} 不存在")
|
||||
except json.JSONDecodeError as e:
|
||||
raise ValueError(f"配置文件格式错误: {e}")
|
||||
|
||||
return self._config
|
||||
|
||||
def get(self, key: str, default=None):
|
||||
"""获取配置值,支持点号分隔的嵌套键"""
|
||||
config = self.load_config()
|
||||
keys = key.split('.')
|
||||
value = config
|
||||
|
||||
try:
|
||||
for k in keys:
|
||||
value = value[k]
|
||||
return value
|
||||
except (KeyError, TypeError):
|
||||
return default
|
||||
|
||||
@property
|
||||
def SECRET_KEY(self) -> str:
|
||||
return self.get('SECRET_KEY')
|
||||
|
||||
@property
|
||||
def MONGO_URI(self) -> str:
|
||||
return self.get('MONGO_URI')
|
||||
|
||||
@property
|
||||
def TIMEZONE(self) -> ZoneInfo:
|
||||
timezone_str = self.get('TIMEZONE', 'Asia/Shanghai')
|
||||
return ZoneInfo(timezone_str)
|
||||
|
||||
@property
|
||||
def ISTEST_MODE(self) -> bool:
|
||||
return self.get('ISTEST_MODE', False)
|
||||
|
||||
@property
|
||||
def SERVER_HOST(self) -> str:
|
||||
return self.get('SERVER.HOST', '0.0.0.0')
|
||||
|
||||
@property
|
||||
def SERVER_PORT(self) -> int:
|
||||
return self.get('SERVER.PORT', 5222)
|
||||
|
||||
@property
|
||||
def SERVER_DEBUG(self) -> bool:
|
||||
return self.get('SERVER.DEBUG', False)
|
||||
|
||||
@property
|
||||
def JWT_ALGORITHM(self) -> str:
|
||||
return self.get('JWT.ALGORITHM', 'HS256')
|
||||
|
||||
@property
|
||||
def JWT_EXPIRATION_HOURS(self) -> int:
|
||||
return self.get('JWT.EXPIRATION_HOURS', 24)
|
||||
|
||||
@property
|
||||
def EMAIL_GMAIL_USER(self) -> str:
|
||||
return self.get('EMAIL.GMAIL_USER')
|
||||
|
||||
@property
|
||||
def EMAIL_APP_PASSWORD(self) -> str:
|
||||
return self.get('EMAIL.APP_PASSWORD')
|
||||
|
||||
@property
|
||||
def RSA_PRIVATE_KEY_FILE(self) -> str:
|
||||
return self.get('RSA.PRIVATE_KEY_FILE', 'private.pem')
|
||||
|
||||
@property
|
||||
def RSA_PUBLIC_KEY_FILE(self) -> str:
|
||||
return self.get('RSA.PUBLIC_KEY_FILE', 'public.pem')
|
||||
|
||||
@property
|
||||
def LOGGING_LEVEL(self) -> str:
|
||||
return self.get('LOGGING.LEVEL', 'DEBUG')
|
||||
|
||||
@property
|
||||
def LOGGING_FORMAT(self) -> str:
|
||||
return self.get('LOGGING.FORMAT', '%(asctime)s %(name)s %(levelname)s %(message)s')
|
||||
|
||||
# 创建全局配置实例
|
||||
config_loader = ConfigLoader()
|
||||
22
app/decorators.py
Normal file
22
app/decorators.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from flask import request, jsonify
|
||||
from bson import ObjectId
|
||||
from app.extensions import client, logger
|
||||
from app.utils.jwt_utils import verify_token
|
||||
|
||||
def require_maintainer_permission(f):
|
||||
def wrapper(*args, **kwargs):
|
||||
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
||||
user_id = verify_token(token)
|
||||
|
||||
if not user_id:
|
||||
return jsonify({"code": 1, "message": "Invalid token"}), 401
|
||||
|
||||
user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
|
||||
if not user or not user.get("IsMaintainer", False):
|
||||
return jsonify({"code": 2, "message": "Permission denied"}), 403
|
||||
|
||||
request.current_user = user
|
||||
return f(*args, **kwargs)
|
||||
|
||||
wrapper.__name__ = f.__name__
|
||||
return wrapper
|
||||
35
app/extensions.py
Normal file
35
app/extensions.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import logging
|
||||
import coloredlogs
|
||||
import secrets
|
||||
import string
|
||||
from pymongo.mongo_client import MongoClient
|
||||
from pymongo.server_api import ServerApi
|
||||
from app.config_loader import config_loader
|
||||
|
||||
logger = logging.getLogger("app")
|
||||
coloredlogs.install(level=config_loader.LOGGING_LEVEL, logger=logger, fmt=config_loader.LOGGING_FORMAT)
|
||||
|
||||
client = None
|
||||
|
||||
def init_mongo(uri: str, test_mode=False):
|
||||
global client
|
||||
if test_mode:
|
||||
logger.info("Running in test mode, skipping MongoDB connection")
|
||||
return
|
||||
|
||||
client = MongoClient(uri, server_api=ServerApi('1'))
|
||||
|
||||
try:
|
||||
client.admin.command('ping')
|
||||
logger.info("MongoDB connected successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"MongoDB connection failed: {e}")
|
||||
raise
|
||||
|
||||
def generate_code(length=6):
|
||||
"""生成数字验证码"""
|
||||
return ''.join(secrets.choice('0123456789') for _ in range(length))
|
||||
|
||||
def generate_numeric_id(length=8):
|
||||
"""生成数字ID"""
|
||||
return ''.join(secrets.choice(string.digits) for _ in range(length))
|
||||
33
app/init.py
Normal file
33
app/init.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from flask import Flask
|
||||
from app.config import Config
|
||||
from app.extensions import init_mongo
|
||||
|
||||
def create_app():
|
||||
app = Flask(__name__)
|
||||
app.config.from_object(Config)
|
||||
app.secret_key = Config.SECRET_KEY
|
||||
|
||||
init_mongo(Config.MONGO_URI, Config.ISTEST_MODE)
|
||||
|
||||
# 注册蓝图
|
||||
from routes.announcement import announcement_bp
|
||||
from routes.auth import auth_bp
|
||||
from routes.gacha_log import gacha_log_bp
|
||||
from routes.web_api import web_api_bp
|
||||
from routes.misc import misc_bp
|
||||
|
||||
app.register_blueprint(announcement_bp, url_prefix="/Announcement")
|
||||
app.register_blueprint(auth_bp)
|
||||
app.register_blueprint(gacha_log_bp)
|
||||
app.register_blueprint(web_api_bp)
|
||||
app.register_blueprint(misc_bp)
|
||||
|
||||
# CORS
|
||||
@app.after_request
|
||||
def after_request(response):
|
||||
response.headers.add('Access-Control-Allow-Origin', '*')
|
||||
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
|
||||
response.headers.add('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS')
|
||||
return response
|
||||
|
||||
return app
|
||||
18
app/utils/jwt_utils.py
Normal file
18
app/utils/jwt_utils.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import jwt
|
||||
import datetime
|
||||
from flask import current_app
|
||||
from app.config_loader import config_loader
|
||||
|
||||
def create_token(user_id):
|
||||
payload = {
|
||||
"user_id": user_id,
|
||||
"exp": datetime.datetime.utcnow() + datetime.timedelta(hours=config_loader.JWT_EXPIRATION_HOURS)
|
||||
}
|
||||
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm=config_loader.JWT_ALGORITHM)
|
||||
|
||||
def verify_token(token):
|
||||
try:
|
||||
data = jwt.decode(token, current_app.config["SECRET_KEY"], algorithms=[config_loader.JWT_ALGORITHM])
|
||||
return data["user_id"]
|
||||
except:
|
||||
return None
|
||||
Reference in New Issue
Block a user