Compare commits

..

5 Commits

Author SHA1 Message Date
wangdage12
01f51d82cd Update README with deployment requirements
Added resource and environment requirements for deployment.
2026-02-07 21:56:24 +08:00
wangdage12
c6004bec96 Merge pull request #8 from wangdage12/dev
添加下载资源和下载资源管理功能
2026-02-05 22:06:13 +08:00
wangdage12
6e09869df0 Merge pull request #7 from wangdage12/dev
添加类型注释、修复抽卡id问题导致的返回异常和刷新token的问题
2026-02-03 12:25:01 +08:00
wangdage12
40bd74c101 Merge pull request #5 from wangdage12/dev
更新验证码的存储方式、支持html验证码邮件
2026-01-31 15:22:35 +08:00
wangdage12
f2ca58182e Merge pull request #1 from wangdage12/dev
初始版本
2026-01-25 19:58:39 +08:00
5 changed files with 22 additions and 121 deletions

View File

@@ -3,6 +3,15 @@ Snap.Hutao新后端API
## 部署方法 ## 部署方法
> **资源和环境要求**
> 服务器硬件:
> 最低1核CPU1GB内存
>
> 运行环境:
> `Windows10`及以上、`Windows Server 2019`及以上、`Linux`
> `Python3.12`及以上
> `MongoDB`
### 在服务器生成RSA密钥 ### 在服务器生成RSA密钥
执行以下代码在根目录生成密钥: 执行以下代码在根目录生成密钥:

View File

