157 lines
3.5 KiB
Python
157 lines
3.5 KiB
Python
import asyncio
|
|
import platform
|
|
import socket
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
import psutil
|
|
import pytz
|
|
|
|
|
|
# 后台操作
|
|
def async_new_task(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` on a new daemon thread without waiting.

    The thread is started immediately and no handle is returned, so the
    caller cannot join it or read the callable's return value.
    """
    worker = threading.Thread(target=func, args=args, kwargs=kwargs)
    # Daemon threads do not keep the interpreter alive at shutdown.
    worker.daemon = True
    worker.start()
|
|
|
|
|
|
async def async_task(func, *args, **kwargs):
    """Await ``func(*args, **kwargs)`` executed through ``asyncio.to_thread``.

    Returns whatever ``func`` returns once the worker thread finishes.
    """
    pending = asyncio.to_thread(func, *args, **kwargs)
    outcome = await pending
    return outcome
|
|
|
|
|
|
def format_datetime(dt: datetime, tz="Asia/Shanghai"):
    """Render ``dt`` in the time zone ``tz`` as ``YYYY-MM-DD HH:MM:SS``.

    A naive ``dt`` (one without ``tzinfo``) is treated as UTC before the
    conversion. ``tz`` is a zone name handed to ``pytz.timezone``.
    """
    # Naive datetimes are interpreted as UTC; aware ones are used as-is.
    aware = pytz.UTC.localize(dt) if dt.tzinfo is None else dt
    target_zone = pytz.timezone(tz)
    return aware.astimezone(target_zone).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def safe_round(value, ndigits=2, default=None):
    """Round ``value`` to ``ndigits`` places, or return ``default`` for ``None``."""
    if value is None:
        return default
    return round(value, ndigits)
|
|
|
|
|
|
def build_dept_tree(depts):
    """Assemble a flat list of department dicts into a nested tree.

    Each dict must carry ``"id"`` and ``"pid"`` keys. A department whose
    ``pid`` is truthy and equals another department's ``id`` is appended to
    that parent's ``"children"`` list. Every other department (falsy ``pid``
    or a ``pid`` that matches no ``id``) becomes a top-level node.

    The input dicts are mutated in place and reused as the tree nodes. A
    parent whose ``"children"`` entry is missing or ``None`` gets a fresh
    list on first use, so callers need not pre-initialise it.

    Args:
        depts: Iterable-and-reiterable collection (e.g. a list) of dicts.

    Returns:
        The list of top-level department dicts, in input order.
    """
    by_id = {dept["id"]: dept for dept in depts}
    roots = []

    for dept in depts:
        pid = dept["pid"]
        if pid and pid in by_id:
            parent = by_id[pid]
            # Create the child list lazily instead of failing on parents
            # that were not prepared with an empty "children" list.
            if parent.get("children") is None:
                parent["children"] = []
            parent["children"].append(dept)
        else:
            # pid is 0/None or refers to an unknown parent -> top level.
            roots.append(dept)

    return roots
|
|
|
|
|
|
def build_menu_tree(items):
    """Assemble a flat list of menu-item dicts into a nested tree.

    Each dict must carry ``"id"`` and ``"pid"`` keys. An item whose ``pid``
    is truthy and equals another item's ``id`` is appended to that parent's
    ``"children"`` list. Every other item (falsy ``pid`` or a ``pid`` that
    matches no ``id``) becomes a top-level entry.

    The input dicts are mutated in place and reused as the tree nodes. A
    parent whose ``"children"`` entry is missing or ``None`` gets a fresh
    list on first use, so callers need not pre-initialise it.

    Args:
        items: Iterable-and-reiterable collection (e.g. a list) of dicts.

    Returns:
        The list of top-level menu-item dicts, in input order.
    """
    by_id = {entry["id"]: entry for entry in items}
    tree = []

    for entry in items:
        pid = entry["pid"]
        if pid and pid in by_id:
            parent = by_id[pid]
            # Create the child list lazily instead of failing on parents
            # that were not prepared with an empty "children" list.
            if parent.get("children") is None:
                parent["children"] = []
            parent["children"].append(entry)
        else:
            # No usable parent -> top-level menu entry.
            tree.append(entry)

    return tree
|
|
|
|
|
|
from uuid import UUID
|
|
|
|
|
|
def is_valid_uuid(value: str) -> bool:
    """Return ``True`` if ``value`` can be parsed by ``uuid.UUID``.

    Any string form the ``UUID`` constructor accepts counts as valid
    (hyphenated, bare 32 hex digits, braces, ``urn:uuid:`` prefix).
    Non-string input such as ``None`` or an int yields ``False``.
    """
    try:
        UUID(value)
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed hex string; TypeError: value is None;
        # AttributeError: non-string objects lacking ``str.replace``.
        # Deliberately narrower than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        return False
    return True
|
|
|
|
|
|
def get_local_ip() -> str:
    """Return the local IPv4 address the OS would use to reach 8.8.8.8.

    A UDP socket is "connected" to 8.8.8.8:80 purely so the OS selects an
    outgoing interface; the socket's own address is then read back.

    Returns:
        The local address as a dotted-quad string, or ``"127.0.0.1"`` when
        the socket cannot be created or connected (e.g. no route).
    """
    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises, which the manual close() did not.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
    except OSError:
        return "127.0.0.1"
|
|
|
|
|
|
def get_mac_address():
    """Return the first link-layer (MAC) address reported by psutil.

    Interfaces are scanned in the order ``psutil.net_if_addrs()`` yields
    them. ``"00:00:00:00:00:00"`` is returned when no interface exposes an
    ``AF_LINK`` entry.
    """
    link_addresses = (
        entry.address
        for entries in psutil.net_if_addrs().values()
        for entry in entries
        if entry.family == psutil.AF_LINK
    )
    # next() with a default reproduces "first match, else placeholder".
    return next(link_addresses, "00:00:00:00:00:00")
|
|
|
|
|
|
def get_cpu_info():
    """Return the processor name string from ``platform.processor()``."""
    processor_name = platform.processor()
    return processor_name
|
|
|
|
|
|
def get_memory_total():
    """Return the ``total`` field of ``psutil.virtual_memory()``."""
    memory_stats = psutil.virtual_memory()
    return memory_stats.total
|
|
|
|
|
|
def get_disk_total():
    """Return the ``total`` field of ``psutil.disk_usage("/")``."""
    root_usage = psutil.disk_usage("/")
    return root_usage.total
|
|
|
|
|
|
def translate_vehicle_type(vehicle_type: str) -> str:
    """Translate an English vehicle-type label into its Chinese name.

    The lookup ignores case. A label with no known translation is returned
    unchanged, in its original casing.
    """
    chinese_names = {
        "coupe": "双门跑车",
        "largevehicle": "大型车辆",
        "sedan": "三厢轿车",
        "suv": "运动型多用途车",
        "truck": "卡车",
        "van": "面包车",
    }

    key = vehicle_type.lower()
    if key in chinese_names:
        return chinese_names[key]
    return vehicle_type
|
|
|
|
|
|
import re
|
|
|
|
|
|
def validate_plate(plate: str) -> str:
    """Normalise a licence plate and check it against known plate formats.

    The plate is stripped of surrounding whitespace and upper-cased, then
    tested against each accepted pattern in turn.

    Returns:
        The normalised plate when any pattern matches. Otherwise, including
        for empty or ``None`` input, the fixed message "车速过快,无法识别".
    """
    unrecognized = "车速过快,无法识别"
    if not plate:
        return unrecognized

    candidate = plate.strip().upper()

    accepted_formats = (
        # Standard (fuel) plate: province char, letter, 5 alphanumerics.
        r"^[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼][A-Z][A-Z0-9]{5}$",
        # New-energy plate: province char, letter, 6 alphanumerics.
        r"^[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼][A-Z][A-Z0-9]{6}$",
        # Armed-police plate: "WJ", optional province char, 5 digits.
        r"^WJ[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼]?\d{5}$",
        # Diplomatic plate: embassy/consulate char followed by 5 digits.
        r"^(使|领)\d{5}$",
        # Hong Kong / Macau cross-border plate.
        r"^粤Z[A-Z0-9]{4}(港|澳)$",
    )

    if any(re.match(fmt, candidate) for fmt in accepted_formats):
        return candidate
    return unrecognized
|