Compare commits

..

3 Commits

Author SHA1 Message Date
wangdage12
3d2a620820 Update README with API and server details
Added information about API services, server infrastructure, and usage recommendations for metadata repositories.
2026-02-12 23:26:24 +08:00
wangdage12
fb977fdeb5 Merge pull request #9 from wangdage12/dev
支持测试版本的资源筛选,增强用户列表的查询功能
2026-02-10 20:26:23 +08:00
fanbook-wangdage
036b64c845 支持测试版本的资源筛选,增强用户列表的查询功能 2026-02-10 20:14:37 +08:00
5 changed files with 138 additions and 15 deletions

View File

@@ -120,12 +120,27 @@ pip install -r requirements.txt && python -m gunicorn run:app --bind 0.0.0.0:522
请根据服务器性能调整`--workers``--threads`参数。
### API文档
### API文档和官方开放平台
API文档可以在该地址访问
**API文档可以在该地址访问**
https://rdgm3wrj7r.apifox.cn/
> 项目官方API和文件资源服务、仓库镜像为各开源项目免费提供
本项目官方API地址https://htserver.wdg.cloudns.ch/api/
<img width="901" height="522" alt="服务拓补结构" src="https://github.com/user-attachments/assets/9cd2f0d5-372e-46b6-b64b-643df661b445" />
我们的项目官方服务采用多台服务器,其中日本的主服务器用来提供所有基础服务,甘肃的自建服务器提供加速和负载均衡,以确保服务稳定
元数据仓库镜像可以随时调用获取元数据仓库API`/git-repository/all`得到
由于Git仓库几乎无法被CDN缓存频繁拉取镜像仓库会对服务器造成压力请各位在使用量较大的情况下自建仓库镜像
甘肃服务器只能使用ipv6访问同时比较不稳定只能用于加速不要将它直接作为主要服务使用
甘肃服务器2为备用服务器计划在用户量较大时作为API以负载均衡的方式提供不在极端情况下不采用和仓库镜像使用
### 注意事项
在轻量使用的场景下可以直接使用MongoDB Atlas的免费套餐但在高并发场景下建议使用自建MongoDB服务器以获得更好的性能和稳定性。

View File

