diff --git a/routes/download_resource.py b/routes/download_resource.py index 4344d1a..2ee99b4 100644 --- a/routes/download_resource.py +++ b/routes/download_resource.py @@ -21,8 +21,10 @@ def get_public_download_resources(): 获取下载资源列表(公开API) 可选查询参数: - package_type: 包类型 (msi/msix),不传则返回所有 + - is_test: 是否包含测试版本 (true/false),不传则只返回正式版本 """ package_type = request.args.get('package_type') + is_test_str = request.args.get('is_test') # 验证package_type参数 if package_type and package_type not in ['msi', 'msix']: @@ -32,8 +34,13 @@ def get_public_download_resources(): "data": None }), 400 + # 处理is_test参数,默认只返回正式版本 + is_test = False + if is_test_str is not None: + is_test = is_test_str.lower() == 'true' + # 只返回激活的资源 - resources = get_download_resources(package_type=package_type, is_active=True) + resources = get_download_resources(package_type=package_type, is_active=True, is_test=is_test) return jsonify({ "code": 0, @@ -48,8 +55,10 @@ def get_latest_download_resource(): 获取最新版本(公开API) 可选查询参数: - package_type: 包类型 (msi/msix),不传则返回最新的任意类型 + - is_test: 是否包含测试版本 (true/false),不传则只返回正式版本 """ package_type = request.args.get('package_type') + is_test_str = request.args.get('is_test') # 验证package_type参数 if package_type and package_type not in ['msi', 'msix']: @@ -59,7 +68,12 @@ def get_latest_download_resource(): "data": None }), 400 - resource = get_latest_version(package_type=package_type) + # 处理is_test参数 + is_test = False + if is_test_str is not None: + is_test = is_test_str.lower() == 'true' + + resource = get_latest_version(package_type=package_type, is_test=is_test) if resource: return jsonify({ @@ -130,6 +144,7 @@ def web_api_get_download_resources(): """获取下载资源列表(管理端,包含所有资源,包括未激活的)""" package_type = request.args.get('package_type') is_active_str = request.args.get('is_active') + is_test_str = request.args.get('is_test') # 验证package_type参数 if package_type and package_type not in ['msi', 'msix']: @@ -144,7 +159,12 @@ def web_api_get_download_resources(): if is_active_str is not None: is_active = is_active_str.lower() == 'true' - resources = get_download_resources(package_type=package_type, is_active=is_active) + # 处理is_test参数 + is_test = None + if is_test_str is not None: + is_test = is_test_str.lower() == 'true' + + resources = get_download_resources(package_type=package_type, is_active=is_active, is_test=is_test) return jsonify({ "code": 0, diff --git a/routes/web_api.py b/routes/web_api.py index 3c41e07..da59b55 100644 --- a/routes/web_api.py +++ b/routes/web_api.py @@ -239,9 +239,15 @@ def web_api_get_users(): "data": None }), 403 - # 获取搜索参数 + # 获取查询参数 q = request.args.get("q", "").strip() - users = get_users_with_search(q) + role = request.args.get("role", "").strip() if request.args.get("role") else None + email = request.args.get("email", "").strip() if request.args.get("email") else None + username = request.args.get("username", "").strip() if request.args.get("username") else None + id_param = request.args.get("id", "").strip() if request.args.get("id") else None + is_licensed = request.args.get("is", "").strip() if request.args.get("is") else None + + users = get_users_with_search(q, role, email, username, id_param, is_licensed) return jsonify({ "code": 0, diff --git a/services/auth_service.py b/services/auth_service.py index 7b5d9dc..db55114 100644 --- a/services/auth_service.py +++ b/services/auth_service.py @@ -203,32 +203,82 @@ def get_user_by_id(user_id: str) -> dict | None: return None -def get_users_with_search(query_text="") -> list: - """获取用户列表,支持搜索""" +def get_users_with_search(query_text="", role=None, email=None, username=None, id=None, is_licensed=None) -> list: + """获取用户列表,支持多种筛选条件""" import re # 构建查询条件 query = {} - or_conditions = [] + and_conditions = [] + # 通用搜索(q 参数)- 匹配用户名、邮箱、ID if query_text: + or_conditions = [] # 用户名模糊搜索 or_conditions.append({ "UserName": {"$regex": re.escape(query_text), "$options": "i"} }) - # 邮箱模糊搜索 or_conditions.append({ "email": {"$regex": re.escape(query_text), "$options": "i"} }) - # _id 搜索(支持完整或前缀) if ObjectId.is_valid(query_text): or_conditions.append({ "_id": ObjectId(query_text) }) + and_conditions.append({"$or": or_conditions}) - query = {"$or": or_conditions} + # 按角色筛选 + if role: + if role == "maintainer": + and_conditions.append({"IsMaintainer": True}) + elif role == "developer": + and_conditions.append({"IsLicensedDeveloper": True}) + elif role == "user": + # user 表示既不是 maintainer 也不是 developer + and_conditions.append({ + "$and": [ + {"IsMaintainer": {"$ne": True}}, + {"IsLicensedDeveloper": {"$ne": True}} + ] + }) + + # 按邮箱筛选(支持模糊匹配) + if email: + and_conditions.append({ + "email": {"$regex": re.escape(email), "$options": "i"} + }) + + # 按用户名筛选(支持模糊匹配) + if username: + and_conditions.append({ + "UserName": {"$regex": re.escape(username), "$options": "i"} + }) + + # 按用户ID筛选(支持模糊匹配) + if id: + if ObjectId.is_valid(id): + and_conditions.append({"_id": ObjectId(id)}) + else: + # 如果不是有效的 ObjectId,尝试匹配字符串形式的 _id + and_conditions.append({ + "_id": {"$regex": re.escape(id), "$options": "i"} + }) + + # 按状态筛选 + if is_licensed: + if is_licensed == "licensed": + and_conditions.append({"IsLicensedDeveloper": True}) + elif is_licensed == "not-licensed": + and_conditions.append({"IsLicensedDeveloper": False}) + + # 构建 AND 查询 + if and_conditions: + if len(and_conditions) == 1: + query = and_conditions[0] + else: + query = {"$and": and_conditions} # 查询数据库(排除密码) cursor = client.ht_server.users.find(query, {"password": 0}) diff --git a/services/download_resource_service.py b/services/download_resource_service.py index d180cb7..f50dea0 100644 --- a/services/download_resource_service.py +++ b/services/download_resource_service.py @@ -15,6 +15,7 @@ def create_download_resource(data): - file_size: 文件大小 (可选) - file_hash: 文件哈希 (可选) - is_active: 是否激活 (可选,默认为True) + - is_test: 是否为测试版本 (可选,默认为False) :return: 创建的资源ID或None """ try: @@ -26,6 +27,7 @@ def create_download_resource(data): "file_size": data.get('file_size'), "file_hash": data.get('file_hash'), "is_active": data.get('is_active', True), + "is_test": data.get('is_test', False), "created_at": datetime.datetime.utcnow(), "created_by": data.get('created_by') } @@ -38,12 +40,13 @@ def create_download_resource(data): return None -def get_download_resources(package_type=None, is_active=None): +def get_download_resources(package_type=None, is_active=None, is_test=None): """ 获取下载资源列表 :param package_type: 包类型过滤 (msi/msix),None表示获取所有 :param is_active: 是否激活过滤,None表示获取所有 + :param is_test: 是否为测试版本过滤,None表示获取所有 :return: 资源列表 """ try: @@ -52,6 +55,15 @@ def get_download_resources(package_type=None, is_active=None): query['package_type'] = package_type if is_active is not None: query['is_active'] = is_active + if is_test is not None: + # 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录 + if is_test: + query['is_test'] = True + else: + query['$or'] = [ + {'is_test': False}, + {'is_test': {'$exists': False}} + ] resources = list(client.ht_server.download_resources.find(query, sort=[("created_at", -1)])) @@ -61,6 +73,9 @@ def get_download_resources(package_type=None, is_active=None): r = dict(r) # 转换_id为字符串并存为id字段 r['id'] = str(r.pop('_id')) + # 如果 is_test 字段不存在,默认设置为 False + if 'is_test' not in r: + r['is_test'] = False # 转换datetime为配置时区的ISO格式字符串 if 'created_at' in r and isinstance(r['created_at'], datetime.datetime): dt = r['created_at'].replace(tzinfo=datetime.timezone.utc) @@ -92,6 +107,9 @@ def get_download_resource_by_id(resource_id): if resource: resource = dict(resource) resource.pop('_id', None) + # 如果 is_test 字段不存在,默认设置为 False + if 'is_test' not in resource: + resource['is_test'] = False # 转换datetime为配置时区的ISO格式字符串 if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime): dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc) @@ -136,6 +154,8 @@ def update_download_resource(resource_id, data): update_data['file_hash'] = data['file_hash'] if 'is_active' in data: update_data['is_active'] = data['is_active'] + if 'is_test' in data: + update_data['is_test'] = data['is_test'] if 'updated_by' in data: update_data['updated_by'] = data['updated_by'] @@ -173,15 +193,24 @@ def delete_download_resource(resource_id): return False -def get_latest_version(package_type=None): +def get_latest_version(package_type=None, is_test=False): """ 获取最新版本 :param package_type: 包类型 (msi/msix),None表示获取所有类型的最新版本 + :param is_test: 是否包含测试版本,默认为False(只返回正式版本) :return: 资源对象或None """ try: query = {"is_active": True} + if not is_test: + # 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录 + query['$or'] = [ + {'is_test': False}, + {'is_test': {'$exists': False}} + ] + else: + query['is_test'] = True if package_type: query['package_type'] = package_type @@ -193,6 +222,9 @@ def get_latest_version(package_type=None): if resource: resource = dict(resource) resource.pop('_id', None) + # 如果 is_test 字段不存在,默认设置为 False + if 'is_test' not in resource: + resource['is_test'] = False # 转换datetime为配置时区的ISO格式字符串 if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime): dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)