from bson import ObjectId
from werkzeug.security import generate_password_hash, check_password_hash
from app.extensions import client, logger
from app.config import Config
from app.config_loader import config_loader
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from datetime import timezone
from zoneinfo import ZoneInfo
import datetime
import SendEmailTool
import re
import base64


def decrypt_data(encrypted_data: str) -> str:
    """Decrypt base64-encoded data with the configured RSA private key.

    The key file path is read from ``config_loader.RSA_PRIVATE_KEY_FILE`` and
    the ciphertext is decrypted with PKCS#1 OAEP.

    Args:
        encrypted_data: Base64 text of the ciphertext.

    Returns:
        The decrypted bytes decoded to ``str`` (``bytes.decode()`` default).

    Raises:
        Exception: Any failure (reading the key, base64 decoding, decryption,
            text decoding) is logged and re-raised unchanged.
    """
    try:
        private_key_file = config_loader.RSA_PRIVATE_KEY_FILE
        with open(private_key_file, 'r') as f:
            private_key = RSA.import_key(f.read())
        cipher = PKCS1_OAEP.new(private_key)
        decrypted_data = cipher.decrypt(base64.b64decode(encrypted_data))
        return decrypted_data.decode()
    except Exception as e:
        logger.error(f"Decryption error: {e}")
        raise


def send_verification_email(email: str, code: str, ACTION_NAME="注册", EXPIRE_MINUTES=None) -> bool:
    """Send a verification-code email, HTML first with a plain-text fallback.

    Only the registration scenario is covered for now; other scenarios are
    meant to be added later.

    Args:
        email: Recipient address.
        code: Verification code placed in the message body.
        ACTION_NAME: Name of the action shown in the HTML body.
        EXPIRE_MINUTES: Validity period shown in the HTML body; when ``None``
            it falls back to ``Config.VERIFICATION_CODE_EXPIRE_MINUTES``.

    Returns:
        ``True`` when either the HTML or the plain-text message was sent,
        ``False`` when both attempts (or the preparation) failed.
    """
    try:
        subject = Config.EMAIL_SUBJECT
        textbody = f"您的验证码是: {code}"
        APP_NAME = Config.EMAIL_APP_NAME
        # NOTE(review): not referenced by the template text below; presumably
        # it is used by link markup in the original template - confirm.
        OFFICIAL_WEBSITE = Config.EMAIL_OFFICIAL_WEBSITE
        if EXPIRE_MINUTES is None:
            EXPIRE_MINUTES = Config.VERIFICATION_CODE_EXPIRE_MINUTES
        # NOTE(review): the template is reproduced exactly as it appears in
        # this file (no markup is visible); verify against the intended HTML.
        htmlbody = f""" 验证码邮件
{APP_NAME}
你正在进行 {ACTION_NAME} 操作,请使用以下验证码完成验证:
{code}
验证码有效期 {EXPIRE_MINUTES} 分钟,请勿泄露给他人。

本邮件由 {APP_NAME} 系统自动发送,请勿回复
如非本人操作,请忽略本邮件
访问官网
"""
        try:
            SendEmailTool.send_email(
                config_loader.EMAIL_GMAIL_USER,
                config_loader.EMAIL_APP_PASSWORD,
                email,
                subject,
                htmlbody,
                app_name=APP_NAME,
                body_type="html"
            )
            logger.info(f"HTML Verification email sent to {email}")
        except Exception as e:
            logger.error(f"Failed to send HTML verification email to {email}: {e}")
            # If the HTML message could not be sent, retry as plain text.
            # A failure here propagates to the outer handler and yields False.
            SendEmailTool.send_email(
                config_loader.EMAIL_GMAIL_USER,
                config_loader.EMAIL_APP_PASSWORD,
                email,
                subject,
                textbody,
                app_name=APP_NAME,
                body_type="plain"
            )
            logger.info(f"Verification email sent to {email}")
        return True
    except Exception as e:
        logger.error(f"Failed to send email: {e}")
        return False


def verify_user_credentials(email: str, password: str) -> dict | None:
    """Check an email/password pair against the stored user record.

    Args:
        email: Email used to look up the user document.
        password: Plain-text password to check against the stored hash.

    Returns:
        The user document when the password matches, otherwise ``None``
        (unknown email, record without a password hash, or wrong password).
    """
    user = client.ht_server.users.find_one({"email": email})
    if not user:
        return None
    # A record without a password hash is treated as a failed login instead
    # of raising KeyError.
    stored_hash = user.get('password')
    if not stored_hash or not check_password_hash(stored_hash, password):
        return None
    return user


def create_user_account(email: str, password: str) -> dict | None:
    """Create a new user account.

    Args:
        email: Email of the new user; also stored as the user name fields.
        password: Plain-text password; only its hash is stored.

    Returns:
        The inserted user document (including ``_id``), or ``None`` when a
        user with this email already exists.
    """
    # Refuse to create a second account for the same email.
    existing_user = client.ht_server.users.find_one({"email": email})
    if existing_user:
        return None

    # Store only the password hash.
    hashed_password = generate_password_hash(password)

    new_user = {
        "email": email,
        "password": hashed_password,
        "NormalizedUserName": email,
        "UserName": email,
        # Naive UTC timestamp, same value as the deprecated utcnow().
        "CreatedAt": datetime.datetime.now(timezone.utc).replace(tzinfo=None),
        "IsLicensedDeveloper": False,
        "IsMaintainer": False,
        # Upload permissions of default users currently do not expire.
        "GachaLogExpireAt": "2099-01-01T00:00:00Z",
        "CdnExpireAt": "2099-01-01T00:00:00Z"
    }

    result = client.ht_server.users.insert_one(new_user)
    new_user['_id'] = result.inserted_id
    return new_user