@@ -21,8 +21,10 @@ def get_public_download_resources():
获取下载资源列表公开API
可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回所有
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
"""
package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
@@ -32,8 +34,13 @@ def get_public_download_resources():
"data": None
}), 400
# 处理is_test参数默认只返回正式版本
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
# 只返回激活的资源
resources = get_download_resources(package_type=package_type, is_active=True)
resources = get_download_resources(package_type=package_type, is_active=True, is_test=is_test)
return jsonify({
"code": 0,
@@ -48,8 +55,10 @@ def get_latest_download_resource():
获取最新版本公开API
可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回最新的任意类型
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
"""
package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
@@ -59,7 +68,12 @@ def get_latest_download_resource():
"data": None
}), 400
resource = get_latest_version(package_type=package_type)
# 处理is_test参数
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resource = get_latest_version(package_type=package_type, is_test=is_test)
if resource:
return jsonify({
@@ -130,6 +144,7 @@ def web_api_get_download_resources():
"""获取下载资源列表(管理端,包含所有资源,包括未激活的)"""
package_type = request.args.get('package_type')
is_active_str = request.args.get('is_active')
is_test_str = request.args.get('is_test')
# 验证package_type参数
if package_type and package_type not in ['msi', 'msix']:
@@ -144,7 +159,12 @@ def web_api_get_download_resources():
if is_active_str is not None:
is_active = is_active_str.lower() == 'true'
resources = get_download_resources(package_type=package_type, is_active=is_active)
# 处理is_test参数
is_test = None
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resources = get_download_resources(package_type=package_type, is_active=is_active, is_test=is_test)
return jsonify({
"code": 0,

View File

@@ -239,9 +239,15 @@ def web_api_get_users():
"data": None
}), 403
# 获取搜索参数
# 获取查询参数
q = request.args.get("q", "").strip()
users = get_users_with_search(q)
role = request.args.get("role", "").strip() if request.args.get("role") else None
email = request.args.get("email", "").strip() if request.args.get("email") else None
username = request.args.get("username", "").strip() if request.args.get("username") else None
id_param = request.args.get("id", "").strip() if request.args.get("id") else None
is_licensed = request.args.get("is", "").strip() if request.args.get("is") else None
users = get_users_with_search(q, role, email, username, id_param, is_licensed)
return jsonify({
"code": 0,

View File

@@ -203,32 +203,82 @@ def get_user_by_id(user_id: str) -> dict | None:
return None
def get_users_with_search(query_text="") -> list:
"""获取用户列表,支持搜索"""
def get_users_with_search(query_text="", role=None, email=None, username=None, id=None, is_licensed=None) -> list:
"""获取用户列表,支持多种筛选条件"""
import re
# 构建查询条件
query = {}
or_conditions = []
and_conditions = []
# 通用搜索q 参数)- 匹配用户名、邮箱、ID
if query_text:
or_conditions = []
# 用户名模糊搜索
or_conditions.append({
"UserName": {"$regex": re.escape(query_text), "$options": "i"}
})
# 邮箱模糊搜索
or_conditions.append({
"email": {"$regex": re.escape(query_text), "$options": "i"}
})
# _id 搜索(支持完整或前缀)
if ObjectId.is_valid(query_text):
or_conditions.append({
"_id": ObjectId(query_text)
})
and_conditions.append({"$or": or_conditions})
query = {"$or": or_conditions}
# 按角色筛选
if role:
if role == "maintainer":
and_conditions.append({"IsMaintainer": True})
elif role == "developer":
and_conditions.append({"IsLicensedDeveloper": True})
elif role == "user":
# user 表示既不是 maintainer 也不是 developer
and_conditions.append({
"$and": [
{"IsMaintainer": {"$ne": True}},
{"IsLicensedDeveloper": {"$ne": True}}
]
})
# 按邮箱筛选(支持模糊匹配)
if email:
and_conditions.append({
"email": {"$regex": re.escape(email), "$options": "i"}
})
# 按用户名筛选(支持模糊匹配)
if username:
and_conditions.append({
"UserName": {"$regex": re.escape(username), "$options": "i"}
})
# 按用户ID筛选支持模糊匹配
if id:
if ObjectId.is_valid(id):
and_conditions.append({"_id": ObjectId(id)})
else:
# 如果不是有效的 ObjectId尝试匹配字符串形式的 _id
and_conditions.append({
"_id": {"$regex": re.escape(id), "$options": "i"}
})
# 按状态筛选
if is_licensed:
if is_licensed == "licensed":
and_conditions.append({"IsLicensedDeveloper": True})
elif is_licensed == "not-licensed":
and_conditions.append({"IsLicensedDeveloper": False})
# 构建 AND 查询
if and_conditions:
if len(and_conditions) == 1:
query = and_conditions[0]
else:
query = {"$and": and_conditions}
# 查询数据库(排除密码)
cursor = client.ht_server.users.find(query, {"password": 0})

View File

@@ -15,6 +15,7 @@ def create_download_resource(data):
- file_size: 文件大小 (可选)
- file_hash: 文件哈希 (可选)
- is_active: 是否激活 (可选默认为True)
- is_test: 是否为测试版本 (可选默认为False)
:return: 创建的资源ID或None
"""
try:
@@ -26,6 +27,7 @@ def create_download_resource(data):
"file_size": data.get('file_size'),
"file_hash": data.get('file_hash'),
"is_active": data.get('is_active', True),
"is_test": data.get('is_test', False),
"created_at": datetime.datetime.utcnow(),
"created_by": data.get('created_by')
}
@@ -38,12 +40,13 @@ def create_download_resource(data):
return None
def get_download_resources(package_type=None, is_active=None):
def get_download_resources(package_type=None, is_active=None, is_test=None):
"""
获取下载资源列表
:param package_type: 包类型过滤 (msi/msix)None表示获取所有
:param is_active: 是否激活过滤None表示获取所有
:param is_test: 是否为测试版本过滤None表示获取所有
:return: 资源列表
"""
try:
@@ -52,6 +55,15 @@ def get_download_resources(package_type=None, is_active=None):
query['package_type'] = package_type
if is_active is not None:
query['is_active'] = is_active
if is_test is not None:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
if is_test:
query['is_test'] = True
else:
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
resources = list(client.ht_server.download_resources.find(query, sort=[("created_at", -1)]))
@@ -61,6 +73,9 @@ def get_download_resources(package_type=None, is_active=None):
r = dict(r)
# 转换_id为字符串并存为id字段
r['id'] = str(r.pop('_id'))
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in r:
r['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in r and isinstance(r['created_at'], datetime.datetime):
dt = r['created_at'].replace(tzinfo=datetime.timezone.utc)
@@ -92,6 +107,9 @@ def get_download_resource_by_id(resource_id):
if resource:
resource = dict(resource)
resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
@@ -136,6 +154,8 @@ def update_download_resource(resource_id, data):
update_data['file_hash'] = data['file_hash']
if 'is_active' in data:
update_data['is_active'] = data['is_active']
if 'is_test' in data:
update_data['is_test'] = data['is_test']
if 'updated_by' in data:
update_data['updated_by'] = data['updated_by']
@@ -173,15 +193,24 @@ def delete_download_resource(resource_id):
return False
def get_latest_version(package_type=None):
def get_latest_version(package_type=None, is_test=False):
"""
获取最新版本
:param package_type: 包类型 (msi/msix)None表示获取所有类型的最新版本
:param is_test: 是否包含测试版本默认为False只返回正式版本
:return: 资源对象或None
"""
try:
query = {"is_active": True}
if not is_test:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
else:
query['is_test'] = True
if package_type:
query['package_type'] = package_type
@@ -193,6 +222,9 @@ def get_latest_version(package_type=None):
if resource:
resource = dict(resource)
resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)