from bson import ObjectId from werkzeug.security import generate_password_hash, check_password_hash from app.extensions import client, logger from app.config import Config from Crypto.Cipher import PKCS1_OAEP from Crypto.PublicKey import RSA from app.config_loader import config_loader from datetime import timezone from zoneinfo import ZoneInfo import datetime import SendEmailTool import re import base64 def decrypt_data(encrypted_data): """使用RSA私钥解密数据""" try: private_key_file = config_loader.RSA_PRIVATE_KEY_FILE with open(private_key_file, 'r') as f: private_key = RSA.import_key(f.read()) cipher = PKCS1_OAEP.new(private_key) decrypted_data = cipher.decrypt(base64.b64decode(encrypted_data)) return decrypted_data.decode() except Exception as e: logger.error(f"Decryption error: {e}") raise def send_verification_email(email, code): """发送验证码邮件""" try: subject = "Snap Hutao 验证码" body = f"您的验证码是: {code}" SendEmailTool.send_email( SendEmailTool.gmail_user, SendEmailTool.app_password, email, subject, body ) logger.info(f"Verification email sent to {email}") return True except Exception as e: logger.error(f"Failed to send email: {e}") return False def verify_user_credentials(email, password): """验证用户凭据""" user = client.ht_server.users.find_one({"email": email}) if not user or not check_password_hash(user['password'], password): return None return user def create_user_account(email, password): """创建新用户账户""" # 检查用户是否已存在 existing_user = client.ht_server.users.find_one({"email": email}) if existing_user: return None # 对密码进行哈希处理 hashed_password = generate_password_hash(password) # 创建新用户 new_user = { "email": email, "password": hashed_password, "NormalizedUserName": email, "UserName": email, "CreatedAt": datetime.datetime.utcnow(), "IsLicensedDeveloper": False, "IsMaintainer": False, # 现在默认用户的上传权限不过期 "GachaLogExpireAt": "2099-01-01T00:00:00Z", "CdnExpireAt": "2099-01-01T00:00:00Z" } result = client.ht_server.users.insert_one(new_user) new_user['_id'] = result.inserted_id return new_user def get_user_by_id(user_id): """根据ID获取用户信息""" try: user = client.ht_server.users.find_one({"_id": ObjectId(user_id)}) if user: user['_id'] = str(user['_id']) return user except Exception as e: logger.error(f"Error retrieving user by ID: {e}") return None def get_users_with_search(query_text=""): """获取用户列表,支持搜索""" import re # 构建查询条件 query = {} or_conditions = [] if query_text: # 用户名模糊搜索 or_conditions.append({ "UserName": {"$regex": re.escape(query_text), "$options": "i"} }) # 邮箱模糊搜索 or_conditions.append({ "email": {"$regex": re.escape(query_text), "$options": "i"} }) # _id 搜索(支持完整或前缀) if ObjectId.is_valid(query_text): or_conditions.append({ "_id": ObjectId(query_text) }) query = {"$or": or_conditions} # 查询数据库(排除密码) cursor = client.ht_server.users.find(query, {"password": 0}) # 去重(按 _id) users_map = {} for u in cursor: users_map[str(u["_id"])] = u users = list(users_map.values()) # 数据格式化 CST = ZoneInfo("Asia/Shanghai") for u in users: u['_id'] = str(u['_id']) created_at = u.get("CreatedAt") if created_at: if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) created_at_cst = created_at.astimezone(CST) u["CreatedAt"] = created_at_cst.strftime("%Y-%m-%d %H:%M:%S") return users