def get_user_by_id(user_id: str) -> dict | None:
    """Fetch a user document by its id.

    Args:
        user_id: String form of the user's ObjectId.

    Returns:
        The user document with ``_id`` converted to ``str``, or ``None`` when
        no user is found or the lookup raises (the error is logged).
    """
    try:
        user = client.ht_server.users.find_one({"_id": ObjectId(user_id)})
        if user:
            user['_id'] = str(user['_id'])
        return user
    except Exception as e:
        logger.error(f"Error retrieving user by ID: {e}")
        return None


def get_users_with_search(query_text="", role=None, email=None, username=None, id=None, is_licensed=None) -> list:
    """List users, optionally narrowed by several filters combined with AND.

    Args:
        query_text: General search; matches user name or email
            (case-insensitive substring) or, when it is a valid ObjectId
            string, the exact ``_id``.
        role: ``"maintainer"``, ``"developer"`` or ``"user"`` (neither of the
            two); other values add no filter.
        email: Case-insensitive substring filter on ``email``.
        username: Case-insensitive substring filter on ``UserName``.
        id: User id filter; exact match when it is a valid ObjectId string,
            otherwise a regex match on ``_id``.
        is_licensed: ``"licensed"`` or ``"not-licensed"``; other values add
            no filter.

    Returns:
        A list of user documents without the ``password`` field, with ``_id``
        as ``str`` and datetime ``CreatedAt`` values formatted as
        ``YYYY-MM-DD HH:MM:SS`` in Asia/Shanghai time.
    """
    query = {}
    and_conditions = []

    # General search (q parameter): user name, email, or id.
    if query_text:
        or_conditions = [
            {"UserName": {"$regex": re.escape(query_text), "$options": "i"}},
            {"email": {"$regex": re.escape(query_text), "$options": "i"}},
        ]
        # _id search: only a complete, valid ObjectId string is matched.
        if ObjectId.is_valid(query_text):
            or_conditions.append({"_id": ObjectId(query_text)})
        and_conditions.append({"$or": or_conditions})

    # Filter by role.
    if role:
        if role == "maintainer":
            and_conditions.append({"IsMaintainer": True})
        elif role == "developer":
            and_conditions.append({"IsLicensedDeveloper": True})
        elif role == "user":
            # "user" means neither maintainer nor developer.
            and_conditions.append({
                "$and": [
                    {"IsMaintainer": {"$ne": True}},
                    {"IsLicensedDeveloper": {"$ne": True}}
                ]
            })

    # Filter by email (substring match).
    if email:
        and_conditions.append({
            "email": {"$regex": re.escape(email), "$options": "i"}
        })

    # Filter by user name (substring match).
    if username:
        and_conditions.append({
            "UserName": {"$regex": re.escape(username), "$options": "i"}
        })

    # Filter by user id.
    if id:
        if ObjectId.is_valid(id):
            and_conditions.append({"_id": ObjectId(id)})
        else:
            # NOTE(review): $regex matches string values only, so this branch
            # can only hit documents whose _id is stored as a string; partial
            # input will not match ObjectId-typed ids - confirm intent.
            and_conditions.append({
                "_id": {"$regex": re.escape(id), "$options": "i"}
            })

    # Filter by license status.
    if is_licensed:
        if is_licensed == "licensed":
            and_conditions.append({"IsLicensedDeveloper": True})
        elif is_licensed == "not-licensed":
            # NOTE(review): exact False match; documents lacking the field
            # are not returned (role "user" uses $ne instead) - confirm.
            and_conditions.append({"IsLicensedDeveloper": False})

    # Combine the collected conditions; a single one needs no $and wrapper.
    if and_conditions:
        if len(and_conditions) == 1:
            query = and_conditions[0]
        else:
            query = {"$and": and_conditions}

    # Query the database, excluding the password field.
    cursor = client.ht_server.users.find(query, {"password": 0})

    # De-duplicate by _id, keeping first-seen order.
    users = list({str(u["_id"]): u for u in cursor}.values())

    # Format fields for output.
    CST = ZoneInfo("Asia/Shanghai")
    for u in users:
        u['_id'] = str(u['_id'])
        created_at = u.get("CreatedAt")
        # Only datetime values are reformatted; anything else (missing or,
        # e.g., already a string) is left untouched instead of raising.
        if isinstance(created_at, datetime.datetime):
            if created_at.tzinfo is None:
                # Naive values are interpreted as UTC.
                created_at = created_at.replace(tzinfo=timezone.utc)
            created_at_cst = created_at.astimezone(CST)
            u["CreatedAt"] = created_at_cst.strftime("%Y-%m-%d %H:%M:%S")

    return users