From 62f9fd5b7fe3acecec7b0d20413510e9cfdc6dcc Mon Sep 17 00:00:00 2001 From: BBIT-Kai <2911862937@qq.com> Date: Mon, 25 May 2026 14:41:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E5=8D=87OpenAPI=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E9=AB=98=E5=B9=B6=E5=8F=91=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kotlin/com/bbit/ticket/Application.kt | 8 + .../ticket/dao/piaotong/BlueInvoiceDao.kt | 15 +- .../database/piaotong/OpenInvoiceTaskTable.kt | 62 ++ .../entity/openapi/OpenInvoiceTaskDto.kt | 57 ++ .../openapi/registerOpenBlueInvoiceRoutes.kt | 2 +- .../openapi/registerOpenInvoiceTaskRoutes.kt | 35 + .../registerOpenInvoiceTaskManageRoutes.kt | 49 ++ .../route/piaotong/registerPTInvoiceRoutes.kt | 14 +- .../service/openapi/OpenInvoiceTaskService.kt | 742 ++++++++++++++++++ .../ticket/service/piaotong/PTBlueService.kt | 23 +- .../bbit/ticket/utils/bootstrap/AppConfig.kt | 25 +- .../utils/bootstrap/DatabaseInitializer.kt | 4 + server/src/main/resources/application.yaml | 11 + web/src/api/piaotong/index.ts | 66 ++ web/src/features/statistics/openapi/index.vue | 384 ++++++++- 15 files changed, 1454 insertions(+), 43 deletions(-) create mode 100644 server/src/main/kotlin/com/bbit/ticket/database/piaotong/OpenInvoiceTaskTable.kt create mode 100644 server/src/main/kotlin/com/bbit/ticket/entity/openapi/OpenInvoiceTaskDto.kt create mode 100644 server/src/main/kotlin/com/bbit/ticket/route/openapi/registerOpenInvoiceTaskRoutes.kt create mode 100644 server/src/main/kotlin/com/bbit/ticket/route/piaotong/registerOpenInvoiceTaskManageRoutes.kt create mode 100644 server/src/main/kotlin/com/bbit/ticket/service/openapi/OpenInvoiceTaskService.kt diff --git a/server/src/main/kotlin/com/bbit/ticket/Application.kt b/server/src/main/kotlin/com/bbit/ticket/Application.kt index 2a60d6a..b26b944 100644 --- a/server/src/main/kotlin/com/bbit/ticket/Application.kt +++ b/server/src/main/kotlin/com/bbit/ticket/Application.kt @@ -17,12 +17,15 @@ import com.bbit.ticket.utils.plugins.configureTrace import com.bbit.ticket.route.piaotong.registerPTAuthRoutes import com.bbit.ticket.route.piaotong.registerPTInvoiceRoutes import com.bbit.ticket.route.openapi.registerOpenBlueInvoiceRoutes +import com.bbit.ticket.route.openapi.registerOpenInvoiceTaskRoutes +import com.bbit.ticket.route.piaotong.registerOpenInvoiceTaskManageRoutes import com.bbit.ticket.route.system.registerDictRoutes import com.bbit.ticket.route.system.registerLogsQueryRoutes import com.bbit.ticket.route.system.registerMenuRoutes import com.bbit.ticket.route.system.registerOrgRoutes import com.bbit.ticket.route.system.registerRoleRoutes import com.bbit.ticket.route.system.registerUserRoutes +import com.bbit.ticket.service.openapi.OpenInvoiceTaskWorker import kotlinx.coroutines.runBlocking import io.ktor.server.application.Application import io.ktor.server.auth.authenticate @@ -52,6 +55,7 @@ fun Application.module() { DatabaseInitializer.initialize() SeedData.seed() } + OpenInvoiceTaskWorker.start(this) routing { get("/health") { @@ -69,6 +73,9 @@ fun Application.module() { route("/blue-invoices") { registerOpenBlueInvoiceRoutes() } + route("/blue-invoice-tasks") { + registerOpenInvoiceTaskRoutes() + } route("/f8") { } @@ -77,6 +84,7 @@ fun Application.module() { authenticate("auth-jwt") { registerPTAuthRoutes() registerPTInvoiceRoutes() + registerOpenInvoiceTaskManageRoutes() } } } diff --git a/server/src/main/kotlin/com/bbit/ticket/dao/piaotong/BlueInvoiceDao.kt b/server/src/main/kotlin/com/bbit/ticket/dao/piaotong/BlueInvoiceDao.kt index 1219ea6..293a5cc 100644 --- a/server/src/main/kotlin/com/bbit/ticket/dao/piaotong/BlueInvoiceDao.kt +++ b/server/src/main/kotlin/com/bbit/ticket/dao/piaotong/BlueInvoiceDao.kt @@ -444,9 +444,18 @@ object BlueInvoiceDao { it[HistoryInvoiceBasicTable.invDeletedFlag] = req.invDeletedFlag ?: "0" } - fun findInvoiceScopeBySerialNo(invoiceReqSerialNo: String): InvoiceScope { + fun findInvoiceScopeBySerialNo( + userId: Uuid, + enterpriseId: Uuid?, + digitalAccountId: Uuid?, + invoiceReqSerialNo: String, + ): InvoiceScope { val row = HistoryInvoiceBasicTable.selectAll() - .where { HistoryInvoiceBasicTable.invoiceReqSerialNo eq invoiceReqSerialNo } + .where { + invoiceScopeWhere(userId, enterpriseId, digitalAccountId) and + (HistoryInvoiceBasicTable.invoiceReqSerialNo eq invoiceReqSerialNo) and + HistoryInvoiceBasicTable.deletedAt.isNull() + } .singleOrNull() ?: throw com.bbit.ticket.entity.common.BizException("NOT_FOUND", "发票记录不存在") return InvoiceScope( @@ -454,6 +463,7 @@ object BlueInvoiceDao { ?: throw com.bbit.ticket.entity.common.BizException("NOT_FOUND", "发票记录不存在用户信息"), enterpriseId = row[HistoryInvoiceBasicTable.enterpriseId], digitalAccountId = row[HistoryInvoiceBasicTable.digitalAccountId], + taxpayerNum = row[HistoryInvoiceBasicTable.sellerTaxpayerNum], ) } @@ -461,6 +471,7 @@ object BlueInvoiceDao { val userId: Uuid, val enterpriseId: Uuid?, val digitalAccountId: Uuid?, + val taxpayerNum: String, ) fun findRelatedInvoiceReqSerialNos(userId: Uuid, invoiceReqSerialNo: String): List { diff --git a/server/src/main/kotlin/com/bbit/ticket/database/piaotong/OpenInvoiceTaskTable.kt b/server/src/main/kotlin/com/bbit/ticket/database/piaotong/OpenInvoiceTaskTable.kt new file mode 100644 index 0000000..cabf024 --- /dev/null +++ b/server/src/main/kotlin/com/bbit/ticket/database/piaotong/OpenInvoiceTaskTable.kt @@ -0,0 +1,62 @@ +package com.bbit.ticket.database.piaotong + +import org.jetbrains.exposed.v1.core.Table +import org.jetbrains.exposed.v1.javatime.timestampWithTimeZone +import kotlin.uuid.ExperimentalUuidApi +import kotlin.uuid.Uuid + +@OptIn(ExperimentalUuidApi::class) +object OpenInvoiceTaskTable : Table("open_invoice_task") { + val id = uuid("id").clientDefault { Uuid.random() } + val apiKey = varchar("api_key", 128) + val userId = uuid("user_id") + val enterpriseId = uuid("enterprise_id").nullable() + val digitalAccountId = uuid("digital_account_id") + val taxpayerNum = varchar("taxpayer_num", 32) + val taxAccount = varchar("tax_account", 64).nullable() + val taskType = varchar("task_type", 32) + val sourceType = varchar("source_type", 32) + val runMode = varchar("run_mode", 16) + val invoiceReqSerialNo = varchar("invoice_req_serial_no", 20) + val batchNo = varchar("batch_no", 64).nullable() + val status = varchar("status", 32) + val ptCode = varchar("pt_code", 64).nullable() + val errorMessage = text("error_message").nullable() + val requestBody = text("request_body").nullable() + val attemptCount = integer("attempt_count").default(0) + val maxAttemptCount = integer("max_attempt_count").default(3) + val pollCount = integer("poll_count").default(0) + val maxPollCount = integer("max_poll_count").default(30) + val nextRunAt = timestampWithTimeZone("next_run_at") + val lockedBy = varchar("locked_by", 64).nullable() + val lockedAt = timestampWithTimeZone("locked_at").nullable() + val createdAt = timestampWithTimeZone("created_at") + val updatedAt = timestampWithTimeZone("updated_at").nullable() + val startedAt = timestampWithTimeZone("started_at").nullable() + val finishedAt = timestampWithTimeZone("finished_at").nullable() + + override val primaryKey = PrimaryKey(id) + + init { + uniqueIndex(taskType, invoiceReqSerialNo) + index(false, status, taskType, nextRunAt) + index(false, apiKey, status) + index(false, digitalAccountId, status) + index(false, invoiceReqSerialNo) + index(false, batchNo) + } +} + +@OptIn(ExperimentalUuidApi::class) +object OpenInvoiceQueueControlTable : Table("open_invoice_queue_control") { + val id = uuid("id").clientDefault { Uuid.random() } + val apiKey = varchar("api_key", 128).uniqueIndex() + val digitalAccountId = uuid("digital_account_id").nullable() + val status = varchar("status", 16) + val pauseCode = varchar("pause_code", 64).nullable() + val reason = text("reason").nullable() + val createdAt = timestampWithTimeZone("created_at") + val updatedAt = timestampWithTimeZone("updated_at").nullable() + + override val primaryKey = PrimaryKey(id) +} diff --git a/server/src/main/kotlin/com/bbit/ticket/entity/openapi/OpenInvoiceTaskDto.kt b/server/src/main/kotlin/com/bbit/ticket/entity/openapi/OpenInvoiceTaskDto.kt new file mode 100644 index 0000000..2241ae6 --- /dev/null +++ b/server/src/main/kotlin/com/bbit/ticket/entity/openapi/OpenInvoiceTaskDto.kt @@ -0,0 +1,57 @@ +package com.bbit.ticket.entity.openapi + +import com.bbit.ticket.entity.common.PageResult +import kotlinx.serialization.Serializable + +@Serializable +data class OpenInvoiceTaskSubmitResponse( + val taskId: String, + val invoiceReqSerialNo: String, + val status: String, + val taskType: String, + val runMode: String, +) + +@Serializable +data class OpenInvoiceTaskOverviewItem( + val digitalAccountId: String, + val apiKey: String, + val account: String? = null, + val status: String, + val pauseCode: String? = null, + val reason: String? = null, + val pending: Long, + val processing: Long, + val success: Long, + val failed: Long, + val waitingAuth: Long, + val total: Long, + val lastCreatedAt: String? = null, +) + +@Serializable +data class OpenInvoiceTaskItem( + val id: String, + val digitalAccountId: String, + val apiKey: String, + val account: String? = null, + val taskType: String, + val sourceType: String, + val runMode: String, + val invoiceReqSerialNo: String, + val batchNo: String? = null, + val status: String, + val ptCode: String? = null, + val errorMessage: String? = null, + val attemptCount: Int, + val maxAttemptCount: Int, + val pollCount: Int, + val maxPollCount: Int, + val nextRunAt: String, + val createdAt: String, + val updatedAt: String? = null, + val startedAt: String? = null, + val finishedAt: String? = null, +) + +typealias OpenInvoiceTaskPage = PageResult diff --git a/server/src/main/kotlin/com/bbit/ticket/route/openapi/registerOpenBlueInvoiceRoutes.kt b/server/src/main/kotlin/com/bbit/ticket/route/openapi/registerOpenBlueInvoiceRoutes.kt index b0c2fb4..8d7e892 100644 --- a/server/src/main/kotlin/com/bbit/ticket/route/openapi/registerOpenBlueInvoiceRoutes.kt +++ b/server/src/main/kotlin/com/bbit/ticket/route/openapi/registerOpenBlueInvoiceRoutes.kt @@ -84,7 +84,7 @@ fun Route.registerOpenBlueInvoiceRoutes() { * @param requestBody 请求体 JSON 文本。 * @param block 当前接口要执行的业务逻辑。 */ -private suspend inline fun ApplicationCall.respondOpenApi( +internal suspend inline fun ApplicationCall.respondOpenApi( principal: OpenApiPrincipal, interfaceCode: String, requestBody: String?, diff --git a/server/src/main/kotlin/com/bbit/ticket/route/openapi/registerOpenInvoiceTaskRoutes.kt b/server/src/main/kotlin/com/bbit/ticket/route/openapi/registerOpenInvoiceTaskRoutes.kt new file mode 100644 index 0000000..2ad13b6 --- /dev/null +++ b/server/src/main/kotlin/com/bbit/ticket/route/openapi/registerOpenInvoiceTaskRoutes.kt @@ -0,0 +1,35 @@ +package com.bbit.ticket.route.openapi + +import com.bbit.ticket.entity.openapi.OpenBlueInvoiceCreateRequest +import com.bbit.ticket.service.openapi.OpenInvoiceTaskService +import com.bbit.ticket.utils.requireOpenApiPrincipal +import io.ktor.server.request.receive +import io.ktor.server.routing.Route +import io.ktor.server.routing.post + +fun Route.registerOpenInvoiceTaskRoutes() { + post("/test") { + val principal = call.requireOpenApiPrincipal() + val request = call.receive() + call.respondOpenApi(principal, "blue-invoice-task.test", null) { + OpenInvoiceTaskService.createIssueTask( + principal = principal, + request = request, + runMode = OpenInvoiceTaskService.MODE_SIMULATED, + sourceType = OpenInvoiceTaskService.SOURCE_TEST, + ) + } + } + + post("/production") { + val principal = call.requireOpenApiPrincipal() + val request = call.receive() + call.respondOpenApi(principal, "blue-invoice-task.production", null) { + OpenInvoiceTaskService.createIssueTask( + principal = principal, + request = request, + runMode = OpenInvoiceTaskService.MODE_REAL, + ) + } + } +} diff --git a/server/src/main/kotlin/com/bbit/ticket/route/piaotong/registerOpenInvoiceTaskManageRoutes.kt b/server/src/main/kotlin/com/bbit/ticket/route/piaotong/registerOpenInvoiceTaskManageRoutes.kt new file mode 100644 index 0000000..df8bd24 --- /dev/null +++ b/server/src/main/kotlin/com/bbit/ticket/route/piaotong/registerOpenInvoiceTaskManageRoutes.kt @@ -0,0 +1,49 @@ +package com.bbit.ticket.route.piaotong + +import com.bbit.ticket.service.openapi.OpenInvoiceTaskService +import com.bbit.ticket.utils.requireCurrentUser +import io.ktor.server.request.receiveNullable +import io.ktor.server.routing.Route +import io.ktor.server.routing.get +import io.ktor.server.routing.post + +fun Route.registerOpenInvoiceTaskManageRoutes() { + get("/openapi/tasks/overview") { + call.respondPt("查询 OpenAPI 任务概览失败") { + OpenInvoiceTaskService.overview(call.requireCurrentUser()) + } + } + + get("/openapi/tasks") { + call.respondPt("查询 OpenAPI 任务列表失败") { + OpenInvoiceTaskService.taskPage( + user = call.requireCurrentUser(), + digitalAccountId = call.request.queryParameters["digitalAccountId"], + status = call.request.queryParameters["status"], + sourceType = call.request.queryParameters["sourceType"], + runMode = call.request.queryParameters["runMode"], + page = call.request.queryParameters["page"]?.toIntOrNull() ?: 1, + pageSize = call.request.queryParameters["pageSize"]?.toIntOrNull() ?: 20, + ) + } + } + + post("/openapi/tasks/queues/{digitalAccountId}/pause") { + val digitalAccountId = call.parameters["digitalAccountId"].orEmpty() + val body = call.receiveNullable>() ?: emptyMap() + call.respondPt("暂停 OpenAPI 队列失败") { + OpenInvoiceTaskService.pauseQueue( + user = call.requireCurrentUser(), + digitalAccountId = digitalAccountId, + reason = body["reason"], + ) + } + } + + post("/openapi/tasks/queues/{digitalAccountId}/resume") { + val digitalAccountId = call.parameters["digitalAccountId"].orEmpty() + call.respondPt("恢复 OpenAPI 队列失败") { + OpenInvoiceTaskService.resumeQueue(call.requireCurrentUser(), digitalAccountId) + } + } +} diff --git a/server/src/main/kotlin/com/bbit/ticket/route/piaotong/registerPTInvoiceRoutes.kt b/server/src/main/kotlin/com/bbit/ticket/route/piaotong/registerPTInvoiceRoutes.kt index 28b2c22..ee46f0f 100644 --- a/server/src/main/kotlin/com/bbit/ticket/route/piaotong/registerPTInvoiceRoutes.kt +++ b/server/src/main/kotlin/com/bbit/ticket/route/piaotong/registerPTInvoiceRoutes.kt @@ -4,8 +4,6 @@ package com.bbit.ticket.route.piaotong import com.bbit.ticket.entity.common.BizException import com.bbit.ticket.entity.request.AskInvoiceRequest -import com.bbit.ticket.entity.request.InvoiceQuerySubmitRequest -import com.bbit.ticket.entity.request.QueryInvoiceRequest import com.bbit.ticket.entity.request.RedCreateRequest import com.bbit.ticket.service.piaotong.PTBlueService import com.bbit.ticket.service.piaotong.PTRedService @@ -103,17 +101,7 @@ fun Route.registerPTInvoiceRoutes() { val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号") ?: return@get call.respondPt("刷新发票状态失败") { - val currentUser = call.requireCurrentUser() - val account = com.bbit.ticket.service.piaotong.PTConfigService.requireDigitalAccountForAction( - currentUser, - call.request.queryParameters["digitalAccountId"], - ) - PTBlueService.queryInvoiceAllInfo( - QueryInvoiceRequest( - taxpayerNum = account.taxpayerNum, - invoiceReqSerialNo = invoiceReqSerialNo, - ) - ) + PTBlueService.queryInvoiceAllInfo(call.requireCurrentUser(), invoiceReqSerialNo) } } diff --git a/server/src/main/kotlin/com/bbit/ticket/service/openapi/OpenInvoiceTaskService.kt b/server/src/main/kotlin/com/bbit/ticket/service/openapi/OpenInvoiceTaskService.kt new file mode 100644 index 0000000..3850290 --- /dev/null +++ b/server/src/main/kotlin/com/bbit/ticket/service/openapi/OpenInvoiceTaskService.kt @@ -0,0 +1,742 @@ +@file:OptIn(ExperimentalUuidApi::class) + +package com.bbit.ticket.service.openapi + +import com.bbit.ticket.dao.piaotong.BlueInvoiceDao +import com.bbit.ticket.database.piaotong.HistoryInvoiceBasicTable +import com.bbit.ticket.database.piaotong.OpenInvoiceQueueControlTable +import com.bbit.ticket.database.piaotong.OpenInvoiceTaskTable +import com.bbit.ticket.database.piaotong.PtDigitalAccountTable +import com.bbit.ticket.entity.common.BizException +import com.bbit.ticket.entity.common.ErrorCode +import com.bbit.ticket.entity.common.PTException +import com.bbit.ticket.entity.common.PageResult +import com.bbit.ticket.entity.openapi.OpenBlueInvoiceCreateRequest +import com.bbit.ticket.entity.openapi.OpenInvoiceTaskItem +import com.bbit.ticket.entity.openapi.OpenInvoiceTaskOverviewItem +import com.bbit.ticket.entity.openapi.OpenInvoiceTaskSubmitResponse +import com.bbit.ticket.entity.request.AskInvoiceRequest +import com.bbit.ticket.entity.request.QueryInvoiceRequest +import com.bbit.ticket.service.piaotong.PTConfigService +import com.bbit.ticket.utils.CurrentUser +import com.bbit.ticket.utils.OpenApiPrincipal +import com.bbit.ticket.utils.formatDateTime +import com.bbit.ticket.utils.net.PTApi +import com.bbit.ticket.utils.plugins.dbQuery +import com.bbit.ticket.utils.plugins.myJson +import io.ktor.server.application.Application +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.encodeToString +import org.jetbrains.exposed.v1.core.Op +import org.jetbrains.exposed.v1.core.ResultRow +import org.jetbrains.exposed.v1.core.SortOrder +import org.jetbrains.exposed.v1.core.and +import org.jetbrains.exposed.v1.core.eq +import org.jetbrains.exposed.v1.core.inList +import org.jetbrains.exposed.v1.core.isNull +import org.jetbrains.exposed.v1.core.lessEq +import org.jetbrains.exposed.v1.core.notInList +import org.jetbrains.exposed.v1.core.or +import org.jetbrains.exposed.v1.jdbc.Query +import org.jetbrains.exposed.v1.jdbc.insert +import org.jetbrains.exposed.v1.jdbc.selectAll +import org.jetbrains.exposed.v1.jdbc.update +import java.time.OffsetDateTime +import java.util.concurrent.ConcurrentHashMap +import kotlin.uuid.ExperimentalUuidApi +import kotlin.uuid.Uuid + +object OpenInvoiceTaskService { + const val TASK_ISSUE_BLUE = "ISSUE_BLUE" + const val TASK_QUERY_BLUE = "QUERY_BLUE" + const val SOURCE_SINGLE = "SINGLE" + const val SOURCE_TEST = "TEST" + const val MODE_REAL = "REAL" + const val MODE_SIMULATED = "SIMULATED" + + private const val STATUS_PENDING = "PENDING" + private const val STATUS_PROCESSING = "PROCESSING" + private const val STATUS_SUCCESS = "SUCCESS" + private const val STATUS_FAILED = "FAILED" + private const val STATUS_WAITING_AUTH = "WAITING_AUTH" + private const val QUEUE_RUNNING = "RUNNING" + private const val QUEUE_PAUSED = "PAUSED" + private const val AUTH_REQUIRED_CODE = "3999" + + suspend fun createIssueTask( + principal: OpenApiPrincipal, + request: OpenBlueInvoiceCreateRequest, + runMode: String, + sourceType: String = SOURCE_SINGLE, + batchNo: String? = null, + ): OpenInvoiceTaskSubmitResponse { + validateRequest(request) + val invoiceReqSerialNo = request.invoiceReqSerialNo?.trim() + ?: throw BizException(ErrorCode.BAD_REQUEST.code, "invoiceReqSerialNo 不能为空") + if (invoiceReqSerialNo.length > 20) { + throw BizException(ErrorCode.BAD_REQUEST.code, "invoiceReqSerialNo 长度不能超过 20") + } + OpenApiRateLimiter.check(principal.apiKey) + + val now = OffsetDateTime.now() + val existing = dbQuery { + OpenInvoiceTaskTable.selectAll() + .where { + (OpenInvoiceTaskTable.taskType eq TASK_ISSUE_BLUE) and + (OpenInvoiceTaskTable.invoiceReqSerialNo eq invoiceReqSerialNo) + } + .singleOrNull() + } + if (existing != null) { + if (existing[OpenInvoiceTaskTable.runMode] != runMode) { + throw BizException( + ErrorCode.BAD_REQUEST.code, + "invoiceReqSerialNo 已存在 ${existing[OpenInvoiceTaskTable.runMode]} 任务,不能重复创建 $runMode 任务", + ) + } + return existing.toSubmitResponse() + } + + val historyExists = dbQuery { + HistoryInvoiceBasicTable.selectAll() + .where { HistoryInvoiceBasicTable.invoiceReqSerialNo eq invoiceReqSerialNo } + .singleOrNull() != null + } + if (historyExists) { + throw BizException(ErrorCode.BAD_REQUEST.code, "invoiceReqSerialNo 已存在历史发票记录,请勿重复开票") + } + + ensureQueueCanAccept(principal.apiKey) + + val taskId = dbQuery { + OpenInvoiceTaskTable.insert { + it[apiKey] = principal.apiKey + it[userId] = principal.userId + it[enterpriseId] = principal.enterpriseId + it[digitalAccountId] = principal.digitalAccountId + it[taxpayerNum] = principal.taxPayerNum + it[taxAccount] = principal.taxAccount + it[taskType] = TASK_ISSUE_BLUE + it[OpenInvoiceTaskTable.sourceType] = sourceType + it[OpenInvoiceTaskTable.runMode] = runMode + it[OpenInvoiceTaskTable.invoiceReqSerialNo] = invoiceReqSerialNo + it[OpenInvoiceTaskTable.batchNo] = batchNo + it[status] = STATUS_PENDING + it[requestBody] = myJson.encodeToString(request.copy(invoiceReqSerialNo = invoiceReqSerialNo)) + it[maxAttemptCount] = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.maxAttemptCount + it[maxPollCount] = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.maxQueryPollCount + it[nextRunAt] = now + it[createdAt] = now + }[OpenInvoiceTaskTable.id] + } + + return OpenInvoiceTaskSubmitResponse( + taskId = taskId.toString(), + invoiceReqSerialNo = invoiceReqSerialNo, + status = STATUS_PENDING, + taskType = TASK_ISSUE_BLUE, + runMode = runMode, + ) + } + + suspend fun overview(user: CurrentUser): List { + val rows = dbQuery { + val tasks = scopedTaskQuery(user).toList() + val accountRows = PtDigitalAccountTable.selectAll() + .where { accountScope(user) } + .associateBy { it[PtDigitalAccountTable.id] } + val controls = OpenInvoiceQueueControlTable.selectAll() + .associateBy { it[OpenInvoiceQueueControlTable.apiKey] } + + val taskGroups = tasks.groupBy { it[OpenInvoiceTaskTable.digitalAccountId] } + accountRows.map { (digitalAccountId, account) -> + val group = taskGroups[digitalAccountId].orEmpty() + val apiKey = account[PtDigitalAccountTable.apiKey].orEmpty() + val control = controls[apiKey] + OpenInvoiceTaskOverviewItem( + digitalAccountId = digitalAccountId.toString(), + apiKey = apiKey, + account = account[PtDigitalAccountTable.account], + status = control?.get(OpenInvoiceQueueControlTable.status) ?: QUEUE_RUNNING, + pauseCode = control?.get(OpenInvoiceQueueControlTable.pauseCode), + reason = control?.get(OpenInvoiceQueueControlTable.reason), + pending = group.count { it[OpenInvoiceTaskTable.status] == STATUS_PENDING }.toLong(), + processing = group.count { it[OpenInvoiceTaskTable.status] == STATUS_PROCESSING }.toLong(), + success = group.count { it[OpenInvoiceTaskTable.status] == STATUS_SUCCESS }.toLong(), + failed = group.count { it[OpenInvoiceTaskTable.status] == STATUS_FAILED }.toLong(), + waitingAuth = group.count { it[OpenInvoiceTaskTable.status] == STATUS_WAITING_AUTH }.toLong(), + total = group.size.toLong(), + lastCreatedAt = formatDateTime(group.maxOfOrNull { it[OpenInvoiceTaskTable.createdAt] }), + ) + }.sortedByDescending { it.lastCreatedAt ?: "" } + } + return rows + } + + suspend fun taskPage( + user: CurrentUser, + digitalAccountId: String?, + status: String?, + sourceType: String?, + runMode: String?, + page: Int, + pageSize: Int, + ): PageResult = dbQuery { + var where = taskScope(user) + digitalAccountId?.takeIf { it.isNotBlank() }?.let { + where = where and (OpenInvoiceTaskTable.digitalAccountId eq Uuid.parse(it)) + } + status?.takeIf { it.isNotBlank() }?.let { + where = where and (OpenInvoiceTaskTable.status eq it) + } + sourceType?.takeIf { it.isNotBlank() }?.let { + where = where and (OpenInvoiceTaskTable.sourceType eq it) + } + runMode?.takeIf { it.isNotBlank() }?.let { + where = where and (OpenInvoiceTaskTable.runMode eq it) + } + val accountRows = PtDigitalAccountTable.selectAll() + .where { accountScope(user) } + .associateBy { it[PtDigitalAccountTable.id] } + val total = OpenInvoiceTaskTable.selectAll().where { where }.count() + val items = OpenInvoiceTaskTable.selectAll() + .where { where } + .orderBy(OpenInvoiceTaskTable.createdAt, SortOrder.DESC) + .limit(pageSize) + .offset(((page - 1).coerceAtLeast(0) * pageSize).toLong()) + .map { it.toTaskItem(accountRows[it[OpenInvoiceTaskTable.digitalAccountId]]?.get(PtDigitalAccountTable.account)) } + PageResult(items, page, pageSize, total) + } + + suspend fun pauseQueue(user: CurrentUser, digitalAccountId: String, reason: String?): String { + val account = requireManagedAccount(user, digitalAccountId) + pauseApiKey(account[PtDigitalAccountTable.apiKey] ?: throw BizException(ErrorCode.BAD_REQUEST.code, "数电账号未配置 api-key"), AUTH_REQUIRED_CODE, reason ?: "手动暂停") + return "队列已暂停" + } + + suspend fun resumeQueue(user: CurrentUser, digitalAccountId: String): String { + val account = requireManagedAccount(user, digitalAccountId) + val apiKey = account[PtDigitalAccountTable.apiKey] ?: throw BizException(ErrorCode.BAD_REQUEST.code, "数电账号未配置 api-key") + val now = OffsetDateTime.now() + dbQuery { + OpenInvoiceQueueControlTable.update({ OpenInvoiceQueueControlTable.apiKey eq apiKey }) { + it[status] = QUEUE_RUNNING + it[pauseCode] = null + it[reason] = null + it[updatedAt] = now + } + OpenInvoiceTaskTable.update({ + (OpenInvoiceTaskTable.apiKey eq apiKey) and + (OpenInvoiceTaskTable.status eq STATUS_WAITING_AUTH) + }) { + it[status] = STATUS_PENDING + it[nextRunAt] = now + it[lockedBy] = null + it[lockedAt] = null + it[updatedAt] = now + } + } + return "队列已恢复" + } + + internal suspend fun processIssueTasks(workerId: String, limit: Int) { + val tasks = claimTasks(TASK_ISSUE_BLUE, workerId, limit) + tasks.forEach { processIssueTask(it) } + } + + internal suspend fun processQueryTasks(workerId: String, limit: Int) { + val tasks = claimTasks(TASK_QUERY_BLUE, workerId, limit) + tasks.forEach { processQueryTask(it) } + } + + private suspend fun processIssueTask(task: ResultRow) { + val taskId = task[OpenInvoiceTaskTable.id] + val runMode = task[OpenInvoiceTaskTable.runMode] + val request = task[OpenInvoiceTaskTable.requestBody] + ?.let { myJson.decodeFromString(it) } + ?: return failTask(taskId, "REQUEST_BODY_NULL", "任务请求体为空") + val askRequest = request.toAskInvoiceRequest(task) + createHistoryPlaceholder(task, askRequest) + try { + if (runMode == MODE_SIMULATED) { + delay(2000) + } else { + PTApi.invoiceBlue(askRequest) + } + completeIssueTask(task) + } catch (e: PTException) { + failTask(taskId, e.code, e.message) + if (e.code == AUTH_REQUIRED_CODE) { + pauseApiKey(task[OpenInvoiceTaskTable.apiKey], e.code, e.message) + } + } catch (e: Exception) { + retryOrFail(task, "EXCEPTION", e.message ?: "开票任务执行失败") + } + } + + private suspend fun processQueryTask(task: ResultRow) { + val taskId = task[OpenInvoiceTaskTable.id] + val runMode = task[OpenInvoiceTaskTable.runMode] + val invoiceReqSerialNo = task[OpenInvoiceTaskTable.invoiceReqSerialNo] + try { + val code = if (runMode == MODE_SIMULATED) { + delay(2000) + simulatedQueryCode(invoiceReqSerialNo, task[OpenInvoiceTaskTable.pollCount]) + } else { + val res = PTApi.queryInvoiceInfo( + QueryInvoiceRequest( + taxpayerNum = task[OpenInvoiceTaskTable.taxpayerNum], + invoiceReqSerialNo = invoiceReqSerialNo, + ) + ) + dbQuery { + BlueInvoiceDao.upsertInvoiceInfo( + task[OpenInvoiceTaskTable.userId], + res, + task[OpenInvoiceTaskTable.enterpriseId], + task[OpenInvoiceTaskTable.digitalAccountId], + ) + } + res.code + } + handleQueryCode(task, code) + } catch (e: PTException) { + retryOrFail(task, e.code, e.message) + } catch (e: Exception) { + retryOrFail(task, "EXCEPTION", e.message ?: "查询任务执行失败") + } + } + + private suspend fun completeIssueTask(task: ResultRow) { + val now = OffsetDateTime.now() + val invoiceReqSerialNo = task[OpenInvoiceTaskTable.invoiceReqSerialNo] + dbQuery { + OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) { + it[status] = STATUS_SUCCESS + it[ptCode] = "0000" + it[errorMessage] = null + it[finishedAt] = now + it[updatedAt] = now + it[lockedBy] = null + it[lockedAt] = null + } + val exists = OpenInvoiceTaskTable.selectAll() + .where { + (OpenInvoiceTaskTable.taskType eq TASK_QUERY_BLUE) and + (OpenInvoiceTaskTable.invoiceReqSerialNo eq invoiceReqSerialNo) + } + .singleOrNull() != null + if (!exists) { + OpenInvoiceTaskTable.insert { + it[apiKey] = task[OpenInvoiceTaskTable.apiKey] + it[userId] = task[OpenInvoiceTaskTable.userId] + it[enterpriseId] = task[OpenInvoiceTaskTable.enterpriseId] + it[digitalAccountId] = task[OpenInvoiceTaskTable.digitalAccountId] + it[taxpayerNum] = task[OpenInvoiceTaskTable.taxpayerNum] + it[taxAccount] = task[OpenInvoiceTaskTable.taxAccount] + it[taskType] = TASK_QUERY_BLUE + it[sourceType] = task[OpenInvoiceTaskTable.sourceType] + it[runMode] = task[OpenInvoiceTaskTable.runMode] + it[OpenInvoiceTaskTable.invoiceReqSerialNo] = invoiceReqSerialNo + it[batchNo] = task[OpenInvoiceTaskTable.batchNo] + it[status] = STATUS_PENDING + it[requestBody] = task[OpenInvoiceTaskTable.taxpayerNum] + it[maxAttemptCount] = task[OpenInvoiceTaskTable.maxAttemptCount] + it[maxPollCount] = task[OpenInvoiceTaskTable.maxPollCount] + it[nextRunAt] = now.plusSeconds(com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.queryDelaySeconds) + it[createdAt] = now + } + } + } + } + + private suspend fun handleQueryCode(task: ResultRow, code: String) { + when (code) { + "0000" -> finishQueryTask(task, STATUS_SUCCESS, code, null) + "9999" -> finishQueryTask(task, STATUS_FAILED, code, "开票失败") + AUTH_REQUIRED_CODE -> { + val message = "需要登录/风险认证" + dbQuery { + OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) { + it[status] = STATUS_WAITING_AUTH + it[ptCode] = code + it[errorMessage] = message + it[updatedAt] = OffsetDateTime.now() + it[lockedBy] = null + it[lockedAt] = null + } + } + pauseApiKey(task[OpenInvoiceTaskTable.apiKey], code, message) + } + "7777", "6666" -> requeueQueryTask(task, code) + else -> requeueQueryTask(task, code) + } + } + + private suspend fun finishQueryTask(task: ResultRow, status: String, code: String, message: String?) { + val now = OffsetDateTime.now() + dbQuery { + OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) { + it[OpenInvoiceTaskTable.status] = status + it[ptCode] = code + it[errorMessage] = message + it[finishedAt] = now + it[updatedAt] = now + it[lockedBy] = null + it[lockedAt] = null + } + HistoryInvoiceBasicTable.update({ + HistoryInvoiceBasicTable.invoiceReqSerialNo eq task[OpenInvoiceTaskTable.invoiceReqSerialNo] + }) { + it[HistoryInvoiceBasicTable.code] = code + it[msg] = message ?: if (code == "0000") "开票成功" else "开票失败" + it[updatedAt] = now + } + } + } + + private suspend fun requeueQueryTask(task: ResultRow, code: String) { + val nextPollCount = task[OpenInvoiceTaskTable.pollCount] + 1 + if (nextPollCount >= task[OpenInvoiceTaskTable.maxPollCount]) { + finishQueryTask(task, STATUS_FAILED, code, "查询超过最大次数") + return + } + val now = OffsetDateTime.now() + dbQuery { + OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) { + it[status] = STATUS_PENDING + it[ptCode] = code + it[pollCount] = nextPollCount + it[nextRunAt] = now.plusSeconds(com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.queryDelaySeconds) + it[updatedAt] = now + it[lockedBy] = null + it[lockedAt] = null + } + HistoryInvoiceBasicTable.update({ + HistoryInvoiceBasicTable.invoiceReqSerialNo eq task[OpenInvoiceTaskTable.invoiceReqSerialNo] + }) { + it[HistoryInvoiceBasicTable.code] = code + it[msg] = if (code == "6666") "未开票" else "开票中..." + it[updatedAt] = now + } + } + } + + private suspend fun retryOrFail(task: ResultRow, code: String, message: String) { + val nextAttempt = task[OpenInvoiceTaskTable.attemptCount] + 1 + if (nextAttempt >= task[OpenInvoiceTaskTable.maxAttemptCount]) { + failTask(task[OpenInvoiceTaskTable.id], code, message) + return + } + val now = OffsetDateTime.now() + dbQuery { + OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) { + it[status] = STATUS_PENDING + it[ptCode] = code + it[errorMessage] = message + it[attemptCount] = nextAttempt + it[nextRunAt] = now.plusSeconds(10L * nextAttempt) + it[updatedAt] = now + it[lockedBy] = null + it[lockedAt] = null + } + } + } + + private suspend fun failTask(taskId: Uuid, code: String, message: String) { + val now = OffsetDateTime.now() + dbQuery { + val task = OpenInvoiceTaskTable.selectAll() + .where { OpenInvoiceTaskTable.id eq taskId } + .singleOrNull() + OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq taskId }) { + it[status] = STATUS_FAILED + it[ptCode] = code + it[errorMessage] = message + it[finishedAt] = now + it[updatedAt] = now + it[lockedBy] = null + it[lockedAt] = null + } + if (task != null) { + HistoryInvoiceBasicTable.update({ + HistoryInvoiceBasicTable.invoiceReqSerialNo eq task[OpenInvoiceTaskTable.invoiceReqSerialNo] + }) { + it[HistoryInvoiceBasicTable.code] = code + it[msg] = message + it[updatedAt] = now + } + } + } + } + + private suspend fun claimTasks(taskType: String, workerId: String, limit: Int): List { + val now = OffsetDateTime.now() + val candidateIds = dbQuery { + val pausedApiKeys = OpenInvoiceQueueControlTable.selectAll() + .where { OpenInvoiceQueueControlTable.status eq QUEUE_PAUSED } + .map { it[OpenInvoiceQueueControlTable.apiKey] } + var where = (OpenInvoiceTaskTable.taskType eq taskType) and + (OpenInvoiceTaskTable.status eq STATUS_PENDING) and + (OpenInvoiceTaskTable.nextRunAt lessEq now) + if (pausedApiKeys.isNotEmpty()) { + where = where and (OpenInvoiceTaskTable.apiKey notInList pausedApiKeys) + } + OpenInvoiceTaskTable.selectAll() + .where { where } + .orderBy(OpenInvoiceTaskTable.createdAt, SortOrder.ASC) + .limit(limit) + .map { it[OpenInvoiceTaskTable.id] } + } + if (candidateIds.isEmpty()) { + return emptyList() + } + return dbQuery { + candidateIds.mapNotNull { id -> + val updated = OpenInvoiceTaskTable.update({ + (OpenInvoiceTaskTable.id eq id) and (OpenInvoiceTaskTable.status eq STATUS_PENDING) + }) { + it[status] = STATUS_PROCESSING + it[lockedBy] = workerId + it[lockedAt] = now + it[startedAt] = now + it[updatedAt] = now + } + if (updated == 1) { + OpenInvoiceTaskTable.selectAll().where { OpenInvoiceTaskTable.id eq id }.singleOrNull() + } else { + null + } + } + } + } + + private suspend fun createHistoryPlaceholder(task: ResultRow, request: AskInvoiceRequest) = dbQuery { + val exists = HistoryInvoiceBasicTable.selectAll() + .where { HistoryInvoiceBasicTable.invoiceReqSerialNo eq request.invoiceReqSerialNo } + .singleOrNull() != null + if (!exists) { + val now = OffsetDateTime.now() + HistoryInvoiceBasicTable.insert { + it[userId] = task[OpenInvoiceTaskTable.userId] + it[enterpriseId] = task[OpenInvoiceTaskTable.enterpriseId] + it[digitalAccountId] = task[OpenInvoiceTaskTable.digitalAccountId] + it[invoiceReqSerialNo] = request.invoiceReqSerialNo + it[code] = "7777" + it[msg] = "开票中..." + it[sellerTaxpayerNum] = request.taxpayerNum + it[invoiceKind] = request.invoiceIssueKindCode + it[invoiceType] = "1" + it[invDeletedFlag] = "0" + it[createdAt] = now + } + } + } + + private suspend fun pauseApiKey(apiKey: String, code: String, reason: String) { + val now = OffsetDateTime.now() + dbQuery { + val exists = OpenInvoiceQueueControlTable.selectAll() + .where { OpenInvoiceQueueControlTable.apiKey eq apiKey } + .singleOrNull() + if (exists == null) { + val digitalAccountId = PtDigitalAccountTable.selectAll() + .where { PtDigitalAccountTable.apiKey eq apiKey } + .singleOrNull() + ?.get(PtDigitalAccountTable.id) + OpenInvoiceQueueControlTable.insert { + it[OpenInvoiceQueueControlTable.apiKey] = apiKey + it[OpenInvoiceQueueControlTable.digitalAccountId] = digitalAccountId + it[status] = QUEUE_PAUSED + it[pauseCode] = code + it[OpenInvoiceQueueControlTable.reason] = reason + it[createdAt] = now + } + } else { + OpenInvoiceQueueControlTable.update({ OpenInvoiceQueueControlTable.apiKey eq apiKey }) { + it[status] = QUEUE_PAUSED + it[pauseCode] = code + it[OpenInvoiceQueueControlTable.reason] = reason + it[updatedAt] = now + } + } + } + } + + private suspend fun ensureQueueCanAccept(apiKey: String) { + val maxPending = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.maxPendingPerApiKey + val pending = dbQuery { + OpenInvoiceTaskTable.selectAll() + .where { + (OpenInvoiceTaskTable.apiKey eq apiKey) and + ((OpenInvoiceTaskTable.status eq STATUS_PENDING) or (OpenInvoiceTaskTable.status eq STATUS_PROCESSING)) + } + .count() + } + if (pending >= maxPending) { + throw BizException(ErrorCode.BAD_REQUEST.code, "当前 api-key 任务积压已达上限") + } + } + + private fun validateRequest(request: OpenBlueInvoiceCreateRequest) { + if (request.buyerName.isBlank()) { + throw BizException(ErrorCode.BAD_REQUEST.code, "buyerName 不能为空") + } + if (request.itemList.isEmpty()) { + throw BizException(ErrorCode.BAD_REQUEST.code, "itemList 不能为空") + } + } + + private fun OpenBlueInvoiceCreateRequest.toAskInvoiceRequest(task: ResultRow): AskInvoiceRequest = + AskInvoiceRequest( + taxpayerNum = task[OpenInvoiceTaskTable.taxpayerNum], + invoiceReqSerialNo = invoiceReqSerialNo ?: "", + invoiceIssueKindCode = invoiceIssueKindCode, + buyerName = buyerName, + purchaseInvSellerIdType = purchaseInvSellerIdType, + buyerTaxpayerNum = buyerTaxpayerNum, + naturalPersonFlag = naturalPersonFlag, + buyerAddress = buyerAddress, + buyerTel = buyerTel, + buyerBankName = buyerBankName, + buyerBankAccount = buyerBankAccount, + showBuyerBank = showBuyerBank, + showSellerBank = showSellerBank, + showBuyerAddrTel = showBuyerAddrTel, + showSellerAddrTel = showSellerAddrTel, + account = task[OpenInvoiceTaskTable.taxAccount], + variableLevyFlag = variableLevyFlag, + casherName = casherName, + reviewerName = reviewerName, + takerName = takerName, + takerTel = takerTel, + takerEmail = takerEmail, + specialInvoiceKind = specialInvoiceKind, + remark = remark, + definedData = definedData, + tradeNo = tradeNo ?: invoiceReqSerialNo, + shopNum = shopNum, + itemList = itemList, + variableLevyProofList = variableLevyProofList, + orderList = orderList, + ) + + private fun simulatedQueryCode(invoiceReqSerialNo: String, pollCount: Int): String = + when { + invoiceReqSerialNo.contains("3999", ignoreCase = true) -> AUTH_REQUIRED_CODE + invoiceReqSerialNo.contains("FAIL", ignoreCase = true) -> "9999" + pollCount <= 0 -> "7777" + pollCount == 1 -> "6666" + else -> "0000" + } + + private fun scopedTaskQuery(user: CurrentUser): Query = + OpenInvoiceTaskTable.selectAll().where { taskScope(user) } + + private fun taskScope(user: CurrentUser): Op = + when { + user.isSuperAdmin -> Op.TRUE + user.isDigitalOperator && user.digitalAccountId != null -> OpenInvoiceTaskTable.digitalAccountId eq user.digitalAccountId + user.enterpriseId != null -> OpenInvoiceTaskTable.enterpriseId eq user.enterpriseId + else -> OpenInvoiceTaskTable.userId eq user.id + } + + private fun accountScope(user: CurrentUser): Op = + when { + user.isSuperAdmin -> Op.TRUE + user.isDigitalOperator && user.digitalAccountId != null -> PtDigitalAccountTable.id eq user.digitalAccountId + user.enterpriseId != null -> PtDigitalAccountTable.enterpriseId eq user.enterpriseId + else -> PtDigitalAccountTable.platformUserId eq user.id + } + + private suspend fun requireManagedAccount(user: CurrentUser, digitalAccountId: String): ResultRow = dbQuery { + PtDigitalAccountTable.selectAll() + .where { accountScope(user) and (PtDigitalAccountTable.id eq Uuid.parse(digitalAccountId)) } + .singleOrNull() + } ?: throw BizException(ErrorCode.BAD_REQUEST.code, "数电账号不存在或无权操作") + + private fun ResultRow.toSubmitResponse(): OpenInvoiceTaskSubmitResponse = + OpenInvoiceTaskSubmitResponse( + taskId = this[OpenInvoiceTaskTable.id].toString(), + invoiceReqSerialNo = this[OpenInvoiceTaskTable.invoiceReqSerialNo], + status = this[OpenInvoiceTaskTable.status], + taskType = this[OpenInvoiceTaskTable.taskType], + runMode = this[OpenInvoiceTaskTable.runMode], + ) + + private fun ResultRow.toTaskItem(account: String?): OpenInvoiceTaskItem = + OpenInvoiceTaskItem( + id = this[OpenInvoiceTaskTable.id].toString(), + digitalAccountId = this[OpenInvoiceTaskTable.digitalAccountId].toString(), + apiKey = this[OpenInvoiceTaskTable.apiKey], + account = account, + taskType = this[OpenInvoiceTaskTable.taskType], + sourceType = this[OpenInvoiceTaskTable.sourceType], + runMode = this[OpenInvoiceTaskTable.runMode], + invoiceReqSerialNo = this[OpenInvoiceTaskTable.invoiceReqSerialNo], + batchNo = this[OpenInvoiceTaskTable.batchNo], + status = this[OpenInvoiceTaskTable.status], + ptCode = this[OpenInvoiceTaskTable.ptCode], + errorMessage = this[OpenInvoiceTaskTable.errorMessage], + attemptCount = this[OpenInvoiceTaskTable.attemptCount], + maxAttemptCount = this[OpenInvoiceTaskTable.maxAttemptCount], + pollCount = this[OpenInvoiceTaskTable.pollCount], + maxPollCount = this[OpenInvoiceTaskTable.maxPollCount], + nextRunAt = formatDateTime(this[OpenInvoiceTaskTable.nextRunAt]) ?: "", + createdAt = formatDateTime(this[OpenInvoiceTaskTable.createdAt]) ?: "", + updatedAt = formatDateTime(this[OpenInvoiceTaskTable.updatedAt]), + startedAt = formatDateTime(this[OpenInvoiceTaskTable.startedAt]), + finishedAt = formatDateTime(this[OpenInvoiceTaskTable.finishedAt]), + ) +} + +object OpenInvoiceTaskWorker { + fun start(application: Application) { + val config = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue + repeat(config.issueWorkerCount) { index -> + launchWorker(application, "issue-$index") { + OpenInvoiceTaskService.processIssueTasks("issue-$index", 1) + } + } + repeat(config.queryWorkerCount) { index -> + launchWorker(application, "query-$index") { + OpenInvoiceTaskService.processQueryTasks("query-$index", 1) + } + } + } + + private fun launchWorker(application: Application, name: String, block: suspend () -> Unit) { + CoroutineScope(application.coroutineContext + Dispatchers.Default).launch { + while (isActive) { + runCatching { block() } + delay(1000) + } + } + } +} + +private object OpenApiRateLimiter { + private data class Bucket(val second: Long, val count: Int) + + private val buckets = ConcurrentHashMap() + + fun check(apiKey: String) { + val limit = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.perApiKeyPerSecond + val nowSecond = System.currentTimeMillis() / 1000 + val updated = buckets.compute(apiKey) { _, old -> + if (old == null || old.second != nowSecond) { + Bucket(nowSecond, 1) + } else { + Bucket(nowSecond, old.count + 1) + } + } ?: Bucket(nowSecond, 1) + if (updated.count > limit) { + throw BizException(ErrorCode.BAD_REQUEST.code, "当前 api-key 请求过于频繁") + } + } +} diff --git a/server/src/main/kotlin/com/bbit/ticket/service/piaotong/PTBlueService.kt b/server/src/main/kotlin/com/bbit/ticket/service/piaotong/PTBlueService.kt index ae94bda..f473c41 100644 --- a/server/src/main/kotlin/com/bbit/ticket/service/piaotong/PTBlueService.kt +++ b/server/src/main/kotlin/com/bbit/ticket/service/piaotong/PTBlueService.kt @@ -5,14 +5,11 @@ package com.bbit.ticket.service.piaotong import com.bbit.ticket.dao.piaotong.BlueInvoiceDao import com.bbit.ticket.entity.common.PageResult import com.bbit.ticket.entity.request.AskInvoiceRequest -import com.bbit.ticket.entity.request.InvoiceQueryRequest -import com.bbit.ticket.entity.request.InvoiceQuerySubmitRequest import com.bbit.ticket.entity.request.QueryInvoiceRequest import com.bbit.ticket.entity.response.InvoiceDownloadUrlResponse import com.bbit.ticket.entity.response.QueryInvoiceResult import com.bbit.ticket.entity.response.InvoiceDetailResponse import com.bbit.ticket.entity.response.InvoiceHistoryItem -import com.bbit.ticket.entity.response.InvoiceQueryResponse import com.bbit.ticket.utils.CurrentUser import io.ktor.client.request.get import io.ktor.client.statement.bodyAsBytes @@ -163,18 +160,28 @@ object PTBlueService { /** * 查询并更新发票状态(复用 syncInvoiceFromPT) */ - suspend fun queryInvoiceAllInfo(req: QueryInvoiceRequest): QueryInvoiceResult { - val invoiceReqSerialNo = req.invoiceReqSerialNo + suspend fun queryInvoiceAllInfo(user: CurrentUser, invoiceReqSerialNo: String): QueryInvoiceResult { val scope = dbQuery { - BlueInvoiceDao.findInvoiceScopeBySerialNo(invoiceReqSerialNo) + BlueInvoiceDao.findInvoiceScopeBySerialNo( + user.id, + user.enterpriseId, + if (user.isDigitalOperator) user.digitalAccountId else null, + invoiceReqSerialNo, + ) } - val result = syncInvoiceFromPT(scope.userId, invoiceReqSerialNo, req.taxpayerNum, scope.enterpriseId, scope.digitalAccountId) + val result = syncInvoiceFromPT( + scope.userId, + invoiceReqSerialNo, + scope.taxpayerNum, + scope.enterpriseId, + scope.digitalAccountId, + ) val relatedSerialNos = dbQuery { BlueInvoiceDao.findRelatedInvoiceReqSerialNos(scope.userId, invoiceReqSerialNo) } relatedSerialNos.forEach { relatedSerialNo -> runCatching { - syncInvoiceFromPT(scope.userId, relatedSerialNo, req.taxpayerNum, scope.enterpriseId, scope.digitalAccountId) + syncInvoiceFromPT(scope.userId, relatedSerialNo, scope.taxpayerNum, scope.enterpriseId, scope.digitalAccountId) } } return result diff --git a/server/src/main/kotlin/com/bbit/ticket/utils/bootstrap/AppConfig.kt b/server/src/main/kotlin/com/bbit/ticket/utils/bootstrap/AppConfig.kt index a8c8e74..f08d0c6 100644 --- a/server/src/main/kotlin/com/bbit/ticket/utils/bootstrap/AppConfig.kt +++ b/server/src/main/kotlin/com/bbit/ticket/utils/bootstrap/AppConfig.kt @@ -33,6 +33,16 @@ object AppConfig { val allowedHosts: List, ) + data class OpenApiQueue( + val issueWorkerCount: Int, + val queryWorkerCount: Int, + val maxPendingPerApiKey: Int, + val queryDelaySeconds: Long, + val maxQueryPollCount: Int, + val maxAttemptCount: Int, + val perApiKeyPerSecond: Int, + ) + lateinit var app: App private set @@ -48,6 +58,9 @@ object AppConfig { lateinit var cors: Cors private set + lateinit var openApiQueue: OpenApiQueue + private set + fun init(environment: ApplicationEnvironment) { app = App( name = string(environment, "app.name", "Platform A"), @@ -81,6 +94,16 @@ object AppConfig { .map { it.trim() } .filter { it.isNotEmpty() }, ) + + openApiQueue = OpenApiQueue( + issueWorkerCount = int(environment, "openapi.queue.issueWorkerCount", 4), + queryWorkerCount = int(environment, "openapi.queue.queryWorkerCount", 4), + maxPendingPerApiKey = int(environment, "openapi.queue.maxPendingPerApiKey", 1000), + queryDelaySeconds = long(environment, "openapi.queue.queryDelaySeconds", 10), + maxQueryPollCount = int(environment, "openapi.queue.maxQueryPollCount", 30), + maxAttemptCount = int(environment, "openapi.queue.maxAttemptCount", 3), + perApiKeyPerSecond = int(environment, "openapi.rateLimit.perApiKeyPerSecond", 20), + ) } private fun string(environment: ApplicationEnvironment, path: String, default: String): String = @@ -91,4 +114,4 @@ object AppConfig { private fun long(environment: ApplicationEnvironment, path: String, default: Long): Long = string(environment, path, default.toString()).toLong() -} \ No newline at end of file +} diff --git a/server/src/main/kotlin/com/bbit/ticket/utils/bootstrap/DatabaseInitializer.kt b/server/src/main/kotlin/com/bbit/ticket/utils/bootstrap/DatabaseInitializer.kt index 27ce51b..b39d29f 100644 --- a/server/src/main/kotlin/com/bbit/ticket/utils/bootstrap/DatabaseInitializer.kt +++ b/server/src/main/kotlin/com/bbit/ticket/utils/bootstrap/DatabaseInitializer.kt @@ -7,6 +7,8 @@ import com.bbit.ticket.database.piaotong.HistoryInvoiceVoucherTable import com.bbit.ticket.database.piaotong.HistoryInvoiceRedTable import com.bbit.ticket.database.piaotong.OpenInvoiceBatchItemTable import com.bbit.ticket.database.piaotong.OpenInvoiceBatchTable +import com.bbit.ticket.database.piaotong.OpenInvoiceQueueControlTable +import com.bbit.ticket.database.piaotong.OpenInvoiceTaskTable import com.bbit.ticket.database.piaotong.PtDigitalAccountTable import com.bbit.ticket.database.piaotong.PtEnterpriseTable import com.bbit.ticket.database.system.SysApiAccessLogTable @@ -47,6 +49,8 @@ object DatabaseInitializer { HistoryInvoiceOrderTable, OpenInvoiceBatchTable, OpenInvoiceBatchItemTable, + OpenInvoiceTaskTable, + OpenInvoiceQueueControlTable, ) // 先通过 Exposed 生成迁移 SQL,再逐条执行,避免启动时静默跳过缺失表或字段。 transaction { diff --git a/server/src/main/resources/application.yaml b/server/src/main/resources/application.yaml index 75099cf..7ab01ff 100644 --- a/server/src/main/resources/application.yaml +++ b/server/src/main/resources/application.yaml @@ -30,3 +30,14 @@ security: cors: allowedHosts: "localhost:5173,127.0.0.1:5173" + +openapi: + queue: + issueWorkerCount: 4 + queryWorkerCount: 4 + maxPendingPerApiKey: 1000 + queryDelaySeconds: 10 + maxQueryPollCount: 30 + maxAttemptCount: 3 + rateLimit: + perApiKeyPerSecond: 20 diff --git a/web/src/api/piaotong/index.ts b/web/src/api/piaotong/index.ts index 7968ddb..500a63f 100644 --- a/web/src/api/piaotong/index.ts +++ b/web/src/api/piaotong/index.ts @@ -221,6 +221,72 @@ export function openApiStatisticsApi(): Promise { return http.get('/pt/openapi/statistics') } +export interface OpenInvoiceTaskOverviewItem { + digitalAccountId: string + apiKey: string + account?: string | null + status: string + pauseCode?: string | null + reason?: string | null + pending: number + processing: number + success: number + failed: number + waitingAuth: number + total: number + lastCreatedAt?: string | null +} + +export interface OpenInvoiceTaskItem { + id: string + digitalAccountId: string + apiKey: string + account?: string | null + taskType: string + sourceType: string + runMode: string + invoiceReqSerialNo: string + batchNo?: string | null + status: string + ptCode?: string | null + errorMessage?: string | null + attemptCount: number + maxAttemptCount: number + pollCount: number + maxPollCount: number + nextRunAt: string + createdAt: string + updatedAt?: string | null + startedAt?: string | null + finishedAt?: string | null +} + +export function openInvoiceTaskOverviewApi(): Promise { + return http.get('/pt/openapi/tasks/overview') +} + +export function openInvoiceTaskPageApi(params: { + page: number + pageSize: number + digitalAccountId?: string + status?: string | null + sourceType?: string | null + runMode?: string | null +}): Promise> { + return http.get('/pt/openapi/tasks', { params }) +} + +export function pauseOpenInvoiceTaskQueueApi( + digitalAccountId: string, + reason?: string +): Promise { + return http.post(`/pt/openapi/tasks/queues/${digitalAccountId}/pause`, { reason }) +} + +export function resumeOpenInvoiceTaskQueueApi(digitalAccountId: string): Promise { + return http.post(`/pt/openapi/tasks/queues/${digitalAccountId}/resume`) +} + // ============================================= // 开票相关 // ============================================= diff --git a/web/src/features/statistics/openapi/index.vue b/web/src/features/statistics/openapi/index.vue index 7a477f0..6813ebe 100644 --- a/web/src/features/statistics/openapi/index.vue +++ b/web/src/features/statistics/openapi/index.vue @@ -11,6 +11,25 @@ +
+
+ 数电账号 + {{ rows.length }} +
+
+ 待处理 + {{ totals.pending }} +
+
+ 处理中 + {{ totals.processing }} +
+
+ 需认证 + {{ totals.waitingAuth }} +
+
+
+ + + +
+ + + + + + 刷新 + +
+ + +
+
+ +