@@ -21,10 +21,8 @@ def get_public_download_resources():
获取下载资源列表公开API 获取下载资源列表公开API
可选查询参数: 可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回所有 - package_type: 包类型 (msi/msix),不传则返回所有
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
""" """
package_type = request.args.get('package_type') package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数 # 验证package_type参数
if package_type and package_type not in ['msi', 'msix']: if package_type and package_type not in ['msi', 'msix']:
@@ -34,13 +32,8 @@ def get_public_download_resources():
"data": None "data": None
}), 400 }), 400
# 处理is_test参数默认只返回正式版本
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
# 只返回激活的资源 # 只返回激活的资源
resources = get_download_resources(package_type=package_type, is_active=True, is_test=is_test) resources = get_download_resources(package_type=package_type, is_active=True)
return jsonify({ return jsonify({
"code": 0, "code": 0,
@@ -55,10 +48,8 @@ def get_latest_download_resource():
获取最新版本公开API 获取最新版本公开API
可选查询参数: 可选查询参数:
- package_type: 包类型 (msi/msix),不传则返回最新的任意类型 - package_type: 包类型 (msi/msix),不传则返回最新的任意类型
- is_test: 是否包含测试版本 (true/false),不传则只返回正式版本
""" """
package_type = request.args.get('package_type') package_type = request.args.get('package_type')
is_test_str = request.args.get('is_test')
# 验证package_type参数 # 验证package_type参数
if package_type and package_type not in ['msi', 'msix']: if package_type and package_type not in ['msi', 'msix']:
@@ -68,12 +59,7 @@ def get_latest_download_resource():
"data": None "data": None
}), 400 }), 400
# 处理is_test参数 resource = get_latest_version(package_type=package_type)
is_test = False
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resource = get_latest_version(package_type=package_type, is_test=is_test)
if resource: if resource:
return jsonify({ return jsonify({
@@ -144,7 +130,6 @@ def web_api_get_download_resources():
"""获取下载资源列表(管理端,包含所有资源,包括未激活的)""" """获取下载资源列表(管理端,包含所有资源,包括未激活的)"""
package_type = request.args.get('package_type') package_type = request.args.get('package_type')
is_active_str = request.args.get('is_active') is_active_str = request.args.get('is_active')
is_test_str = request.args.get('is_test')
# 验证package_type参数 # 验证package_type参数
if package_type and package_type not in ['msi', 'msix']: if package_type and package_type not in ['msi', 'msix']:
@@ -159,12 +144,7 @@ def web_api_get_download_resources():
if is_active_str is not None: if is_active_str is not None:
is_active = is_active_str.lower() == 'true' is_active = is_active_str.lower() == 'true'
# 处理is_test参数 resources = get_download_resources(package_type=package_type, is_active=is_active)
is_test = None
if is_test_str is not None:
is_test = is_test_str.lower() == 'true'
resources = get_download_resources(package_type=package_type, is_active=is_active, is_test=is_test)
return jsonify({ return jsonify({
"code": 0, "code": 0,

View File

@@ -239,15 +239,9 @@ def web_api_get_users():
"data": None "data": None
}), 403 }), 403
# 获取查询参数 # 获取搜索参数
q = request.args.get("q", "").strip() q = request.args.get("q", "").strip()
role = request.args.get("role", "").strip() if request.args.get("role") else None users = get_users_with_search(q)
email = request.args.get("email", "").strip() if request.args.get("email") else None
username = request.args.get("username", "").strip() if request.args.get("username") else None
id_param = request.args.get("id", "").strip() if request.args.get("id") else None
is_licensed = request.args.get("is", "").strip() if request.args.get("is") else None
users = get_users_with_search(q, role, email, username, id_param, is_licensed)
return jsonify({ return jsonify({
"code": 0, "code": 0,

View File

@@ -203,82 +203,32 @@ def get_user_by_id(user_id: str) -> dict | None:
return None return None
def get_users_with_search(query_text="", role=None, email=None, username=None, id=None, is_licensed=None) -> list: def get_users_with_search(query_text="") -> list:
"""获取用户列表,支持多种筛选条件""" """获取用户列表,支持搜索"""
import re import re
# 构建查询条件 # 构建查询条件
query = {} query = {}
and_conditions = []
# 通用搜索q 参数)- 匹配用户名、邮箱、ID
if query_text:
or_conditions = [] or_conditions = []
if query_text:
# 用户名模糊搜索 # 用户名模糊搜索
or_conditions.append({ or_conditions.append({
"UserName": {"$regex": re.escape(query_text), "$options": "i"} "UserName": {"$regex": re.escape(query_text), "$options": "i"}
}) })
# 邮箱模糊搜索 # 邮箱模糊搜索
or_conditions.append({ or_conditions.append({
"email": {"$regex": re.escape(query_text), "$options": "i"} "email": {"$regex": re.escape(query_text), "$options": "i"}
}) })
# _id 搜索(支持完整或前缀) # _id 搜索(支持完整或前缀)
if ObjectId.is_valid(query_text): if ObjectId.is_valid(query_text):
or_conditions.append({ or_conditions.append({
"_id": ObjectId(query_text) "_id": ObjectId(query_text)
}) })
and_conditions.append({"$or": or_conditions})
# 按角色筛选 query = {"$or": or_conditions}
if role:
if role == "maintainer":
and_conditions.append({"IsMaintainer": True})
elif role == "developer":
and_conditions.append({"IsLicensedDeveloper": True})
elif role == "user":
# user 表示既不是 maintainer 也不是 developer
and_conditions.append({
"$and": [
{"IsMaintainer": {"$ne": True}},
{"IsLicensedDeveloper": {"$ne": True}}
]
})
# 按邮箱筛选(支持模糊匹配)
if email:
and_conditions.append({
"email": {"$regex": re.escape(email), "$options": "i"}
})
# 按用户名筛选(支持模糊匹配)
if username:
and_conditions.append({
"UserName": {"$regex": re.escape(username), "$options": "i"}
})
# 按用户ID筛选支持模糊匹配
if id:
if ObjectId.is_valid(id):
and_conditions.append({"_id": ObjectId(id)})
else:
# 如果不是有效的 ObjectId尝试匹配字符串形式的 _id
and_conditions.append({
"_id": {"$regex": re.escape(id), "$options": "i"}
})
# 按状态筛选
if is_licensed:
if is_licensed == "licensed":
and_conditions.append({"IsLicensedDeveloper": True})
elif is_licensed == "not-licensed":
and_conditions.append({"IsLicensedDeveloper": False})
# 构建 AND 查询
if and_conditions:
if len(and_conditions) == 1:
query = and_conditions[0]
else:
query = {"$and": and_conditions}
# 查询数据库(排除密码) # 查询数据库(排除密码)
cursor = client.ht_server.users.find(query, {"password": 0}) cursor = client.ht_server.users.find(query, {"password": 0})

View File

@@ -15,7 +15,6 @@ def create_download_resource(data):
- file_size: 文件大小 (可选) - file_size: 文件大小 (可选)
- file_hash: 文件哈希 (可选) - file_hash: 文件哈希 (可选)
- is_active: 是否激活 (可选默认为True) - is_active: 是否激活 (可选默认为True)
- is_test: 是否为测试版本 (可选默认为False)
:return: 创建的资源ID或None :return: 创建的资源ID或None
""" """
try: try:
@@ -27,7 +26,6 @@ def create_download_resource(data):
"file_size": data.get('file_size'), "file_size": data.get('file_size'),
"file_hash": data.get('file_hash'), "file_hash": data.get('file_hash'),
"is_active": data.get('is_active', True), "is_active": data.get('is_active', True),
"is_test": data.get('is_test', False),
"created_at": datetime.datetime.utcnow(), "created_at": datetime.datetime.utcnow(),
"created_by": data.get('created_by') "created_by": data.get('created_by')
} }
@@ -40,13 +38,12 @@ def create_download_resource(data):
return None return None
def get_download_resources(package_type=None, is_active=None, is_test=None): def get_download_resources(package_type=None, is_active=None):
""" """
获取下载资源列表 获取下载资源列表
:param package_type: 包类型过滤 (msi/msix)None表示获取所有 :param package_type: 包类型过滤 (msi/msix)None表示获取所有
:param is_active: 是否激活过滤None表示获取所有 :param is_active: 是否激活过滤None表示获取所有
:param is_test: 是否为测试版本过滤None表示获取所有
:return: 资源列表 :return: 资源列表
""" """
try: try:
@@ -55,15 +52,6 @@ def get_download_resources(package_type=None, is_active=None, is_test=None):
query['package_type'] = package_type query['package_type'] = package_type
if is_active is not None: if is_active is not None:
query['is_active'] = is_active query['is_active'] = is_active
if is_test is not None:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
if is_test:
query['is_test'] = True
else:
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
resources = list(client.ht_server.download_resources.find(query, sort=[("created_at", -1)])) resources = list(client.ht_server.download_resources.find(query, sort=[("created_at", -1)]))
@@ -73,9 +61,6 @@ def get_download_resources(package_type=None, is_active=None, is_test=None):
r = dict(r) r = dict(r)
# 转换_id为字符串并存为id字段 # 转换_id为字符串并存为id字段
r['id'] = str(r.pop('_id')) r['id'] = str(r.pop('_id'))
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in r:
r['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串 # 转换datetime为配置时区的ISO格式字符串
if 'created_at' in r and isinstance(r['created_at'], datetime.datetime): if 'created_at' in r and isinstance(r['created_at'], datetime.datetime):
dt = r['created_at'].replace(tzinfo=datetime.timezone.utc) dt = r['created_at'].replace(tzinfo=datetime.timezone.utc)
@@ -107,9 +92,6 @@ def get_download_resource_by_id(resource_id):
if resource: if resource:
resource = dict(resource) resource = dict(resource)
resource.pop('_id', None) resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串 # 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime): if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc) dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)
@@ -154,8 +136,6 @@ def update_download_resource(resource_id, data):
update_data['file_hash'] = data['file_hash'] update_data['file_hash'] = data['file_hash']
if 'is_active' in data: if 'is_active' in data:
update_data['is_active'] = data['is_active'] update_data['is_active'] = data['is_active']
if 'is_test' in data:
update_data['is_test'] = data['is_test']
if 'updated_by' in data: if 'updated_by' in data:
update_data['updated_by'] = data['updated_by'] update_data['updated_by'] = data['updated_by']
@@ -193,24 +173,15 @@ def delete_download_resource(resource_id):
return False return False
def get_latest_version(package_type=None, is_test=False): def get_latest_version(package_type=None):
""" """
获取最新版本 获取最新版本
:param package_type: 包类型 (msi/msix)None表示获取所有类型的最新版本 :param package_type: 包类型 (msi/msix)None表示获取所有类型的最新版本
:param is_test: 是否包含测试版本默认为False只返回正式版本
:return: 资源对象或None :return: 资源对象或None
""" """
try: try:
query = {"is_active": True} query = {"is_active": True}
if not is_test:
# 如果查询非测试版本,需要包含 is_test=false 或 is_test 字段不存在的记录
query['$or'] = [
{'is_test': False},
{'is_test': {'$exists': False}}
]
else:
query['is_test'] = True
if package_type: if package_type:
query['package_type'] = package_type query['package_type'] = package_type
@@ -222,9 +193,6 @@ def get_latest_version(package_type=None, is_test=False):
if resource: if resource:
resource = dict(resource) resource = dict(resource)
resource.pop('_id', None) resource.pop('_id', None)
# 如果 is_test 字段不存在,默认设置为 False
if 'is_test' not in resource:
resource['is_test'] = False
# 转换datetime为配置时区的ISO格式字符串 # 转换datetime为配置时区的ISO格式字符串
if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime): if 'created_at' in resource and isinstance(resource['created_at'], datetime.datetime):
dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc) dt = resource['created_at'].replace(tzinfo=datetime.timezone.utc)