同步操作将从 BidingCC/码多多 全能知识库(Python 版) 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
# +----------------------------------------------------------------------
# | ChatWork intelligent chat office system
# +----------------------------------------------------------------------
# | Software notice: This system is not free software; any form of
# |                  commercial use without authorization is illegal.
# | Copyright protection: No company or individual may copy/distribute
# |                  the program code in any form or for any purpose.
# | Licensing requirement: For commercial use, be sure to contact the
# |                  copyright owner first and obtain formal authorization.
# +----------------------------------------------------------------------
# | Author: ChatWork Team <2474369941@qq.com>
# +----------------------------------------------------------------------
import os
import time
import inspect
import importlib
import operator
from functools import reduce
from typing import Union, TypeVar, Type, Any, List, Tuple
from pydantic import TypeAdapter
from tortoise.models import Model
from tortoise.queryset import Q
from hypertext import PagingResult

__all__ = ["DbModel"]

# Search operator -> ORM lookup suffix appended to the column name.
# "=" maps to the empty suffix (plain equality).
_SEARCH_SUFFIXES = {
    "=": "",
    ">": "__gt",
    "<": "__lt",
    ">=": "__gte",
    "<=": "__lte",
    "<>": "__not",
    "%like%": "__contains",
    "%ilike%": "__icontains",
    "like%": "__startswith",
    "ilike%": "__istartswith",
    "%like": "__endswith",
    "%ilike": "__iendswith",
    "isnull": "__isnull",
    "not_isnull": "__not_isnull",
    "iexact": "__iexact",
    "search": "__search",
    "year": "__year",
    "month": "__month",
    "day": "__day",
}

# Timestamp columns formatted by ``paginate`` when no explicit list is given.
_DEFAULT_DATETIME_FIELDS = ("create_time", "update_time", "delete_time")


