提升OpenAPI接口高并发能力

This commit is contained in:
BBIT-Kai
2026-05-25 14:41:24 +08:00
parent c2899ae64d
commit 62f9fd5b7f
15 changed files with 1454 additions and 43 deletions
@@ -17,12 +17,15 @@ import com.bbit.ticket.utils.plugins.configureTrace
import com.bbit.ticket.route.piaotong.registerPTAuthRoutes import com.bbit.ticket.route.piaotong.registerPTAuthRoutes
import com.bbit.ticket.route.piaotong.registerPTInvoiceRoutes import com.bbit.ticket.route.piaotong.registerPTInvoiceRoutes
import com.bbit.ticket.route.openapi.registerOpenBlueInvoiceRoutes import com.bbit.ticket.route.openapi.registerOpenBlueInvoiceRoutes
import com.bbit.ticket.route.openapi.registerOpenInvoiceTaskRoutes
import com.bbit.ticket.route.piaotong.registerOpenInvoiceTaskManageRoutes
import com.bbit.ticket.route.system.registerDictRoutes import com.bbit.ticket.route.system.registerDictRoutes
import com.bbit.ticket.route.system.registerLogsQueryRoutes import com.bbit.ticket.route.system.registerLogsQueryRoutes
import com.bbit.ticket.route.system.registerMenuRoutes import com.bbit.ticket.route.system.registerMenuRoutes
import com.bbit.ticket.route.system.registerOrgRoutes import com.bbit.ticket.route.system.registerOrgRoutes
import com.bbit.ticket.route.system.registerRoleRoutes import com.bbit.ticket.route.system.registerRoleRoutes
import com.bbit.ticket.route.system.registerUserRoutes import com.bbit.ticket.route.system.registerUserRoutes
import com.bbit.ticket.service.openapi.OpenInvoiceTaskWorker
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import io.ktor.server.application.Application import io.ktor.server.application.Application
import io.ktor.server.auth.authenticate import io.ktor.server.auth.authenticate
@@ -52,6 +55,7 @@ fun Application.module() {
DatabaseInitializer.initialize() DatabaseInitializer.initialize()
SeedData.seed() SeedData.seed()
} }
OpenInvoiceTaskWorker.start(this)
routing { routing {
get("/health") { get("/health") {
@@ -69,6 +73,9 @@ fun Application.module() {
route("/blue-invoices") { route("/blue-invoices") {
registerOpenBlueInvoiceRoutes() registerOpenBlueInvoiceRoutes()
} }
route("/blue-invoice-tasks") {
registerOpenInvoiceTaskRoutes()
}
route("/f8") { route("/f8") {
} }
@@ -77,6 +84,7 @@ fun Application.module() {
authenticate("auth-jwt") { authenticate("auth-jwt") {
registerPTAuthRoutes() registerPTAuthRoutes()
registerPTInvoiceRoutes() registerPTInvoiceRoutes()
registerOpenInvoiceTaskManageRoutes()
} }
} }
} }
@@ -444,9 +444,18 @@ object BlueInvoiceDao {
it[HistoryInvoiceBasicTable.invDeletedFlag] = req.invDeletedFlag ?: "0" it[HistoryInvoiceBasicTable.invDeletedFlag] = req.invDeletedFlag ?: "0"
} }
fun findInvoiceScopeBySerialNo(invoiceReqSerialNo: String): InvoiceScope { fun findInvoiceScopeBySerialNo(
userId: Uuid,
enterpriseId: Uuid?,
digitalAccountId: Uuid?,
invoiceReqSerialNo: String,
): InvoiceScope {
val row = HistoryInvoiceBasicTable.selectAll() val row = HistoryInvoiceBasicTable.selectAll()
.where { HistoryInvoiceBasicTable.invoiceReqSerialNo eq invoiceReqSerialNo } .where {
invoiceScopeWhere(userId, enterpriseId, digitalAccountId) and
(HistoryInvoiceBasicTable.invoiceReqSerialNo eq invoiceReqSerialNo) and
HistoryInvoiceBasicTable.deletedAt.isNull()
}
.singleOrNull() .singleOrNull()
?: throw com.bbit.ticket.entity.common.BizException("NOT_FOUND", "发票记录不存在") ?: throw com.bbit.ticket.entity.common.BizException("NOT_FOUND", "发票记录不存在")
return InvoiceScope( return InvoiceScope(
@@ -454,6 +463,7 @@ object BlueInvoiceDao {
?: throw com.bbit.ticket.entity.common.BizException("NOT_FOUND", "发票记录不存在用户信息"), ?: throw com.bbit.ticket.entity.common.BizException("NOT_FOUND", "发票记录不存在用户信息"),
enterpriseId = row[HistoryInvoiceBasicTable.enterpriseId], enterpriseId = row[HistoryInvoiceBasicTable.enterpriseId],
digitalAccountId = row[HistoryInvoiceBasicTable.digitalAccountId], digitalAccountId = row[HistoryInvoiceBasicTable.digitalAccountId],
taxpayerNum = row[HistoryInvoiceBasicTable.sellerTaxpayerNum],
) )
} }
@@ -461,6 +471,7 @@ object BlueInvoiceDao {
val userId: Uuid, val userId: Uuid,
val enterpriseId: Uuid?, val enterpriseId: Uuid?,
val digitalAccountId: Uuid?, val digitalAccountId: Uuid?,
val taxpayerNum: String,
) )
fun findRelatedInvoiceReqSerialNos(userId: Uuid, invoiceReqSerialNo: String): List<String> { fun findRelatedInvoiceReqSerialNos(userId: Uuid, invoiceReqSerialNo: String): List<String> {
@@ -0,0 +1,62 @@
package com.bbit.ticket.database.piaotong
import org.jetbrains.exposed.v1.core.Table
import org.jetbrains.exposed.v1.javatime.timestampWithTimeZone
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid
@OptIn(ExperimentalUuidApi::class)
object OpenInvoiceTaskTable : Table("open_invoice_task") {
val id = uuid("id").clientDefault { Uuid.random() }
val apiKey = varchar("api_key", 128)
val userId = uuid("user_id")
val enterpriseId = uuid("enterprise_id").nullable()
val digitalAccountId = uuid("digital_account_id")
val taxpayerNum = varchar("taxpayer_num", 32)
val taxAccount = varchar("tax_account", 64).nullable()
val taskType = varchar("task_type", 32)
val sourceType = varchar("source_type", 32)
val runMode = varchar("run_mode", 16)
val invoiceReqSerialNo = varchar("invoice_req_serial_no", 20)
val batchNo = varchar("batch_no", 64).nullable()
val status = varchar("status", 32)
val ptCode = varchar("pt_code", 64).nullable()
val errorMessage = text("error_message").nullable()
val requestBody = text("request_body").nullable()
val attemptCount = integer("attempt_count").default(0)
val maxAttemptCount = integer("max_attempt_count").default(3)
val pollCount = integer("poll_count").default(0)
val maxPollCount = integer("max_poll_count").default(30)
val nextRunAt = timestampWithTimeZone("next_run_at")
val lockedBy = varchar("locked_by", 64).nullable()
val lockedAt = timestampWithTimeZone("locked_at").nullable()
val createdAt = timestampWithTimeZone("created_at")
val updatedAt = timestampWithTimeZone("updated_at").nullable()
val startedAt = timestampWithTimeZone("started_at").nullable()
val finishedAt = timestampWithTimeZone("finished_at").nullable()
override val primaryKey = PrimaryKey(id)
init {
uniqueIndex(taskType, invoiceReqSerialNo)
index(false, status, taskType, nextRunAt)
index(false, apiKey, status)
index(false, digitalAccountId, status)
index(false, invoiceReqSerialNo)
index(false, batchNo)
}
}
@OptIn(ExperimentalUuidApi::class)
object OpenInvoiceQueueControlTable : Table("open_invoice_queue_control") {
val id = uuid("id").clientDefault { Uuid.random() }
val apiKey = varchar("api_key", 128).uniqueIndex()
val digitalAccountId = uuid("digital_account_id").nullable()
val status = varchar("status", 16)
val pauseCode = varchar("pause_code", 64).nullable()
val reason = text("reason").nullable()
val createdAt = timestampWithTimeZone("created_at")
val updatedAt = timestampWithTimeZone("updated_at").nullable()
override val primaryKey = PrimaryKey(id)
}
@@ -0,0 +1,57 @@
package com.bbit.ticket.entity.openapi
import com.bbit.ticket.entity.common.PageResult
import kotlinx.serialization.Serializable
@Serializable
data class OpenInvoiceTaskSubmitResponse(
val taskId: String,
val invoiceReqSerialNo: String,
val status: String,
val taskType: String,
val runMode: String,
)
@Serializable
data class OpenInvoiceTaskOverviewItem(
val digitalAccountId: String,
val apiKey: String,
val account: String? = null,
val status: String,
val pauseCode: String? = null,
val reason: String? = null,
val pending: Long,
val processing: Long,
val success: Long,
val failed: Long,
val waitingAuth: Long,
val total: Long,
val lastCreatedAt: String? = null,
)
@Serializable
data class OpenInvoiceTaskItem(
val id: String,
val digitalAccountId: String,
val apiKey: String,
val account: String? = null,
val taskType: String,
val sourceType: String,
val runMode: String,
val invoiceReqSerialNo: String,
val batchNo: String? = null,
val status: String,
val ptCode: String? = null,
val errorMessage: String? = null,
val attemptCount: Int,
val maxAttemptCount: Int,
val pollCount: Int,
val maxPollCount: Int,
val nextRunAt: String,
val createdAt: String,
val updatedAt: String? = null,
val startedAt: String? = null,
val finishedAt: String? = null,
)
typealias OpenInvoiceTaskPage = PageResult<OpenInvoiceTaskItem>
@@ -84,7 +84,7 @@ fun Route.registerOpenBlueInvoiceRoutes() {
* @param requestBody 请求体 JSON 文本。 * @param requestBody 请求体 JSON 文本。
* @param block 当前接口要执行的业务逻辑。 * @param block 当前接口要执行的业务逻辑。
*/ */
private suspend inline fun <reified T> ApplicationCall.respondOpenApi( internal suspend inline fun <reified T> ApplicationCall.respondOpenApi(
principal: OpenApiPrincipal, principal: OpenApiPrincipal,
interfaceCode: String, interfaceCode: String,
requestBody: String?, requestBody: String?,
@@ -0,0 +1,35 @@
package com.bbit.ticket.route.openapi
import com.bbit.ticket.entity.openapi.OpenBlueInvoiceCreateRequest
import com.bbit.ticket.service.openapi.OpenInvoiceTaskService
import com.bbit.ticket.utils.requireOpenApiPrincipal
import io.ktor.server.request.receive
import io.ktor.server.routing.Route
import io.ktor.server.routing.post
fun Route.registerOpenInvoiceTaskRoutes() {
post("/test") {
val principal = call.requireOpenApiPrincipal()
val request = call.receive<OpenBlueInvoiceCreateRequest>()
call.respondOpenApi(principal, "blue-invoice-task.test", null) {
OpenInvoiceTaskService.createIssueTask(
principal = principal,
request = request,
runMode = OpenInvoiceTaskService.MODE_SIMULATED,
sourceType = OpenInvoiceTaskService.SOURCE_TEST,
)
}
}
post("/production") {
val principal = call.requireOpenApiPrincipal()
val request = call.receive<OpenBlueInvoiceCreateRequest>()
call.respondOpenApi(principal, "blue-invoice-task.production", null) {
OpenInvoiceTaskService.createIssueTask(
principal = principal,
request = request,
runMode = OpenInvoiceTaskService.MODE_REAL,
)
}
}
}
@@ -0,0 +1,49 @@
package com.bbit.ticket.route.piaotong
import com.bbit.ticket.service.openapi.OpenInvoiceTaskService
import com.bbit.ticket.utils.requireCurrentUser
import io.ktor.server.request.receiveNullable
import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.ktor.server.routing.post
fun Route.registerOpenInvoiceTaskManageRoutes() {
get("/openapi/tasks/overview") {
call.respondPt("查询 OpenAPI 任务概览失败") {
OpenInvoiceTaskService.overview(call.requireCurrentUser())
}
}
get("/openapi/tasks") {
call.respondPt("查询 OpenAPI 任务列表失败") {
OpenInvoiceTaskService.taskPage(
user = call.requireCurrentUser(),
digitalAccountId = call.request.queryParameters["digitalAccountId"],
status = call.request.queryParameters["status"],
sourceType = call.request.queryParameters["sourceType"],
runMode = call.request.queryParameters["runMode"],
page = call.request.queryParameters["page"]?.toIntOrNull() ?: 1,
pageSize = call.request.queryParameters["pageSize"]?.toIntOrNull() ?: 20,
)
}
}
post("/openapi/tasks/queues/{digitalAccountId}/pause") {
val digitalAccountId = call.parameters["digitalAccountId"].orEmpty()
val body = call.receiveNullable<Map<String, String>>() ?: emptyMap()
call.respondPt("暂停 OpenAPI 队列失败") {
OpenInvoiceTaskService.pauseQueue(
user = call.requireCurrentUser(),
digitalAccountId = digitalAccountId,
reason = body["reason"],
)
}
}
post("/openapi/tasks/queues/{digitalAccountId}/resume") {
val digitalAccountId = call.parameters["digitalAccountId"].orEmpty()
call.respondPt("恢复 OpenAPI 队列失败") {
OpenInvoiceTaskService.resumeQueue(call.requireCurrentUser(), digitalAccountId)
}
}
}
@@ -4,8 +4,6 @@ package com.bbit.ticket.route.piaotong
import com.bbit.ticket.entity.common.BizException import com.bbit.ticket.entity.common.BizException
import com.bbit.ticket.entity.request.AskInvoiceRequest import com.bbit.ticket.entity.request.AskInvoiceRequest
import com.bbit.ticket.entity.request.InvoiceQuerySubmitRequest
import com.bbit.ticket.entity.request.QueryInvoiceRequest
import com.bbit.ticket.entity.request.RedCreateRequest import com.bbit.ticket.entity.request.RedCreateRequest
import com.bbit.ticket.service.piaotong.PTBlueService import com.bbit.ticket.service.piaotong.PTBlueService
import com.bbit.ticket.service.piaotong.PTRedService import com.bbit.ticket.service.piaotong.PTRedService
@@ -103,17 +101,7 @@ fun Route.registerPTInvoiceRoutes() {
val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号") val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号")
?: return@get ?: return@get
call.respondPt("刷新发票状态失败") { call.respondPt("刷新发票状态失败") {
val currentUser = call.requireCurrentUser() PTBlueService.queryInvoiceAllInfo(call.requireCurrentUser(), invoiceReqSerialNo)
val account = com.bbit.ticket.service.piaotong.PTConfigService.requireDigitalAccountForAction(
currentUser,
call.request.queryParameters["digitalAccountId"],
)
PTBlueService.queryInvoiceAllInfo(
QueryInvoiceRequest(
taxpayerNum = account.taxpayerNum,
invoiceReqSerialNo = invoiceReqSerialNo,
)
)
} }
} }
@@ -0,0 +1,742 @@
@file:OptIn(ExperimentalUuidApi::class)
package com.bbit.ticket.service.openapi
import com.bbit.ticket.dao.piaotong.BlueInvoiceDao
import com.bbit.ticket.database.piaotong.HistoryInvoiceBasicTable
import com.bbit.ticket.database.piaotong.OpenInvoiceQueueControlTable
import com.bbit.ticket.database.piaotong.OpenInvoiceTaskTable
import com.bbit.ticket.database.piaotong.PtDigitalAccountTable
import com.bbit.ticket.entity.common.BizException
import com.bbit.ticket.entity.common.ErrorCode
import com.bbit.ticket.entity.common.PTException
import com.bbit.ticket.entity.common.PageResult
import com.bbit.ticket.entity.openapi.OpenBlueInvoiceCreateRequest
import com.bbit.ticket.entity.openapi.OpenInvoiceTaskItem
import com.bbit.ticket.entity.openapi.OpenInvoiceTaskOverviewItem
import com.bbit.ticket.entity.openapi.OpenInvoiceTaskSubmitResponse
import com.bbit.ticket.entity.request.AskInvoiceRequest
import com.bbit.ticket.entity.request.QueryInvoiceRequest
import com.bbit.ticket.service.piaotong.PTConfigService
import com.bbit.ticket.utils.CurrentUser
import com.bbit.ticket.utils.OpenApiPrincipal
import com.bbit.ticket.utils.formatDateTime
import com.bbit.ticket.utils.net.PTApi
import com.bbit.ticket.utils.plugins.dbQuery
import com.bbit.ticket.utils.plugins.myJson
import io.ktor.server.application.Application
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import org.jetbrains.exposed.v1.core.Op
import org.jetbrains.exposed.v1.core.ResultRow
import org.jetbrains.exposed.v1.core.SortOrder
import org.jetbrains.exposed.v1.core.and
import org.jetbrains.exposed.v1.core.eq
import org.jetbrains.exposed.v1.core.inList
import org.jetbrains.exposed.v1.core.isNull
import org.jetbrains.exposed.v1.core.lessEq
import org.jetbrains.exposed.v1.core.notInList
import org.jetbrains.exposed.v1.core.or
import org.jetbrains.exposed.v1.jdbc.Query
import org.jetbrains.exposed.v1.jdbc.insert
import org.jetbrains.exposed.v1.jdbc.selectAll
import org.jetbrains.exposed.v1.jdbc.update
import java.time.OffsetDateTime
import java.util.concurrent.ConcurrentHashMap
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid
object OpenInvoiceTaskService {
const val TASK_ISSUE_BLUE = "ISSUE_BLUE"
const val TASK_QUERY_BLUE = "QUERY_BLUE"
const val SOURCE_SINGLE = "SINGLE"
const val SOURCE_TEST = "TEST"
const val MODE_REAL = "REAL"
const val MODE_SIMULATED = "SIMULATED"
private const val STATUS_PENDING = "PENDING"
private const val STATUS_PROCESSING = "PROCESSING"
private const val STATUS_SUCCESS = "SUCCESS"
private const val STATUS_FAILED = "FAILED"
private const val STATUS_WAITING_AUTH = "WAITING_AUTH"
private const val QUEUE_RUNNING = "RUNNING"
private const val QUEUE_PAUSED = "PAUSED"
private const val AUTH_REQUIRED_CODE = "3999"
suspend fun createIssueTask(
principal: OpenApiPrincipal,
request: OpenBlueInvoiceCreateRequest,
runMode: String,
sourceType: String = SOURCE_SINGLE,
batchNo: String? = null,
): OpenInvoiceTaskSubmitResponse {
validateRequest(request)
val invoiceReqSerialNo = request.invoiceReqSerialNo?.trim()
?: throw BizException(ErrorCode.BAD_REQUEST.code, "invoiceReqSerialNo 不能为空")
if (invoiceReqSerialNo.length > 20) {
throw BizException(ErrorCode.BAD_REQUEST.code, "invoiceReqSerialNo 长度不能超过 20")
}
OpenApiRateLimiter.check(principal.apiKey)
val now = OffsetDateTime.now()
val existing = dbQuery {
OpenInvoiceTaskTable.selectAll()
.where {
(OpenInvoiceTaskTable.taskType eq TASK_ISSUE_BLUE) and
(OpenInvoiceTaskTable.invoiceReqSerialNo eq invoiceReqSerialNo)
}
.singleOrNull()
}
if (existing != null) {
if (existing[OpenInvoiceTaskTable.runMode] != runMode) {
throw BizException(
ErrorCode.BAD_REQUEST.code,
"invoiceReqSerialNo 已存在 ${existing[OpenInvoiceTaskTable.runMode]} 任务,不能重复创建 $runMode 任务",
)
}
return existing.toSubmitResponse()
}
val historyExists = dbQuery {
HistoryInvoiceBasicTable.selectAll()
.where { HistoryInvoiceBasicTable.invoiceReqSerialNo eq invoiceReqSerialNo }
.singleOrNull() != null
}
if (historyExists) {
throw BizException(ErrorCode.BAD_REQUEST.code, "invoiceReqSerialNo 已存在历史发票记录,请勿重复开票")
}
ensureQueueCanAccept(principal.apiKey)
val taskId = dbQuery {
OpenInvoiceTaskTable.insert {
it[apiKey] = principal.apiKey
it[userId] = principal.userId
it[enterpriseId] = principal.enterpriseId
it[digitalAccountId] = principal.digitalAccountId
it[taxpayerNum] = principal.taxPayerNum
it[taxAccount] = principal.taxAccount
it[taskType] = TASK_ISSUE_BLUE
it[OpenInvoiceTaskTable.sourceType] = sourceType
it[OpenInvoiceTaskTable.runMode] = runMode
it[OpenInvoiceTaskTable.invoiceReqSerialNo] = invoiceReqSerialNo
it[OpenInvoiceTaskTable.batchNo] = batchNo
it[status] = STATUS_PENDING
it[requestBody] = myJson.encodeToString(request.copy(invoiceReqSerialNo = invoiceReqSerialNo))
it[maxAttemptCount] = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.maxAttemptCount
it[maxPollCount] = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.maxQueryPollCount
it[nextRunAt] = now
it[createdAt] = now
}[OpenInvoiceTaskTable.id]
}
return OpenInvoiceTaskSubmitResponse(
taskId = taskId.toString(),
invoiceReqSerialNo = invoiceReqSerialNo,
status = STATUS_PENDING,
taskType = TASK_ISSUE_BLUE,
runMode = runMode,
)
}
suspend fun overview(user: CurrentUser): List<OpenInvoiceTaskOverviewItem> {
val rows = dbQuery {
val tasks = scopedTaskQuery(user).toList()
val accountRows = PtDigitalAccountTable.selectAll()
.where { accountScope(user) }
.associateBy { it[PtDigitalAccountTable.id] }
val controls = OpenInvoiceQueueControlTable.selectAll()
.associateBy { it[OpenInvoiceQueueControlTable.apiKey] }
val taskGroups = tasks.groupBy { it[OpenInvoiceTaskTable.digitalAccountId] }
accountRows.map { (digitalAccountId, account) ->
val group = taskGroups[digitalAccountId].orEmpty()
val apiKey = account[PtDigitalAccountTable.apiKey].orEmpty()
val control = controls[apiKey]
OpenInvoiceTaskOverviewItem(
digitalAccountId = digitalAccountId.toString(),
apiKey = apiKey,
account = account[PtDigitalAccountTable.account],
status = control?.get(OpenInvoiceQueueControlTable.status) ?: QUEUE_RUNNING,
pauseCode = control?.get(OpenInvoiceQueueControlTable.pauseCode),
reason = control?.get(OpenInvoiceQueueControlTable.reason),
pending = group.count { it[OpenInvoiceTaskTable.status] == STATUS_PENDING }.toLong(),
processing = group.count { it[OpenInvoiceTaskTable.status] == STATUS_PROCESSING }.toLong(),
success = group.count { it[OpenInvoiceTaskTable.status] == STATUS_SUCCESS }.toLong(),
failed = group.count { it[OpenInvoiceTaskTable.status] == STATUS_FAILED }.toLong(),
waitingAuth = group.count { it[OpenInvoiceTaskTable.status] == STATUS_WAITING_AUTH }.toLong(),
total = group.size.toLong(),
lastCreatedAt = formatDateTime(group.maxOfOrNull { it[OpenInvoiceTaskTable.createdAt] }),
)
}.sortedByDescending { it.lastCreatedAt ?: "" }
}
return rows
}
suspend fun taskPage(
user: CurrentUser,
digitalAccountId: String?,
status: String?,
sourceType: String?,
runMode: String?,
page: Int,
pageSize: Int,
): PageResult<OpenInvoiceTaskItem> = dbQuery {
var where = taskScope(user)
digitalAccountId?.takeIf { it.isNotBlank() }?.let {
where = where and (OpenInvoiceTaskTable.digitalAccountId eq Uuid.parse(it))
}
status?.takeIf { it.isNotBlank() }?.let {
where = where and (OpenInvoiceTaskTable.status eq it)
}
sourceType?.takeIf { it.isNotBlank() }?.let {
where = where and (OpenInvoiceTaskTable.sourceType eq it)
}
runMode?.takeIf { it.isNotBlank() }?.let {
where = where and (OpenInvoiceTaskTable.runMode eq it)
}
val accountRows = PtDigitalAccountTable.selectAll()
.where { accountScope(user) }
.associateBy { it[PtDigitalAccountTable.id] }
val total = OpenInvoiceTaskTable.selectAll().where { where }.count()
val items = OpenInvoiceTaskTable.selectAll()
.where { where }
.orderBy(OpenInvoiceTaskTable.createdAt, SortOrder.DESC)
.limit(pageSize)
.offset(((page - 1).coerceAtLeast(0) * pageSize).toLong())
.map { it.toTaskItem(accountRows[it[OpenInvoiceTaskTable.digitalAccountId]]?.get(PtDigitalAccountTable.account)) }
PageResult(items, page, pageSize, total)
}
suspend fun pauseQueue(user: CurrentUser, digitalAccountId: String, reason: String?): String {
val account = requireManagedAccount(user, digitalAccountId)
pauseApiKey(account[PtDigitalAccountTable.apiKey] ?: throw BizException(ErrorCode.BAD_REQUEST.code, "数电账号未配置 api-key"), AUTH_REQUIRED_CODE, reason ?: "手动暂停")
return "队列已暂停"
}
suspend fun resumeQueue(user: CurrentUser, digitalAccountId: String): String {
val account = requireManagedAccount(user, digitalAccountId)
val apiKey = account[PtDigitalAccountTable.apiKey] ?: throw BizException(ErrorCode.BAD_REQUEST.code, "数电账号未配置 api-key")
val now = OffsetDateTime.now()
dbQuery {
OpenInvoiceQueueControlTable.update({ OpenInvoiceQueueControlTable.apiKey eq apiKey }) {
it[status] = QUEUE_RUNNING
it[pauseCode] = null
it[reason] = null
it[updatedAt] = now
}
OpenInvoiceTaskTable.update({
(OpenInvoiceTaskTable.apiKey eq apiKey) and
(OpenInvoiceTaskTable.status eq STATUS_WAITING_AUTH)
}) {
it[status] = STATUS_PENDING
it[nextRunAt] = now
it[lockedBy] = null
it[lockedAt] = null
it[updatedAt] = now
}
}
return "队列已恢复"
}
internal suspend fun processIssueTasks(workerId: String, limit: Int) {
val tasks = claimTasks(TASK_ISSUE_BLUE, workerId, limit)
tasks.forEach { processIssueTask(it) }
}
internal suspend fun processQueryTasks(workerId: String, limit: Int) {
val tasks = claimTasks(TASK_QUERY_BLUE, workerId, limit)
tasks.forEach { processQueryTask(it) }
}
private suspend fun processIssueTask(task: ResultRow) {
val taskId = task[OpenInvoiceTaskTable.id]
val runMode = task[OpenInvoiceTaskTable.runMode]
val request = task[OpenInvoiceTaskTable.requestBody]
?.let { myJson.decodeFromString<OpenBlueInvoiceCreateRequest>(it) }
?: return failTask(taskId, "REQUEST_BODY_NULL", "任务请求体为空")
val askRequest = request.toAskInvoiceRequest(task)
createHistoryPlaceholder(task, askRequest)
try {
if (runMode == MODE_SIMULATED) {
delay(2000)
} else {
PTApi.invoiceBlue(askRequest)
}
completeIssueTask(task)
} catch (e: PTException) {
failTask(taskId, e.code, e.message)
if (e.code == AUTH_REQUIRED_CODE) {
pauseApiKey(task[OpenInvoiceTaskTable.apiKey], e.code, e.message)
}
} catch (e: Exception) {
retryOrFail(task, "EXCEPTION", e.message ?: "开票任务执行失败")
}
}
private suspend fun processQueryTask(task: ResultRow) {
val taskId = task[OpenInvoiceTaskTable.id]
val runMode = task[OpenInvoiceTaskTable.runMode]
val invoiceReqSerialNo = task[OpenInvoiceTaskTable.invoiceReqSerialNo]
try {
val code = if (runMode == MODE_SIMULATED) {
delay(2000)
simulatedQueryCode(invoiceReqSerialNo, task[OpenInvoiceTaskTable.pollCount])
} else {
val res = PTApi.queryInvoiceInfo(
QueryInvoiceRequest(
taxpayerNum = task[OpenInvoiceTaskTable.taxpayerNum],
invoiceReqSerialNo = invoiceReqSerialNo,
)
)
dbQuery {
BlueInvoiceDao.upsertInvoiceInfo(
task[OpenInvoiceTaskTable.userId],
res,
task[OpenInvoiceTaskTable.enterpriseId],
task[OpenInvoiceTaskTable.digitalAccountId],
)
}
res.code
}
handleQueryCode(task, code)
} catch (e: PTException) {
retryOrFail(task, e.code, e.message)
} catch (e: Exception) {
retryOrFail(task, "EXCEPTION", e.message ?: "查询任务执行失败")
}
}
private suspend fun completeIssueTask(task: ResultRow) {
val now = OffsetDateTime.now()
val invoiceReqSerialNo = task[OpenInvoiceTaskTable.invoiceReqSerialNo]
dbQuery {
OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) {
it[status] = STATUS_SUCCESS
it[ptCode] = "0000"
it[errorMessage] = null
it[finishedAt] = now
it[updatedAt] = now
it[lockedBy] = null
it[lockedAt] = null
}
val exists = OpenInvoiceTaskTable.selectAll()
.where {
(OpenInvoiceTaskTable.taskType eq TASK_QUERY_BLUE) and
(OpenInvoiceTaskTable.invoiceReqSerialNo eq invoiceReqSerialNo)
}
.singleOrNull() != null
if (!exists) {
OpenInvoiceTaskTable.insert {
it[apiKey] = task[OpenInvoiceTaskTable.apiKey]
it[userId] = task[OpenInvoiceTaskTable.userId]
it[enterpriseId] = task[OpenInvoiceTaskTable.enterpriseId]
it[digitalAccountId] = task[OpenInvoiceTaskTable.digitalAccountId]
it[taxpayerNum] = task[OpenInvoiceTaskTable.taxpayerNum]
it[taxAccount] = task[OpenInvoiceTaskTable.taxAccount]
it[taskType] = TASK_QUERY_BLUE
it[sourceType] = task[OpenInvoiceTaskTable.sourceType]
it[runMode] = task[OpenInvoiceTaskTable.runMode]
it[OpenInvoiceTaskTable.invoiceReqSerialNo] = invoiceReqSerialNo
it[batchNo] = task[OpenInvoiceTaskTable.batchNo]
it[status] = STATUS_PENDING
it[requestBody] = task[OpenInvoiceTaskTable.taxpayerNum]
it[maxAttemptCount] = task[OpenInvoiceTaskTable.maxAttemptCount]
it[maxPollCount] = task[OpenInvoiceTaskTable.maxPollCount]
it[nextRunAt] = now.plusSeconds(com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.queryDelaySeconds)
it[createdAt] = now
}
}
}
}
private suspend fun handleQueryCode(task: ResultRow, code: String) {
when (code) {
"0000" -> finishQueryTask(task, STATUS_SUCCESS, code, null)
"9999" -> finishQueryTask(task, STATUS_FAILED, code, "开票失败")
AUTH_REQUIRED_CODE -> {
val message = "需要登录/风险认证"
dbQuery {
OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) {
it[status] = STATUS_WAITING_AUTH
it[ptCode] = code
it[errorMessage] = message
it[updatedAt] = OffsetDateTime.now()
it[lockedBy] = null
it[lockedAt] = null
}
}
pauseApiKey(task[OpenInvoiceTaskTable.apiKey], code, message)
}
"7777", "6666" -> requeueQueryTask(task, code)
else -> requeueQueryTask(task, code)
}
}
private suspend fun finishQueryTask(task: ResultRow, status: String, code: String, message: String?) {
val now = OffsetDateTime.now()
dbQuery {
OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) {
it[OpenInvoiceTaskTable.status] = status
it[ptCode] = code
it[errorMessage] = message
it[finishedAt] = now
it[updatedAt] = now
it[lockedBy] = null
it[lockedAt] = null
}
HistoryInvoiceBasicTable.update({
HistoryInvoiceBasicTable.invoiceReqSerialNo eq task[OpenInvoiceTaskTable.invoiceReqSerialNo]
}) {
it[HistoryInvoiceBasicTable.code] = code
it[msg] = message ?: if (code == "0000") "开票成功" else "开票失败"
it[updatedAt] = now
}
}
}
private suspend fun requeueQueryTask(task: ResultRow, code: String) {
val nextPollCount = task[OpenInvoiceTaskTable.pollCount] + 1
if (nextPollCount >= task[OpenInvoiceTaskTable.maxPollCount]) {
finishQueryTask(task, STATUS_FAILED, code, "查询超过最大次数")
return
}
val now = OffsetDateTime.now()
dbQuery {
OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) {
it[status] = STATUS_PENDING
it[ptCode] = code
it[pollCount] = nextPollCount
it[nextRunAt] = now.plusSeconds(com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.queryDelaySeconds)
it[updatedAt] = now
it[lockedBy] = null
it[lockedAt] = null
}
HistoryInvoiceBasicTable.update({
HistoryInvoiceBasicTable.invoiceReqSerialNo eq task[OpenInvoiceTaskTable.invoiceReqSerialNo]
}) {
it[HistoryInvoiceBasicTable.code] = code
it[msg] = if (code == "6666") "未开票" else "开票中..."
it[updatedAt] = now
}
}
}
private suspend fun retryOrFail(task: ResultRow, code: String, message: String) {
val nextAttempt = task[OpenInvoiceTaskTable.attemptCount] + 1
if (nextAttempt >= task[OpenInvoiceTaskTable.maxAttemptCount]) {
failTask(task[OpenInvoiceTaskTable.id], code, message)
return
}
val now = OffsetDateTime.now()
dbQuery {
OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq task[OpenInvoiceTaskTable.id] }) {
it[status] = STATUS_PENDING
it[ptCode] = code
it[errorMessage] = message
it[attemptCount] = nextAttempt
it[nextRunAt] = now.plusSeconds(10L * nextAttempt)
it[updatedAt] = now
it[lockedBy] = null
it[lockedAt] = null
}
}
}
private suspend fun failTask(taskId: Uuid, code: String, message: String) {
val now = OffsetDateTime.now()
dbQuery {
val task = OpenInvoiceTaskTable.selectAll()
.where { OpenInvoiceTaskTable.id eq taskId }
.singleOrNull()
OpenInvoiceTaskTable.update({ OpenInvoiceTaskTable.id eq taskId }) {
it[status] = STATUS_FAILED
it[ptCode] = code
it[errorMessage] = message
it[finishedAt] = now
it[updatedAt] = now
it[lockedBy] = null
it[lockedAt] = null
}
if (task != null) {
HistoryInvoiceBasicTable.update({
HistoryInvoiceBasicTable.invoiceReqSerialNo eq task[OpenInvoiceTaskTable.invoiceReqSerialNo]
}) {
it[HistoryInvoiceBasicTable.code] = code
it[msg] = message
it[updatedAt] = now
}
}
}
}
private suspend fun claimTasks(taskType: String, workerId: String, limit: Int): List<ResultRow> {
val now = OffsetDateTime.now()
val candidateIds = dbQuery {
val pausedApiKeys = OpenInvoiceQueueControlTable.selectAll()
.where { OpenInvoiceQueueControlTable.status eq QUEUE_PAUSED }
.map { it[OpenInvoiceQueueControlTable.apiKey] }
var where = (OpenInvoiceTaskTable.taskType eq taskType) and
(OpenInvoiceTaskTable.status eq STATUS_PENDING) and
(OpenInvoiceTaskTable.nextRunAt lessEq now)
if (pausedApiKeys.isNotEmpty()) {
where = where and (OpenInvoiceTaskTable.apiKey notInList pausedApiKeys)
}
OpenInvoiceTaskTable.selectAll()
.where { where }
.orderBy(OpenInvoiceTaskTable.createdAt, SortOrder.ASC)
.limit(limit)
.map { it[OpenInvoiceTaskTable.id] }
}
if (candidateIds.isEmpty()) {
return emptyList()
}
return dbQuery {
candidateIds.mapNotNull { id ->
val updated = OpenInvoiceTaskTable.update({
(OpenInvoiceTaskTable.id eq id) and (OpenInvoiceTaskTable.status eq STATUS_PENDING)
}) {
it[status] = STATUS_PROCESSING
it[lockedBy] = workerId
it[lockedAt] = now
it[startedAt] = now
it[updatedAt] = now
}
if (updated == 1) {
OpenInvoiceTaskTable.selectAll().where { OpenInvoiceTaskTable.id eq id }.singleOrNull()
} else {
null
}
}
}
}
private suspend fun createHistoryPlaceholder(task: ResultRow, request: AskInvoiceRequest) = dbQuery {
val exists = HistoryInvoiceBasicTable.selectAll()
.where { HistoryInvoiceBasicTable.invoiceReqSerialNo eq request.invoiceReqSerialNo }
.singleOrNull() != null
if (!exists) {
val now = OffsetDateTime.now()
HistoryInvoiceBasicTable.insert {
it[userId] = task[OpenInvoiceTaskTable.userId]
it[enterpriseId] = task[OpenInvoiceTaskTable.enterpriseId]
it[digitalAccountId] = task[OpenInvoiceTaskTable.digitalAccountId]
it[invoiceReqSerialNo] = request.invoiceReqSerialNo
it[code] = "7777"
it[msg] = "开票中..."
it[sellerTaxpayerNum] = request.taxpayerNum
it[invoiceKind] = request.invoiceIssueKindCode
it[invoiceType] = "1"
it[invDeletedFlag] = "0"
it[createdAt] = now
}
}
}
private suspend fun pauseApiKey(apiKey: String, code: String, reason: String) {
val now = OffsetDateTime.now()
dbQuery {
val exists = OpenInvoiceQueueControlTable.selectAll()
.where { OpenInvoiceQueueControlTable.apiKey eq apiKey }
.singleOrNull()
if (exists == null) {
val digitalAccountId = PtDigitalAccountTable.selectAll()
.where { PtDigitalAccountTable.apiKey eq apiKey }
.singleOrNull()
?.get(PtDigitalAccountTable.id)
OpenInvoiceQueueControlTable.insert {
it[OpenInvoiceQueueControlTable.apiKey] = apiKey
it[OpenInvoiceQueueControlTable.digitalAccountId] = digitalAccountId
it[status] = QUEUE_PAUSED
it[pauseCode] = code
it[OpenInvoiceQueueControlTable.reason] = reason
it[createdAt] = now
}
} else {
OpenInvoiceQueueControlTable.update({ OpenInvoiceQueueControlTable.apiKey eq apiKey }) {
it[status] = QUEUE_PAUSED
it[pauseCode] = code
it[OpenInvoiceQueueControlTable.reason] = reason
it[updatedAt] = now
}
}
}
}
private suspend fun ensureQueueCanAccept(apiKey: String) {
val maxPending = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.maxPendingPerApiKey
val pending = dbQuery {
OpenInvoiceTaskTable.selectAll()
.where {
(OpenInvoiceTaskTable.apiKey eq apiKey) and
((OpenInvoiceTaskTable.status eq STATUS_PENDING) or (OpenInvoiceTaskTable.status eq STATUS_PROCESSING))
}
.count()
}
if (pending >= maxPending) {
throw BizException(ErrorCode.BAD_REQUEST.code, "当前 api-key 任务积压已达上限")
}
}
private fun validateRequest(request: OpenBlueInvoiceCreateRequest) {
if (request.buyerName.isBlank()) {
throw BizException(ErrorCode.BAD_REQUEST.code, "buyerName 不能为空")
}
if (request.itemList.isEmpty()) {
throw BizException(ErrorCode.BAD_REQUEST.code, "itemList 不能为空")
}
}
private fun OpenBlueInvoiceCreateRequest.toAskInvoiceRequest(task: ResultRow): AskInvoiceRequest =
AskInvoiceRequest(
taxpayerNum = task[OpenInvoiceTaskTable.taxpayerNum],
invoiceReqSerialNo = invoiceReqSerialNo ?: "",
invoiceIssueKindCode = invoiceIssueKindCode,
buyerName = buyerName,
purchaseInvSellerIdType = purchaseInvSellerIdType,
buyerTaxpayerNum = buyerTaxpayerNum,
naturalPersonFlag = naturalPersonFlag,
buyerAddress = buyerAddress,
buyerTel = buyerTel,
buyerBankName = buyerBankName,
buyerBankAccount = buyerBankAccount,
showBuyerBank = showBuyerBank,
showSellerBank = showSellerBank,
showBuyerAddrTel = showBuyerAddrTel,
showSellerAddrTel = showSellerAddrTel,
account = task[OpenInvoiceTaskTable.taxAccount],
variableLevyFlag = variableLevyFlag,
casherName = casherName,
reviewerName = reviewerName,
takerName = takerName,
takerTel = takerTel,
takerEmail = takerEmail,
specialInvoiceKind = specialInvoiceKind,
remark = remark,
definedData = definedData,
tradeNo = tradeNo ?: invoiceReqSerialNo,
shopNum = shopNum,
itemList = itemList,
variableLevyProofList = variableLevyProofList,
orderList = orderList,
)
private fun simulatedQueryCode(invoiceReqSerialNo: String, pollCount: Int): String =
when {
invoiceReqSerialNo.contains("3999", ignoreCase = true) -> AUTH_REQUIRED_CODE
invoiceReqSerialNo.contains("FAIL", ignoreCase = true) -> "9999"
pollCount <= 0 -> "7777"
pollCount == 1 -> "6666"
else -> "0000"
}
private fun scopedTaskQuery(user: CurrentUser): Query =
OpenInvoiceTaskTable.selectAll().where { taskScope(user) }
private fun taskScope(user: CurrentUser): Op<Boolean> =
when {
user.isSuperAdmin -> Op.TRUE
user.isDigitalOperator && user.digitalAccountId != null -> OpenInvoiceTaskTable.digitalAccountId eq user.digitalAccountId
user.enterpriseId != null -> OpenInvoiceTaskTable.enterpriseId eq user.enterpriseId
else -> OpenInvoiceTaskTable.userId eq user.id
}
private fun accountScope(user: CurrentUser): Op<Boolean> =
when {
user.isSuperAdmin -> Op.TRUE
user.isDigitalOperator && user.digitalAccountId != null -> PtDigitalAccountTable.id eq user.digitalAccountId
user.enterpriseId != null -> PtDigitalAccountTable.enterpriseId eq user.enterpriseId
else -> PtDigitalAccountTable.platformUserId eq user.id
}
private suspend fun requireManagedAccount(user: CurrentUser, digitalAccountId: String): ResultRow = dbQuery {
PtDigitalAccountTable.selectAll()
.where { accountScope(user) and (PtDigitalAccountTable.id eq Uuid.parse(digitalAccountId)) }
.singleOrNull()
} ?: throw BizException(ErrorCode.BAD_REQUEST.code, "数电账号不存在或无权操作")
private fun ResultRow.toSubmitResponse(): OpenInvoiceTaskSubmitResponse =
OpenInvoiceTaskSubmitResponse(
taskId = this[OpenInvoiceTaskTable.id].toString(),
invoiceReqSerialNo = this[OpenInvoiceTaskTable.invoiceReqSerialNo],
status = this[OpenInvoiceTaskTable.status],
taskType = this[OpenInvoiceTaskTable.taskType],
runMode = this[OpenInvoiceTaskTable.runMode],
)
private fun ResultRow.toTaskItem(account: String?): OpenInvoiceTaskItem =
OpenInvoiceTaskItem(
id = this[OpenInvoiceTaskTable.id].toString(),
digitalAccountId = this[OpenInvoiceTaskTable.digitalAccountId].toString(),
apiKey = this[OpenInvoiceTaskTable.apiKey],
account = account,
taskType = this[OpenInvoiceTaskTable.taskType],
sourceType = this[OpenInvoiceTaskTable.sourceType],
runMode = this[OpenInvoiceTaskTable.runMode],
invoiceReqSerialNo = this[OpenInvoiceTaskTable.invoiceReqSerialNo],
batchNo = this[OpenInvoiceTaskTable.batchNo],
status = this[OpenInvoiceTaskTable.status],
ptCode = this[OpenInvoiceTaskTable.ptCode],
errorMessage = this[OpenInvoiceTaskTable.errorMessage],
attemptCount = this[OpenInvoiceTaskTable.attemptCount],
maxAttemptCount = this[OpenInvoiceTaskTable.maxAttemptCount],
pollCount = this[OpenInvoiceTaskTable.pollCount],
maxPollCount = this[OpenInvoiceTaskTable.maxPollCount],
nextRunAt = formatDateTime(this[OpenInvoiceTaskTable.nextRunAt]) ?: "",
createdAt = formatDateTime(this[OpenInvoiceTaskTable.createdAt]) ?: "",
updatedAt = formatDateTime(this[OpenInvoiceTaskTable.updatedAt]),
startedAt = formatDateTime(this[OpenInvoiceTaskTable.startedAt]),
finishedAt = formatDateTime(this[OpenInvoiceTaskTable.finishedAt]),
)
}
object OpenInvoiceTaskWorker {
fun start(application: Application) {
val config = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue
repeat(config.issueWorkerCount) { index ->
launchWorker(application, "issue-$index") {
OpenInvoiceTaskService.processIssueTasks("issue-$index", 1)
}
}
repeat(config.queryWorkerCount) { index ->
launchWorker(application, "query-$index") {
OpenInvoiceTaskService.processQueryTasks("query-$index", 1)
}
}
}
private fun launchWorker(application: Application, name: String, block: suspend () -> Unit) {
CoroutineScope(application.coroutineContext + Dispatchers.Default).launch {
while (isActive) {
runCatching { block() }
delay(1000)
}
}
}
}
private object OpenApiRateLimiter {
private data class Bucket(val second: Long, val count: Int)
private val buckets = ConcurrentHashMap<String, Bucket>()
fun check(apiKey: String) {
val limit = com.bbit.ticket.utils.bootstrap.AppConfig.openApiQueue.perApiKeyPerSecond
val nowSecond = System.currentTimeMillis() / 1000
val updated = buckets.compute(apiKey) { _, old ->
if (old == null || old.second != nowSecond) {
Bucket(nowSecond, 1)
} else {
Bucket(nowSecond, old.count + 1)
}
} ?: Bucket(nowSecond, 1)
if (updated.count > limit) {
throw BizException(ErrorCode.BAD_REQUEST.code, "当前 api-key 请求过于频繁")
}
}
}
@@ -5,14 +5,11 @@ package com.bbit.ticket.service.piaotong
import com.bbit.ticket.dao.piaotong.BlueInvoiceDao import com.bbit.ticket.dao.piaotong.BlueInvoiceDao
import com.bbit.ticket.entity.common.PageResult import com.bbit.ticket.entity.common.PageResult
import com.bbit.ticket.entity.request.AskInvoiceRequest import com.bbit.ticket.entity.request.AskInvoiceRequest
import com.bbit.ticket.entity.request.InvoiceQueryRequest
import com.bbit.ticket.entity.request.InvoiceQuerySubmitRequest
import com.bbit.ticket.entity.request.QueryInvoiceRequest import com.bbit.ticket.entity.request.QueryInvoiceRequest
import com.bbit.ticket.entity.response.InvoiceDownloadUrlResponse import com.bbit.ticket.entity.response.InvoiceDownloadUrlResponse
import com.bbit.ticket.entity.response.QueryInvoiceResult import com.bbit.ticket.entity.response.QueryInvoiceResult
import com.bbit.ticket.entity.response.InvoiceDetailResponse import com.bbit.ticket.entity.response.InvoiceDetailResponse
import com.bbit.ticket.entity.response.InvoiceHistoryItem import com.bbit.ticket.entity.response.InvoiceHistoryItem
import com.bbit.ticket.entity.response.InvoiceQueryResponse
import com.bbit.ticket.utils.CurrentUser import com.bbit.ticket.utils.CurrentUser
import io.ktor.client.request.get import io.ktor.client.request.get
import io.ktor.client.statement.bodyAsBytes import io.ktor.client.statement.bodyAsBytes
@@ -163,18 +160,28 @@ object PTBlueService {
/** /**
* 查询并更新发票状态(复用 syncInvoiceFromPT * 查询并更新发票状态(复用 syncInvoiceFromPT
*/ */
suspend fun queryInvoiceAllInfo(req: QueryInvoiceRequest): QueryInvoiceResult { suspend fun queryInvoiceAllInfo(user: CurrentUser, invoiceReqSerialNo: String): QueryInvoiceResult {
val invoiceReqSerialNo = req.invoiceReqSerialNo
val scope = dbQuery { val scope = dbQuery {
BlueInvoiceDao.findInvoiceScopeBySerialNo(invoiceReqSerialNo) BlueInvoiceDao.findInvoiceScopeBySerialNo(
user.id,
user.enterpriseId,
if (user.isDigitalOperator) user.digitalAccountId else null,
invoiceReqSerialNo,
)
} }
val result = syncInvoiceFromPT(scope.userId, invoiceReqSerialNo, req.taxpayerNum, scope.enterpriseId, scope.digitalAccountId) val result = syncInvoiceFromPT(
scope.userId,
invoiceReqSerialNo,
scope.taxpayerNum,
scope.enterpriseId,
scope.digitalAccountId,
)
val relatedSerialNos = dbQuery { val relatedSerialNos = dbQuery {
BlueInvoiceDao.findRelatedInvoiceReqSerialNos(scope.userId, invoiceReqSerialNo) BlueInvoiceDao.findRelatedInvoiceReqSerialNos(scope.userId, invoiceReqSerialNo)
} }
relatedSerialNos.forEach { relatedSerialNo -> relatedSerialNos.forEach { relatedSerialNo ->
runCatching { runCatching {
syncInvoiceFromPT(scope.userId, relatedSerialNo, req.taxpayerNum, scope.enterpriseId, scope.digitalAccountId) syncInvoiceFromPT(scope.userId, relatedSerialNo, scope.taxpayerNum, scope.enterpriseId, scope.digitalAccountId)
} }
} }
return result return result
@@ -33,6 +33,16 @@ object AppConfig {
val allowedHosts: List<String>, val allowedHosts: List<String>,
) )
data class OpenApiQueue(
val issueWorkerCount: Int,
val queryWorkerCount: Int,
val maxPendingPerApiKey: Int,
val queryDelaySeconds: Long,
val maxQueryPollCount: Int,
val maxAttemptCount: Int,
val perApiKeyPerSecond: Int,
)
lateinit var app: App lateinit var app: App
private set private set
@@ -48,6 +58,9 @@ object AppConfig {
lateinit var cors: Cors lateinit var cors: Cors
private set private set
lateinit var openApiQueue: OpenApiQueue
private set
fun init(environment: ApplicationEnvironment) { fun init(environment: ApplicationEnvironment) {
app = App( app = App(
name = string(environment, "app.name", "Platform A"), name = string(environment, "app.name", "Platform A"),
@@ -81,6 +94,16 @@ object AppConfig {
.map { it.trim() } .map { it.trim() }
.filter { it.isNotEmpty() }, .filter { it.isNotEmpty() },
) )
openApiQueue = OpenApiQueue(
issueWorkerCount = int(environment, "openapi.queue.issueWorkerCount", 4),
queryWorkerCount = int(environment, "openapi.queue.queryWorkerCount", 4),
maxPendingPerApiKey = int(environment, "openapi.queue.maxPendingPerApiKey", 1000),
queryDelaySeconds = long(environment, "openapi.queue.queryDelaySeconds", 10),
maxQueryPollCount = int(environment, "openapi.queue.maxQueryPollCount", 30),
maxAttemptCount = int(environment, "openapi.queue.maxAttemptCount", 3),
perApiKeyPerSecond = int(environment, "openapi.rateLimit.perApiKeyPerSecond", 20),
)
} }
private fun string(environment: ApplicationEnvironment, path: String, default: String): String = private fun string(environment: ApplicationEnvironment, path: String, default: String): String =
@@ -7,6 +7,8 @@ import com.bbit.ticket.database.piaotong.HistoryInvoiceVoucherTable
import com.bbit.ticket.database.piaotong.HistoryInvoiceRedTable import com.bbit.ticket.database.piaotong.HistoryInvoiceRedTable
import com.bbit.ticket.database.piaotong.OpenInvoiceBatchItemTable import com.bbit.ticket.database.piaotong.OpenInvoiceBatchItemTable
import com.bbit.ticket.database.piaotong.OpenInvoiceBatchTable import com.bbit.ticket.database.piaotong.OpenInvoiceBatchTable
import com.bbit.ticket.database.piaotong.OpenInvoiceQueueControlTable
import com.bbit.ticket.database.piaotong.OpenInvoiceTaskTable
import com.bbit.ticket.database.piaotong.PtDigitalAccountTable import com.bbit.ticket.database.piaotong.PtDigitalAccountTable
import com.bbit.ticket.database.piaotong.PtEnterpriseTable import com.bbit.ticket.database.piaotong.PtEnterpriseTable
import com.bbit.ticket.database.system.SysApiAccessLogTable import com.bbit.ticket.database.system.SysApiAccessLogTable
@@ -47,6 +49,8 @@ object DatabaseInitializer {
HistoryInvoiceOrderTable, HistoryInvoiceOrderTable,
OpenInvoiceBatchTable, OpenInvoiceBatchTable,
OpenInvoiceBatchItemTable, OpenInvoiceBatchItemTable,
OpenInvoiceTaskTable,
OpenInvoiceQueueControlTable,
) )
// 先通过 Exposed 生成迁移 SQL,再逐条执行,避免启动时静默跳过缺失表或字段。 // 先通过 Exposed 生成迁移 SQL,再逐条执行,避免启动时静默跳过缺失表或字段。
transaction { transaction {
@@ -30,3 +30,14 @@ security:
cors: cors:
allowedHosts: "localhost:5173,127.0.0.1:5173" allowedHosts: "localhost:5173,127.0.0.1:5173"
openapi:
queue:
issueWorkerCount: 4
queryWorkerCount: 4
maxPendingPerApiKey: 1000
queryDelaySeconds: 10
maxQueryPollCount: 30
maxAttemptCount: 3
rateLimit:
perApiKeyPerSecond: 20
+66
View File
@@ -221,6 +221,72 @@ export function openApiStatisticsApi(): Promise<OpenApiStatisticsItem[]> {
return http.get('/pt/openapi/statistics') return http.get('/pt/openapi/statistics')
} }
export interface OpenInvoiceTaskOverviewItem {
digitalAccountId: string
apiKey: string
account?: string | null
status: string
pauseCode?: string | null
reason?: string | null
pending: number
processing: number
success: number
failed: number
waitingAuth: number
total: number
lastCreatedAt?: string | null
}
export interface OpenInvoiceTaskItem {
id: string
digitalAccountId: string
apiKey: string
account?: string | null
taskType: string
sourceType: string
runMode: string
invoiceReqSerialNo: string
batchNo?: string | null
status: string
ptCode?: string | null
errorMessage?: string | null
attemptCount: number
maxAttemptCount: number
pollCount: number
maxPollCount: number
nextRunAt: string
createdAt: string
updatedAt?: string | null
startedAt?: string | null
finishedAt?: string | null
}
export function openInvoiceTaskOverviewApi(): Promise<OpenInvoiceTaskOverviewItem[]> {
return http.get('/pt/openapi/tasks/overview')
}
export function openInvoiceTaskPageApi(params: {
page: number
pageSize: number
digitalAccountId?: string
status?: string | null
sourceType?: string | null
runMode?: string | null
}): Promise<PageResult<OpenInvoiceTaskItem>> {
return http.get('/pt/openapi/tasks', { params })
}
export function pauseOpenInvoiceTaskQueueApi(
digitalAccountId: string,
reason?: string
): Promise<string> {
return http.post(`/pt/openapi/tasks/queues/${digitalAccountId}/pause`, { reason })
}
export function resumeOpenInvoiceTaskQueueApi(digitalAccountId: string): Promise<string> {
return http.post(`/pt/openapi/tasks/queues/${digitalAccountId}/resume`)
}
// ============================================= // =============================================
// 开票相关 // 开票相关
// ============================================= // =============================================
+366 -18
View File
@@ -11,6 +11,25 @@
</div> </div>
</div> </div>
<div class="queue-summary">
<div class="metric">
<span>数电账号</span>
<strong>{{ rows.length }}</strong>
</div>
<div class="metric">
<span>待处理</span>
<strong>{{ totals.pending }}</strong>
</div>
<div class="metric">
<span>处理中</span>
<strong>{{ totals.processing }}</strong>
</div>
<div class="metric">
<span>需认证</span>
<strong>{{ totals.waitingAuth }}</strong>
</div>
</div>
<div class="card-body card-body-fill table-fill"> <div class="card-body card-body-fill table-fill">
<n-data-table <n-data-table
flex-height flex-height
@@ -19,43 +38,372 @@
:data="rows" :data="rows"
:loading="loading" :loading="loading"
:pagination="{ pageSize: 12 }" :pagination="{ pageSize: 12 }"
:row-key="(row: OpenApiStatisticsItem) => `${row.digitalAccountId}-${row.interfaceCode}`" :row-key="(row: OpenInvoiceTaskOverviewItem) => row.digitalAccountId"
:scroll-x="1040" :row-props="rowProps"
:scroll-x="1160"
/> />
</div> </div>
</section> </section>
<n-drawer v-model:show="drawerVisible" :width="760">
<n-drawer-content :title="drawerTitle" closable>
<div class="drawer-toolbar">
<n-select
v-model:value="taskStatus"
clearable
size="small"
placeholder="全部状态"
:options="statusOptions"
class="task-filter"
@update:value="loadTasks(1)"
/>
<n-select
v-model:value="taskSourceType"
clearable
size="small"
placeholder="全部来源"
:options="sourceTypeOptions"
class="task-filter"
@update:value="loadTasks(1)"
/>
<n-select
v-model:value="taskRunMode"
clearable
size="small"
placeholder="全部模式"
:options="runModeOptions"
class="task-filter"
@update:value="loadTasks(1)"
/>
<n-button size="small" :loading="taskLoading" @click="loadTasks(taskPage)">
<template #icon><n-icon :component="RefreshCw" /></template>
刷新
</n-button>
</div>
<n-data-table
size="small"
:columns="taskColumns"
:data="tasks"
:loading="taskLoading"
:pagination="taskPagination"
:scroll-x="980"
remote
@update:page="loadTasks"
@update:page-size="changeTaskPageSize"
/>
</n-drawer-content>
</n-drawer>
</div> </div>
</template> </template>
<script setup lang="ts"> <script setup lang="ts">
import { h, onMounted, ref } from 'vue' import { computed, h, onMounted, reactive, ref } from 'vue'
import type { DataTableColumns } from 'naive-ui' import type { DataTableColumns, PaginationProps } from 'naive-ui'
import { NButton, NDataTable, NIcon, NTag } from 'naive-ui' import { NButton, NDataTable, NDrawer, NDrawerContent, NIcon, NSelect, NTag } from 'naive-ui'
import { RefreshCw } from 'lucide-vue-next' import { ListTree, Pause, Play, RefreshCw } from 'lucide-vue-next'
import { openApiStatisticsApi } from '@/api/piaotong' import {
import type { OpenApiStatisticsItem } from '@/api/piaotong' openInvoiceTaskOverviewApi,
openInvoiceTaskPageApi,
pauseOpenInvoiceTaskQueueApi,
resumeOpenInvoiceTaskQueueApi
} from '@/api/piaotong'
import type { OpenInvoiceTaskItem, OpenInvoiceTaskOverviewItem } from '@/api/piaotong'
import { appMessage } from '@/utils/message'
const loading = ref(false) const loading = ref(false)
const rows = ref<OpenApiStatisticsItem[]>([]) const rows = ref<OpenInvoiceTaskOverviewItem[]>([])
const selected = ref<OpenInvoiceTaskOverviewItem | null>(null)
const drawerVisible = ref(false)
const taskLoading = ref(false)
const tasks = ref<OpenInvoiceTaskItem[]>([])
const taskPage = ref(1)
const taskPageSize = ref(20)
const taskTotal = ref(0)
const taskStatus = ref<string | null>(null)
const taskSourceType = ref<string | null>(null)
const taskRunMode = ref<string | null>(null)
const columns: DataTableColumns<OpenApiStatisticsItem> = [ const statusOptions = [
{ title: '数电账号ID', key: 'digitalAccountId', minWidth: 220 }, { label: '待处理', value: 'PENDING' },
{ title: '接口', key: 'interfaceCode', minWidth: 180 }, { label: '处理中', value: 'PROCESSING' },
{ title: '调用次数', key: 'total', width: 100 }, { label: '成功', value: 'SUCCESS' },
{ title: '成功', key: 'success', width: 100, render: (row) => h(NTag, { type: 'success' }, { default: () => row.success }) }, { label: '失败', value: 'FAILED' },
{ title: '失败', key: 'failed', width: 100, render: (row) => h(NTag, { type: row.failed > 0 ? 'error' : 'default' }, { default: () => row.failed }) }, { label: '需认证', value: 'WAITING_AUTH' }
{ title: '平均耗时(ms)', key: 'avgCostMs', width: 140 },
{ title: '最近调用', key: 'lastCalledAt', minWidth: 180 }
] ]
const sourceTypeOptions = [
{ label: '单笔', value: 'SINGLE' },
{ label: '批量', value: 'BATCH' },
{ label: '测试', value: 'TEST' }
]
const runModeOptions = [
{ label: '生产', value: 'REAL' },
{ label: '模拟', value: 'SIMULATED' }
]
const totals = computed(() =>
rows.value.reduce(
(acc, row) => {
acc.pending += row.pending
acc.processing += row.processing
acc.waitingAuth += row.waitingAuth
return acc
},
{ pending: 0, processing: 0, waitingAuth: 0 }
)
)
const drawerTitle = computed(() => {
const row = selected.value
if (!row) return '任务明细'
return `${row.account || row.digitalAccountId} 任务明细`
})
const statusTagType = (status: string) => {
if (status === 'RUNNING' || status === 'SUCCESS') return 'success'
if (status === 'PAUSED' || status === 'WAITING_AUTH') return 'warning'
if (status === 'FAILED') return 'error'
if (status === 'PROCESSING') return 'info'
return 'default'
}
const columns: DataTableColumns<OpenInvoiceTaskOverviewItem> = [
{ title: '数电账号', key: 'account', minWidth: 160, render: (row) => row.account || '-' },
{ title: '数电账号ID', key: 'digitalAccountId', minWidth: 220 },
{
title: '队列',
key: 'status',
width: 110,
render: (row) =>
h(NTag, { type: statusTagType(row.status), size: 'small' }, { default: () => row.status })
},
{ title: '待处理', key: 'pending', width: 90 },
{ title: '处理中', key: 'processing', width: 90 },
{
title: '需认证',
key: 'waitingAuth',
width: 90,
render: (row) =>
h(
NTag,
{ type: row.waitingAuth > 0 ? 'warning' : 'default', size: 'small' },
{ default: () => row.waitingAuth }
)
},
{ title: '成功', key: 'success', width: 90 },
{
title: '失败',
key: 'failed',
width: 90,
render: (row) =>
h(
NTag,
{ type: row.failed > 0 ? 'error' : 'default', size: 'small' },
{ default: () => row.failed }
)
},
{ title: '总数', key: 'total', width: 90 },
{ title: '最近任务', key: 'lastCreatedAt', minWidth: 150 },
{
title: '操作',
key: 'actions',
width: 168,
align: 'center',
render: (row) =>
h(
'div',
{ class: 'action-buttons' },
[
h(
NButton,
{
size: 'small',
secondary: true,
title: '查看任务详情',
onClick: (event: MouseEvent) => {
event.stopPropagation()
openDrawer(row)
}
},
{
icon: () => h(NIcon, { component: ListTree }),
default: () => '详情'
}
),
h(
NButton,
{
size: 'small',
secondary: true,
type: row.status === 'PAUSED' ? 'success' : 'warning',
title: row.status === 'PAUSED' ? '恢复队列' : '暂停队列',
onClick: (event: MouseEvent) => {
event.stopPropagation()
row.status === 'PAUSED' ? resume(row) : pause(row)
}
},
{
icon: () => h(NIcon, { component: row.status === 'PAUSED' ? Play : Pause }),
default: () => (row.status === 'PAUSED' ? '恢复' : '暂停')
}
)
]
)
}
]
function rowProps(row: OpenInvoiceTaskOverviewItem) {
return {
style: 'cursor: pointer;',
onClick: () => {
openDrawer(row)
}
}
}
const taskColumns: DataTableColumns<OpenInvoiceTaskItem> = [
{ title: '票号', key: 'invoiceReqSerialNo', minWidth: 170 },
{ title: '任务', key: 'taskType', width: 120 },
{ title: '来源', key: 'sourceType', width: 90 },
{ title: '模式', key: 'runMode', width: 90 },
{
title: '状态',
key: 'status',
width: 110,
render: (row) =>
h(NTag, { type: statusTagType(row.status), size: 'small' }, { default: () => row.status })
},
{ title: 'PT码', key: 'ptCode', width: 90 },
{ title: '查询次数', key: 'pollCount', width: 90, render: (row) => `${row.pollCount}/${row.maxPollCount}` },
{ title: '重试', key: 'attemptCount', width: 90, render: (row) => `${row.attemptCount}/${row.maxAttemptCount}` },
{ title: '下次执行', key: 'nextRunAt', minWidth: 150 },
{ title: '错误', key: 'errorMessage', minWidth: 180 }
]
const taskPagination = reactive<PaginationProps>({
page: taskPage.value,
pageSize: taskPageSize.value,
itemCount: taskTotal.value,
showSizePicker: true,
pageSizes: [10, 20, 50]
})
async function load() { async function load() {
loading.value = true loading.value = true
try { try {
rows.value = await openApiStatisticsApi() rows.value = await openInvoiceTaskOverviewApi()
} finally { } finally {
loading.value = false loading.value = false
} }
} }
async function loadTasks(page = 1) {
if (!selected.value) return
taskLoading.value = true
try {
const result = await openInvoiceTaskPageApi({
digitalAccountId: selected.value.digitalAccountId,
status: taskStatus.value,
sourceType: taskSourceType.value,
runMode: taskRunMode.value,
page,
pageSize: taskPageSize.value
})
tasks.value = result.items
taskPage.value = result.page
taskTotal.value = result.total
taskPagination.page = result.page
taskPagination.itemCount = result.total
} finally {
taskLoading.value = false
}
}
function openDrawer(row: OpenInvoiceTaskOverviewItem) {
selected.value = row
drawerVisible.value = true
taskStatus.value = null
taskSourceType.value = null
taskRunMode.value = null
loadTasks(1)
}
async function changeTaskPageSize(pageSize: number) {
taskPageSize.value = pageSize
taskPagination.pageSize = pageSize
await loadTasks(1)
}
async function pause(row: OpenInvoiceTaskOverviewItem) {
await pauseOpenInvoiceTaskQueueApi(row.digitalAccountId, '手动暂停')
appMessage.success('队列已暂停')
await load()
}
async function resume(row: OpenInvoiceTaskOverviewItem) {
await resumeOpenInvoiceTaskQueueApi(row.digitalAccountId)
appMessage.success('队列已恢复')
await load()
if (selected.value?.digitalAccountId === row.digitalAccountId) {
await loadTasks(taskPage.value)
}
}
onMounted(load) onMounted(load)
</script> </script>
<style scoped>
.queue-summary {
display: grid;
grid-template-columns: repeat(4, minmax(120px, 1fr));
gap: 12px;
padding: 0 16px 12px;
}
.metric {
border: 1px solid var(--border-color);
border-radius: 8px;
padding: 12px;
background: var(--card-color);
}
.metric span {
display: block;
color: var(--text-color-2);
font-size: 12px;
}
.metric strong {
display: block;
margin-top: 4px;
font-size: 22px;
font-weight: 700;
}
.drawer-toolbar {
display: flex;
justify-content: flex-end;
gap: 8px;
margin-bottom: 12px;
}
.task-filter {
width: 140px;
}
.action-buttons {
display: flex;
align-items: center;
justify-content: center;
gap: 8px;
min-width: 144px;
}
@media (max-width: 720px) {
.queue-summary {
grid-template-columns: repeat(2, minmax(120px, 1fr));
}
}
</style>