mirror of
https://github.com/wangdage12/Snap.Server.git
synced 2026-02-17 08:52:10 +08:00
Compare commits
1 Commits
01f51d82cd
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
036b64c845 |
@@ -21,8 +21,10 @@ def get_public_download_resources():
|
||||
获取下载资源列表(公开API)
|
||||
可选查询参数:
|
||||
- package_type: 包类型 (msi/msix),不传则返回所有
|
||||
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
|
||||
"""
|
||||
package_type = request.args.get('package_type')
|
||||
is_test_str = request.args.get('is_test')
|
||||
|
||||
# 验证package_type参数
|
||||
if package_type and package_type not in ['msi', 'msix']:
|
||||
@@ -32,8 +34,13 @@ def get_public_download_resources():
|
||||
"data": None
|
||||
}), 400
|
||||
|
||||
# 处理is_test参数,默认只返回正式版本
|
||||
is_test = False
|
||||
if is_test_str is not None:
|
||||
is_test = is_test_str.lower() == 'true'
|
||||
|
||||
# 只返回激活的资源
|
||||
resources = get_download_resources(package_type=package_type, is_active=True)
|
||||
resources = get_download_resources(package_type=package_type, is_active=True, is_test=is_test)
|
||||
|
||||
return jsonify({
|
||||
"code": 0,
|
||||
@@ -48,8 +55,10 @@ def get_latest_download_resource():
|
||||
获取最新版本(公开API)
|
||||
可选查询参数:
|
||||
- package_type: 包类型 (msi/msix),不传则返回最新的任意类型
|
||||
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
|
||||
"""
|
||||
package_type = request.args.get('package_type')
|
||||
is_test_str = request.args.get('is_test')
|
||||
|
||||
# 验证package_type参数
|
||||
if package_type and package_type not in ['msi', 'msix']:
|
||||
@@ -59,7 +68,12 @@ def get_latest_download_resource():
|
||||
"data": None
|
||||
}), 400
|
||||
|
||||
resource = get_latest_version(package_type=package_type)
|
||||
# 处理is_test参数
|
||||
is_test = False
|
||||
if is_test_str is not None:
|
||||
is_test = is_test_str.lower() == 'true'
|
||||
|
||||
resource = get_latest_version(package_type=package_type, is_test=is_test)
|
||||
|
||||
if resource:
|
||||
return jsonify({
|
||||
@@ -130,6 +144,7 @@ def web_api_get_download_resources():
|
||||
"""获取下载资源列表(管理端,包含所有资源,包括未激活的)"""
|
||||
package_type = request.args.get('package_type')
|
||||
is_active_str = request.args.get('is_active')
|
||||
is_test_str = request.args.get('is_test')
|
||||
|
||||
# 验证package_type参数
|
||||
if package_type and package_type not in ['msi', 'msix']:
|
||||
@@ -144,7 +159,12 @@ def web_api_get_download_resources():
|
||||
if is_active_str is not None:
|
||||
is_active = is_active_str.lower() == 'true'
|
||||
|
||||
resources = get_download_resources(package_type=package_type, is_active=is_active)
|
||||
# 处理is_test参数
|
||||
is_test = None
|
||||
if is_test_str is not None:
|
||||
is_test = is_test_str.lower() == 'true'
|
||||
|
||||
resources = get_download_resources(package_type=package_type, is_active=is_active, is_test=is_test)
|
||||
|
||||
return jsonify({
|
||||
"code": 0,
|
||||
|
||||
@@ -239,9 +239,15 @@ def web_api_get_users():
|
||||
"data": None
|
||||
}), 403
|
||||
|
||||
# 获取搜索参数
|
||||
# 获取查询参数
|
||||
q = request.args.get("q", "").strip()
|
||||
users = get_users_with_search(q)
|
||||
role = request.args.get("role", "").strip() if request.args.get("role") else None
|
||||
email = request.args.get("email", "").strip() if request.args.get("email") else None
|
||||
username = request.args.get("username", "").strip() if request.args.get("username") else None
|
||||
id_param = request.args.get("id", "").strip() if request.args.get("id") else None
|
||||
is_licensed = request.args.get("is", "").strip() if request.args.get("is") else None
|
||||
|
||||
users = get_users_with_search(q, role, email, username, id_param, is_licensed)
|
||||
|
||||
return jsonify({
|
||||
"code": 0,
|
||||
|
||||
@@ -203,32 +203,82 @@ def get_user_by_id(user_id: str) -> dict | None:
|
||||
return None
|
||||
|
||||
|
||||
def get_users_with_search(query_text="") -> list:
|
||||
"""获取用户列表,支持搜索"""
|
||||
def get_users_with_search(query_text="", role=None, email=None, username=None, id=None, is_licensed=None) -> list:
|
||||
"""获取用户列表,支持多种筛选条件"""
|
||||
import re
|
||||
|
||||
# 构建查询条件
|
||||
query = {}
|
||||
or_conditions = []
|
||||
and_conditions = []
|
||||
|
||||
# 通用搜索(q 参数)- 匹配用户名、邮箱、ID
|
||||
if query_text:
|
||||
or_conditions = []
|
||||
# 用户名模糊搜索
|
||||
or_conditions.append({
|
||||
"UserName": {"$regex": re.escape(query_text), "$options": "i"}
|
||||
})
|
||||
|
||||
# 邮箱模糊搜索
|
||||
or_conditions.append({
|
||||
"email": {"$regex": re.escape(query_text), "$options": "i"}
|
||||
})
|
||||
|
||||
# _id 搜索(支持完整或前缀)
|
||||
if ObjectId.is_valid(query_text):
|
||||
or_conditions.append({
|
||||
"_id": ObjectId(query_text)
|
||||
})
|
||||
and_conditions.append({"$or": or_conditions})
|
||||
|
||||
query = {"$or": or_conditions}
|
||||
# 按角色筛选
|
||||
if role:
|
||||
if role == "maintainer":
|
||||
and_conditions.append({"IsMaintainer": True})
|
||||
elif role == "developer":
|
||||
and_conditions.append({"IsLicensedDeveloper": True})
|
||||
elif role == "user":
|
||||
# user 表示既不是 maintainer 也不是 developer
|
||||
and_conditions.append({
|
||||
"$and": [
|
||||
{"IsMaintainer": {"$ne": True}},
|
||||
{"IsLicensedDeveloper": {"$ne": True}}
|
||||
]
|
||||
})
|
||||
|
||||
# 按邮箱筛选(支持模糊匹配)
|
||||
if email:
|
||||
and_conditions.append({
|
||||
"email": {"$regex": re.escape(email), "$options": "i"}
|
||||
})
|
||||
|
||||
# 按用户名筛选(支持模糊匹配)
|
||||
if username:
|
||||
and_conditions.append({
|
||||
"UserName": {"$regex": re.escape(username), "$options": "i"}
|
||||
})
|
||||
|
||||
# 按用户ID筛选(支持模糊匹配)
|
||||
if id:
|
||||
if ObjectId.is_valid(id):
|
||||
and_conditions.append({"_id": ObjectId(id)})
|
||||
else:
|
||||
# 如果不是有效的 ObjectId,尝试匹配字符串形式的 _id
|
||||
and_conditions.append({
|
||||
"_id": {"$regex": re.escape(id), "$options": "i"}
|
||||
})
|
||||
|
||||
# 按状态筛选
|
||||
if is_licensed:
|
||||
if is_licensed == "licensed":
|
||||
and_conditions.append({"IsLicensedDeveloper": True})
|
||||
elif is_licensed == "not-licensed":
|
||||
and_conditions.append({"IsLicensedDeveloper": False})
|
||||
|
||||
# 构建 AND 查询
|
||||
if and_conditions:
|
||||
if len(and_conditions) == 1:
|
||||
query = and_conditions[0]
|
||||
else:
|
||||
query = {"$and": and_conditions}
|
||||
|
||||
# 查询数据库(排除密码)
|
||||
cursor = client.ht_server.users.find(query, {"password": 0})
|
||||
|
||||
@@ -15,6 +15,7 @@ def create_download_resource(data):
|
||||
- file_size: 文件大小 (可选)
|
||||
- file_hash: 文件哈希 (可选)
|
||||
- is_active: 是否激活 (可选,默认为True)
|
||||
- is_test: 是否为测试版本 (可选,默认为False)
|
||||
:return: 创建的资源ID或None
|
||||
"""
|
||||
try:
|
||||
@@ -26,6 +27,7 @@ def create_download_resource(data):
|
||||
"file_size": data.get('file_size'),
|
||||
"file_hash": data.get('file_hash'),
|
||||
"is_active": data.get('is_active', True),
|
||||
"is_test": data.get('is_test', False),
|
||||
"created_at": datetime.datetime.utcnow(),
|
||||
"created_by": data.get('created_by')
|
||||
}
|
||||
@@ -38,12 +40,13 @@ def create_download_resource(data):
|
||||
return None
|
||||
|
||||
|
||||
def get_download_resources(package_type=None, is_active=None):
|
||||
def get_download_resources(package_type=None, is_active=None, is_test=None):
|
||||
"""
|
||||
获取下载资源列表
|
||||
|
||||
:param package_type: 包类型过滤 (msi/msix),None表示获取所有
|
||||
:param is_active: 是否激活过滤,None表示获取所有
|
||||
:param is_test: 是否为测试版本过滤,None表示获取所有
|
||||
:return: 资源列表
|
||||
"""
|
||||
try:
|
||||
@@ -52,6 +55,15 @@ def get_download_resources(package_type=None, is_active=None):
|
||||
query['package_type'] = package_type
|
||||
if is_active is not None:
|
||||
query['is_active'] = is_active
|
||||
if is_test is not None:
|
||||
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
|
||||
if is_test:
|
||||
query['is_test'] = True
|
||||
else:
|
||||
query['$or'] = [
|
||||
{'is_test': False},
|
||||
{'is_test': {'$exists': False}}
|
||||
]
|
||||
|
||||
resources = list(client.ht_server.download_resources.find(query, sort=[("created_at", -1)]))
|
||||
|
||||
@@ -61,6 +73,9 @@ def get_download_resources(package_type=None, is_active=None):
|
||||
r = dict(r)
|
||||
# 转换_id为字符串并存为id字段
|
||||
r['id'] = str(r.pop('_id'))
|
||||
# 如果 is_test 字段不存在,默认设置为 False
|
||||
if 'is_test' not in r:
|
||||
r['is_test'] = False
|
||||
# 转换datetime为配置时区的ISO格式字符串
|
||||
if 'created_at' in r and isinstance(r['created_at'], datetime.datetime):
|
||||
dt = r['created_at'].replace(tzinfo=datetime.timezone.utc)
|
||||
@@ -92,6 +107,9 @@ def get_download_resource_by_id(resource_id):
|
||||
if resource:
|
||||
resource = dict(resource)
|
||||
resource.pop('_id', None)
|
||||
# 如果 is_test 字段不存在,默认设置为 False
|
||||
if 'is_test' not in resource:
|
||||
resource['is_test'] = False
|
||||
# 转换datetime为配置时区的ISO格式字符串
|
||||
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
|
||||
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
|
||||
@@ -136,6 +154,8 @@ def update_download_resource(resource_id, data):
|
||||
update_data['file_hash'] = data['file_hash']
|
||||
if 'is_active' in data:
|
||||
update_data['is_active'] = data['is_active']
|
||||
if 'is_test' in data:
|
||||
update_data['is_test'] = data['is_test']
|
||||
if 'updated_by' in data:
|
||||
update_data['updated_by'] = data['updated_by']
|
||||
|
||||
@@ -173,15 +193,24 @@ def delete_download_resource(resource_id):
|
||||
return False
|
||||
|
||||
|
||||
def get_latest_version(package_type=None):
|
||||
def get_latest_version(package_type=None, is_test=False):
|
||||
"""
|
||||
获取最新版本
|
||||
|
||||
:param package_type: 包类型 (msi/msix),None表示获取所有类型的最新版本
|
||||
:param is_test: 是否包含测试版本,默认为False(只返回正式版本)
|
||||
:return: 资源对象或None
|
||||
"""
|
||||
try:
|
||||
query = {"is_active": True}
|
||||
if not is_test:
|
||||
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
|
||||
query['$or'] = [
|
||||
{'is_test': False},
|
||||
{'is_test': {'$exists': False}}
|
||||
]
|
||||
else:
|
||||
query['is_test'] = True
|
||||
if package_type:
|
||||
query['package_type'] = package_type
|
||||
|
||||
@@ -193,6 +222,9 @@ def get_latest_version(package_type=None):
|
||||
if resource:
|
||||
resource = dict(resource)
|
||||
resource.pop('_id', None)
|
||||
# 如果 is_test 字段不存在,默认设置为 False
|
||||
if 'is_test' not in resource:
|
||||
resource['is_test'] = False
|
||||
# 转换datetime为配置时区的ISO格式字符串
|
||||
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
|
||||
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
|
||||
|
||||
Reference in New Issue
Block a user