class DbModel(Model):
    """Abstract model base class.

    Adds pagination, field-exclusion, search-filter building and
    table-prefix lookup helpers on top of ``tortoise.models.Model``.
    """

    MODEL = TypeVar("MODEL", bound="Model")
    # Database settings; populated by ``table_prefix`` on first use
    # from ``loading_db_configs()``.
    DB_CONFIGS = {}

    class Meta:
        abstract = True

    @classmethod
    async def paginate(
        cls,
        model: Type[MODEL],
        page_no: int = 1,
        page_size: int = 15,
        schema: Any = None,
        fields: List = None,
        auto_timestamp: bool = True,
        datetime_field: List = None,
        datetime_format: str = "%Y-%m-%d %H:%M:%S",
    ):
        """Fetch one page of rows and wrap it in a ``PagingResult``.

        Args:
            model: Object providing awaitable ``count()`` and chainable
                ``limit()/offset()/values()``.
                NOTE(review): despite the annotation this looks like a
                query set rather than a model class - confirm with callers.
            page_no: 1-based page number.
            page_size: Number of rows per page.
            schema: Optional type; when given, every row dict is validated
                into it with ``pydantic.TypeAdapter``.
            fields: Column names passed to ``values()``; empty/None passes
                no names.
            auto_timestamp: When true, format the timestamp columns.
            datetime_field: Columns to format; defaults to
                ``create_time``, ``update_time`` and ``delete_time``.
            datetime_format: ``time.strftime`` format for those columns.

        Returns:
            The value of ``PagingResult.create(rows, total, page_no, page_size)``.
        """
        fields = fields if fields else []
        total = await model.count()
        rows = await model.limit(page_size).offset((page_no - 1) * page_size).values(*fields)

        if auto_timestamp:
            # Resolved once; it does not depend on the row.
            time_fields = datetime_field if datetime_field else _DEFAULT_DATETIME_FIELDS
            for row in rows:
                for name in time_fields:
                    value = row.get(name)
                    if value is None:
                        # Missing or NULL columns are left untouched.
                        continue
                    # Falsy non-None values (e.g. 0) become an empty string;
                    # anything else is treated as a Unix timestamp.
                    row[name] = time.strftime(datetime_format, time.localtime(value)) if value else ""

        if schema:
            # Build the adapter once instead of once per row.
            adapter = TypeAdapter(schema)
            rows = [adapter.validate_python(row) for row in rows]

        return PagingResult.create(rows, total, page_no, page_size)

    @classmethod
    def without_field(cls, fields: Union[str, List, Tuple]):
        """Return the model's field names except the ones listed.

        Args:
            fields: Names to exclude, either as a comma-separated string
                (whitespace around names is ignored) or a list/tuple.

        Returns:
            List of the remaining field names from ``cls._meta.fields``.
        """
        if isinstance(fields, str):
            # Strip so that "a, b" excludes "b" and not " b".
            excluded = [name.strip() for name in fields.split(",")]
        else:
            excluded = fields
        return [name for name in cls._meta.fields if name not in excluded]

    @classmethod
    def build_search(cls, search: dict, params: dict):
        """Translate a search definition plus request params into ``Q`` filters.

        Args:
            search: Maps an operator (a key of the suffix table such as
                ``"="``, ``">"``, ``"%like%"``, or the special ``"datetime"``)
                to a list of field specs. A spec is either ``"name"`` (the
                param key and column share the name) or ``"key@column"``.
                For ordinary operators ``column`` may be ``"a|b"`` to OR the
                condition over several columns. For ``"datetime"`` the key
                may be ``"start|end"`` naming the two params of the range;
                a single key is used as the range start only.
            params: Values keyed by param name. Entries that are ``None`` or
                ``""`` (or falsy, for datetime) produce no condition.

        Returns:
            List of ``Q`` objects, one per generated condition.
        """
        where = []
        for where_type, where_fields in search.items():
            suffix = _SEARCH_SUFFIXES.get(where_type)
            for where_field in where_fields:
                parts = where_field.split("@")
                if len(parts) >= 2:
                    key, column = parts[0], parts[1]
                else:
                    key = column = where_field

                if suffix is not None:
                    value = params.get(key)
                    if value is None or value == "":
                        continue
                    # One Q per "|"-separated column, OR-ed together;
                    # a single column reduces to its own Q.
                    conditions = [
                        Q(**{name.strip() + suffix: value})
                        for name in column.split("|")
                    ]
                    where.append(reduce(operator.or_, conditions))
                elif where_type == "datetime":
                    bounds = key.split("|")
                    start_key = bounds[0] if len(bounds) >= 2 else key
                    end_key = bounds[1] if len(bounds) >= 2 else None
                    # Range start
                    if start_key and params.get(start_key):
                        where.append(Q(**{column + "__gte": _to_timestamp(params.get(start_key))}))
                    # Range end
                    if end_key and params.get(end_key):
                        where.append(Q(**{column + "__lte": _to_timestamp(params.get(end_key))}))
        return where

    @classmethod
    def table_prefix(cls, table: str = "", engine: str = None):
        """Return ``table`` with the configured connection prefix prepended.

        The connection is chosen by ``engine``. When it is not given it is
        looked up from the ``apps`` section of the database settings, first
        by this class's module path and then by the package path of the
        calling file.

        Args:
            table: Bare table name.
            engine: Connection/app key; ``None`` triggers the lookup above.

        Returns:
            ``prefix + table``, or ``table`` unchanged when no usable
            configuration or prefix is found.
        """
        if not cls.DB_CONFIGS:
            cls.DB_CONFIGS = loading_db_configs()

        configs = cls.DB_CONFIGS
        if not configs or not configs.get("connections") or not configs.get("apps"):
            return table

        apps = configs.get("apps", {})
        if engine is None and not cls.__module__.startswith("kernels.model"):
            for key, app in apps.items():
                models = app.get("models")
                # Skip entries without a models path instead of crashing
                # on startswith(None).
                if models is not None and cls.__module__.startswith(models):
                    engine = key
                    break

        if engine is None:
            # inspect.stack()[1] is the direct caller of table_prefix; this
            # must stay in this function body so the frame index is right.
            caller_file_path = inspect.stack()[1].filename
            caller_pack_path = os.path.dirname(caller_file_path)
            system_root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + os.sep
            module_apps_path = caller_pack_path.replace(system_root_path, "").replace(os.sep, ".")
            engine = next(
                (key for key, app in apps.items() if app.get("models") == module_apps_path),
                None,
            )

        conn = configs.get("connections", {}).get(engine, {})
        return conn.get("prefix", "") + table


def is_number(s: Any) -> bool:
    """Return True when ``float(s)`` succeeds, False otherwise.

    Values that ``float`` rejects by type (e.g. ``None`` or a list) are
    reported as non-numeric rather than raising.
    """
    try:
        float(s)
    except (ValueError, TypeError):
        return False
    return True


def _to_timestamp(value):
    """Convert a date string to a Unix timestamp in local time.

    Numeric values are returned unchanged. Strings containing a space are
    parsed as ``%Y-%m-%d %H:%M:%S``, others as ``%Y-%m-%d``.
    """
    if is_number(value):
        return value
    fmt = "%Y-%m-%d %H:%M:%S" if len(value.split(" ")) >= 2 else "%Y-%m-%d"
    return int(time.mktime(time.strptime(value, fmt)))


def loading_db_configs():
    """Load the database configuration.

    Imports the ``config`` module, instantiates its ``GlobalSetting`` and
    returns the ``DATABASES`` entry of its ``dict()``.

    Returns:
        The ``DATABASES`` mapping, or an empty dict when the module,
        the class or the entry is missing.
    """
    configs = {}
    try:
        package = importlib.import_module("config")
        clz = getattr(package, "GlobalSetting", None)
        if not clz:
            return configs
        settings = clz().dict()
        # "or configs" covers an explicit DATABASES = None.
        return settings.get("DATABASES", {}) or configs
    except ModuleNotFoundError:
        return configs
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。