from uuid import UUID from config.minIO import get_temp_url from config.pgDb import pg_pool from utils import format_datetime def get_all_exchange_records(): """获取 annual_meeting_exchange 表所有记录""" with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, gift_code, name, created_at, is_finished, sort FROM annual_meeting_exchange ORDER BY sort """ ) rows = cur.fetchall() return [ { "id": row[0], "gift_code": row[1], "name": row[2], "created_at": row[3], "is_finished": row[4], "sort": row[5], } for row in rows ] import random import time def reset_all_exchange_status(): """将所有记录 is_finished 置为 False,并随机 position(以当前时间作为随机种子)""" with pg_pool.getConn() as conn: with conn.cursor() as cur: # 获取总记录数 cur.execute("SELECT id FROM annual_meeting_exchange") ids = [row[0] for row in cur.fetchall()] # 用当前时间戳作为随机种子 seed = int(time.time() * 1000) # 毫秒级 random.seed(seed) # 生成随机顺序的 position positions = list(range(1, len(ids) + 1)) random.shuffle(positions) # 更新每条记录 for record_id, pos in zip(ids, positions): cur.execute( """ UPDATE annual_meeting_exchange SET is_finished = FALSE, sort = %s WHERE id = %s """, (pos, record_id), ) conn.commit() return {"updated_count": len(ids), "seed_used": seed} def reset_user_status(target_user_id: str): """重置指定用户的 is_finished""" with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE annual_meeting_exchange SET is_finished = True WHERE id = %s """, (target_user_id,), ) conn.commit() return {"updated": cur.rowcount} def get_all_lottery(): """获取 annual_meeting_lottery 全部数据""" with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( """ SELECT id, name, is_opened, oss, created_at, remark, sort FROM annual_meeting_lottery ORDER BY sort ASC """ ) rows = cur.fetchall() return [ { "id": row[0], "name": row[1], "is_opened": 1 if row[2] else 0, "oss_url": get_temp_url("image-annual-lottery", row[3]), "created_at": format_datetime(row[4]), "remark": row[5], "sort": row[6], } for row in rows ] def add_lottery(name: str, sort: int, oss: str, is_opened: bool, remark: str): """新增礼品""" with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO annual_meeting_lottery (id, name, sort, oss, is_opened, remark, created_at) VALUES (gen_random_uuid(), %s, %s, %s, %s, %s, now()) RETURNING id """, (name, sort, oss, is_opened, remark), ) new_id = cur.fetchone()[0] conn.commit() return {"id": new_id} def update_lottery( id: str, name: str = None, sort: int = None, oss: str = None, is_opened: bool = None, remark: str = None, ): """更新礼品信息""" update_fields = [] params = [] if name is not None: update_fields.append("name = %s") params.append(name) if sort is not None: update_fields.append("sort = %s") params.append(sort) if oss is not None: update_fields.append("oss = %s") params.append(oss) if is_opened is not None: update_fields.append("is_opened = %s") params.append(is_opened) if remark is not None: update_fields.append("remark = %s") params.append(remark) if not update_fields: return {"updated": 0} params.append(id) sql = f""" UPDATE annual_meeting_lottery SET {", ".join(update_fields)} WHERE id = %s """ with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute(sql, tuple(params)) conn.commit() return {"updated": cur.rowcount} def delete_lottery(id: str): """删除礼品""" with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( """ DELETE FROM annual_meeting_lottery WHERE id = %s """, (id,), ) conn.commit() return {"deleted": cur.rowcount} def reset_lottery_item(item_id: UUID): """将单个奖品标记为已开启""" with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE annual_meeting_lottery SET is_opened = TRUE WHERE id = %s """, (item_id,), ) conn.commit() return cur.rowcount def reset_all_lottery_db(): """将所有奖品 is_opened 置为 False""" with pg_pool.getConn() as conn: with conn.cursor() as cur: cur.execute( """ UPDATE annual_meeting_lottery SET is_opened = FALSE """ ) conn.commit() return cur.rowcount