完善开放接口,完善开票历史

This commit is contained in:
BBIT-Kai
2026-05-21 10:53:08 +08:00
parent ccc164b176
commit 40f1c27e71
15 changed files with 1087 additions and 431 deletions
@@ -6,6 +6,7 @@ import com.bbit.ticket.dao.system.pageOffset
import com.bbit.ticket.database.piaotong.HistoryInvoiceBasicTable
import com.bbit.ticket.database.piaotong.HistoryInvoiceGoodsTable
import com.bbit.ticket.database.piaotong.HistoryInvoiceOrderTable
import com.bbit.ticket.database.piaotong.HistoryInvoiceRedTable
import com.bbit.ticket.database.piaotong.HistoryInvoiceVoucherTable
import com.bbit.ticket.database.piaotong.OpenInvoiceBatchItemTable
import com.bbit.ticket.database.piaotong.OpenInvoiceBatchTable
@@ -320,6 +321,8 @@ object BlueInvoiceDao {
}[HistoryInvoiceBasicTable.id]
}
syncBlueRedFlag(userId, req.invoiceReqSerialNo, req.code, now)
// 同步商品明细(先删后插)
if (!req.itemList.isNullOrEmpty()) {
HistoryInvoiceGoodsTable.deleteWhere {
@@ -354,6 +357,38 @@ object BlueInvoiceDao {
}
}
private fun syncBlueRedFlag(
userId: Uuid,
redInvoiceReqSerialNo: String,
code: String,
now: OffsetDateTime,
) {
val historyId = HistoryInvoiceRedTable.selectAll()
.where {
(HistoryInvoiceRedTable.userId eq userId) and
(HistoryInvoiceRedTable.invoiceReqSerialNo eq redInvoiceReqSerialNo)
}
.firstOrNull()
?.get(HistoryInvoiceRedTable.historyId)
?: return
HistoryInvoiceBasicTable.update({
(HistoryInvoiceBasicTable.id eq historyId) and
(HistoryInvoiceBasicTable.userId eq userId) and
HistoryInvoiceBasicTable.deletedAt.isNull()
}) {
it[redFlag] = redFlagByCode(code)
it[updatedAt] = now
}
}
private fun redFlagByCode(code: String): String =
when (code) {
"0000" -> "ALREADY_RED"
"9999" -> "RED_FAIL"
else -> "REDING"
}
/** 提取公共字段赋值逻辑(独立函数而非扩展,兼容 insert/update 不同 receiver 类型) */
private fun applySyncFields(it: UpdateBuilder<Int>, req: GetInvoiceInfoResponse) {
// 基础状态
@@ -414,6 +449,52 @@ object BlueInvoiceDao {
?: throw com.bbit.ticket.entity.common.BizException("NOT_FOUND", "发票记录不存在用户信息")
}
fun findRelatedInvoiceReqSerialNos(userId: Uuid, invoiceReqSerialNo: String): List<String> {
val basicRow = HistoryInvoiceBasicTable.selectAll()
.where {
(HistoryInvoiceBasicTable.userId eq userId) and
(HistoryInvoiceBasicTable.invoiceReqSerialNo eq invoiceReqSerialNo) and
HistoryInvoiceBasicTable.deletedAt.isNull()
}
.singleOrNull()
?: return emptyList()
val basicId = basicRow[HistoryInvoiceBasicTable.id]
val invoiceType = basicRow[HistoryInvoiceBasicTable.invoiceType]
val blueHistoryId = if (invoiceType == "2") {
HistoryInvoiceRedTable.selectAll()
.where {
(HistoryInvoiceRedTable.userId eq userId) and
(HistoryInvoiceRedTable.invoiceReqSerialNo eq invoiceReqSerialNo)
}
.singleOrNull()
?.get(HistoryInvoiceRedTable.historyId)
?: return emptyList()
} else {
basicId
}
val blueSerialNo = HistoryInvoiceBasicTable.selectAll()
.where {
(HistoryInvoiceBasicTable.id eq blueHistoryId) and
(HistoryInvoiceBasicTable.userId eq userId) and
HistoryInvoiceBasicTable.deletedAt.isNull()
}
.singleOrNull()
?.get(HistoryInvoiceBasicTable.invoiceReqSerialNo)
val redSerialNos = HistoryInvoiceRedTable.selectAll()
.where {
(HistoryInvoiceRedTable.userId eq userId) and
(HistoryInvoiceRedTable.historyId eq blueHistoryId)
}
.map { it[HistoryInvoiceRedTable.invoiceReqSerialNo] }
return (listOfNotNull(blueSerialNo) + redSerialNos)
.filter { it != invoiceReqSerialNo }
.distinct()
}
fun invoiceDownloadUrl(userId: Uuid, invoiceReqSerialNo: String): InvoiceDownloadUrlResponse? {
val row = HistoryInvoiceBasicTable.selectAll()
.where {
@@ -14,6 +14,7 @@ import kotlin.uuid.Uuid
object HistoryDao {
data class HistoryRow(
val invoiceReqSerialNo: String,
val invoiceCode: String?,
val invoiceNo: String?,
val electronicInvoiceNo: String?,
@@ -62,6 +63,7 @@ object HistoryDao {
?: row[HistoryInvoiceBasicTable.createdAt].format(dateFormatter)
return HistoryRow(
invoiceReqSerialNo = row[HistoryInvoiceBasicTable.invoiceReqSerialNo],
invoiceCode = row[HistoryInvoiceBasicTable.invoiceCode],
invoiceNo = row[HistoryInvoiceBasicTable.invoiceNo],
electronicInvoiceNo = row[HistoryInvoiceBasicTable.electronicInvoiceNo],
@@ -12,6 +12,7 @@ import org.jetbrains.exposed.v1.core.eq
import org.jetbrains.exposed.v1.core.isNull
import org.jetbrains.exposed.v1.jdbc.insert
import org.jetbrains.exposed.v1.jdbc.selectAll
import org.jetbrains.exposed.v1.jdbc.update
import java.math.BigDecimal
import java.time.OffsetDateTime
import kotlin.uuid.ExperimentalUuidApi
@@ -97,6 +98,15 @@ object RedInvoiceDao {
it[HistoryInvoiceRedTable.createdAt] = now
it[HistoryInvoiceRedTable.createdBy] = userId
}
HistoryInvoiceBasicTable.update({
(HistoryInvoiceBasicTable.id eq historyId) and
(HistoryInvoiceBasicTable.userId eq userId) and
HistoryInvoiceBasicTable.deletedAt.isNull()
}) {
it[redFlag] = "REDING"
it[updatedAt] = now
}
}
/**
@@ -8,7 +8,7 @@ import kotlinx.serialization.Serializable
@Serializable
data class OpenBlueInvoiceCreateRequest(
val requestNo: String? = null,
val invoiceReqSerialNo: String,
val invoiceReqSerialNo: String? = null,
val invoiceIssueKindCode: String = "82",
val buyerName: String,
val purchaseInvSellerIdType: String? = null,
@@ -2,7 +2,6 @@
package com.bbit.ticket.route.openapi
import com.bbit.ticket.dao.system.LogDao
import com.bbit.ticket.entity.common.BizException
import com.bbit.ticket.entity.common.PTException
import com.bbit.ticket.entity.common.fail
@@ -10,8 +9,8 @@ import com.bbit.ticket.entity.common.ok
import com.bbit.ticket.entity.openapi.OpenBlueInvoiceBatchCreateRequest
import com.bbit.ticket.entity.openapi.OpenBlueInvoiceCreateRequest
import com.bbit.ticket.service.openapi.OpenBlueInvoiceService
import com.bbit.ticket.service.system.ApiAccessLogService
import com.bbit.ticket.utils.requireOpenApiPrincipal
import com.bbit.ticket.utils.plugins.dbQuery
import com.bbit.ticket.utils.plugins.myJson
import io.ktor.server.application.ApplicationCall
import io.ktor.server.request.receive
@@ -19,58 +18,70 @@ import io.ktor.server.response.respond
import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import io.ktor.server.routing.route
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlin.time.TimeSource
import kotlin.uuid.ExperimentalUuidApi
/**
* 注册开放平台蓝票开具、查询、票样与批量开票接口。
*
* @receiver 当前 Ktor 路由节点。
*/
fun Route.registerOpenBlueInvoiceRoutes() {
post {
val principal = call.requireOpenApiPrincipal()
val request = call.receive<OpenBlueInvoiceCreateRequest>()
call.respondOpenApi(principal.apiKey, principal.username, myJson.encodeToString(request)) {
OpenBlueInvoiceService.createSingle(principal, request)
}
post {
val principal = call.requireOpenApiPrincipal()
val request = call.receive<OpenBlueInvoiceCreateRequest>()
call.respondOpenApi(principal.apiKey, principal.username, myJson.encodeToString(request)) {
OpenBlueInvoiceService.createSingle(principal, request)
}
}
get("/{invoiceReqSerialNo}") {
val principal = call.requireOpenApiPrincipal()
val invoiceReqSerialNo = call.parameters["invoiceReqSerialNo"].orEmpty()
call.respondOpenApi(principal.apiKey, principal.username, null) {
OpenBlueInvoiceService.querySingle(principal, invoiceReqSerialNo)
}
get("/{invoiceReqSerialNo}") {
val principal = call.requireOpenApiPrincipal()
val invoiceReqSerialNo = call.parameters["invoiceReqSerialNo"].orEmpty()
call.respondOpenApi(principal.apiKey, principal.username, null) {
OpenBlueInvoiceService.querySingle(principal, invoiceReqSerialNo)
}
}
get("/sample/{invoiceReqSerialNo}") {
val principal = call.requireOpenApiPrincipal()
val invoiceReqSerialNo = call.parameters["invoiceReqSerialNo"].orEmpty()
call.respondOpenApi(principal.apiKey, principal.username, null) {
OpenBlueInvoiceService.sample(principal, invoiceReqSerialNo)
}
get("/sample/{invoiceReqSerialNo}") {
val principal = call.requireOpenApiPrincipal()
val invoiceReqSerialNo = call.parameters["invoiceReqSerialNo"].orEmpty()
call.respondOpenApi(principal.apiKey, principal.username, null) {
OpenBlueInvoiceService.sample(principal, invoiceReqSerialNo)
}
}
post("/batches") {
val principal = call.requireOpenApiPrincipal()
val request = call.receive<OpenBlueInvoiceBatchCreateRequest>()
call.respondOpenApi(principal.apiKey, principal.username, myJson.encodeToString(request)) {
val response = OpenBlueInvoiceService.createBatch(principal, request)
call.application.launch {
OpenBlueInvoiceService.processBatch(principal, request.batchNo)
}
response
post("/batches") {
val principal = call.requireOpenApiPrincipal()
val request = call.receive<OpenBlueInvoiceBatchCreateRequest>()
call.respondOpenApi(principal.apiKey, principal.username, myJson.encodeToString(request)) {
val response = OpenBlueInvoiceService.createBatch(principal, request)
call.application.launch {
OpenBlueInvoiceService.processBatch(principal, request.batchNo)
}
response
}
}
get("/batches/{batchNo}") {
val principal = call.requireOpenApiPrincipal()
val batchNo = call.parameters["batchNo"].orEmpty()
call.respondOpenApi(principal.apiKey, principal.username, null) {
OpenBlueInvoiceService.queryBatch(principal, batchNo)
}
get("/batches/{batchNo}") {
val principal = call.requireOpenApiPrincipal()
val batchNo = call.parameters["batchNo"].orEmpty()
call.respondOpenApi(principal.apiKey, principal.username, null) {
OpenBlueInvoiceService.queryBatch(principal, batchNo)
}
}
}
/**
* 使用开放接口统一响应格式执行接口逻辑,并记录 API 访问日志。
*
* @param appKey 调用方应用密钥。
* @param appName 调用方应用名称。
* @param requestBody 请求体 JSON 文本。
* @param block 当前接口要执行的业务逻辑。
*/
private suspend inline fun <reified T> ApplicationCall.respondOpenApi(
appKey: String?,
appName: String?,
@@ -79,8 +90,7 @@ private suspend inline fun <reified T> ApplicationCall.respondOpenApi(
) {
val start = TimeSource.Monotonic.markNow()
try {
val data = block()
val response = ok(data)
val response = ok(block())
val responseBody = myJson.encodeToString(response)
respond(response)
saveOpenApiLog(appKey, appName, requestBody, response.code, responseBody, "SUCCESS", null, start)
@@ -99,6 +109,18 @@ private suspend inline fun <reified T> ApplicationCall.respondOpenApi(
}
}
/**
* 保存开放接口访问日志。
*
* @param appKey 调用方应用密钥。
* @param appName 调用方应用名称。
* @param requestBody 请求体 JSON 文本。
* @param responseCode 响应业务状态码。
* @param responseBody 响应体 JSON 文本。
* @param status 日志状态,SUCCESS 或 FAILED。
* @param errorMessage 失败时的错误信息。
* @param start 接口开始时间标记。
*/
private suspend fun ApplicationCall.saveOpenApiLog(
appKey: String?,
appName: String?,
@@ -108,9 +130,9 @@ private suspend fun ApplicationCall.saveOpenApiLog(
status: String,
errorMessage: String?,
start: TimeSource.Monotonic.ValueTimeMark,
) = dbQuery {
LogDao.saveApiAccessLog(
call = this@saveOpenApiLog,
) {
ApiAccessLogService.save(
call = this,
appKey = appKey,
appName = appName,
requestBody = requestBody,
@@ -0,0 +1,71 @@
package com.bbit.ticket.route.piaotong
import com.bbit.ticket.entity.common.BizException
import com.bbit.ticket.entity.common.PTException
import com.bbit.ticket.entity.common.fail
import com.bbit.ticket.entity.common.ok
import io.ktor.http.ContentType
import io.ktor.http.HttpHeaders
import io.ktor.server.application.ApplicationCall
import io.ktor.server.response.header
import io.ktor.server.response.respond
import io.ktor.server.response.respondBytes
/**
* 使用统一票通响应格式执行接口逻辑。
*
* @param fallbackMessage 未知异常时返回给前端的兜底提示。
* @param block 当前接口要执行的业务逻辑。
*/
suspend inline fun <reified T> ApplicationCall.respondPt(
fallbackMessage: String,
crossinline block: suspend () -> T,
) {
try {
respond(ok(block()))
} catch (e: PTException) {
respond(fail(code = e.code, message = e.message, traceId = e.serialNo))
} catch (e: BizException) {
respond(e.status, fail(code = e.errorCode, message = e.message))
} catch (e: Exception) {
respond(fail(code = "-1", message = e.message ?: fallbackMessage))
}
}
/**
* 读取必填查询参数,参数缺失时直接返回失败响应。
*
* @param name 查询参数名称。
* @param message 参数缺失时返回给前端的提示。
*/
suspend fun ApplicationCall.requiredQueryParameter(name: String, message: String): String? {
val value = request.queryParameters[name]?.trim()?.takeIf { it.isNotEmpty() }
if (value == null) {
respond(fail(code = "-1", message = message))
}
return value
}
/**
* 使用统一票通异常处理返回 PDF 预览内容。
*
* @param fallbackMessage 未知异常时返回给前端的兜底提示。
* @param filename 响应头中的文件名。
* @param block 获取 PDF 文件内容的业务逻辑。
*/
suspend fun ApplicationCall.respondPtPdf(
fallbackMessage: String,
filename: String,
block: suspend () -> ByteArray,
) {
try {
response.header(HttpHeaders.ContentDisposition, "inline; filename=\"$filename\"")
respondBytes(block(), ContentType.Application.Pdf)
} catch (e: PTException) {
respond(fail(code = e.code, message = e.message, traceId = e.serialNo))
} catch (e: BizException) {
respond(e.status, fail(code = e.errorCode, message = e.message))
} catch (e: Exception) {
respond(fail(code = "-1", message = e.message ?: fallbackMessage))
}
}
@@ -2,9 +2,7 @@
package com.bbit.ticket.route.piaotong
import com.bbit.ticket.entity.common.PTException
import com.bbit.ticket.entity.common.fail
import com.bbit.ticket.entity.common.ok
import com.bbit.ticket.entity.common.BizException
import com.bbit.ticket.entity.request.AuthQrcodeRequest
import com.bbit.ticket.entity.request.GetLoginSmsCodeRequest
import com.bbit.ticket.entity.request.QueryRealNameAuthQrStatusRequest
@@ -19,230 +17,122 @@ import com.bbit.ticket.service.piaotong.PTAuthService
import com.bbit.ticket.service.piaotong.PTConfigService
import com.bbit.ticket.utils.requireCurrentUser
import com.bbit.ticket.utils.requirePtProfile
import io.ktor.http.HttpStatusCode
import io.ktor.server.request.receive
import io.ktor.server.response.respond
import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import io.ktor.server.routing.put
import kotlin.uuid.ExperimentalUuidApi
/**
* 注册票通认证与本地配置相关接口。
*
* @receiver 当前 Ktor 路由节点。
*/
fun Route.registerPTAuthRoutes() {
get("/info") {
try {
val currentUser = call.requireCurrentUser()
val profile = currentUser.requirePtProfile()
val response = PTAuthService.getTaxBureauAccountAuthStatus(
call.respondPt("查询票通认证状态失败") {
val profile = call.requireCurrentUser().requirePtProfile()
PTAuthService.getTaxBureauAccountAuthStatus(
TaxBureauAuthReq(profile.taxpayerNum, profile.taxAccount)
)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
)
)
}
}
post("/register") {
try {
val currentUser = call.requireCurrentUser()
val req = call.receive<TaxRegisterInfo>()
val response = PTAuthService.registerEnterprise(req, currentUser.id)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
)
)
}
}
post("/registerUser") {
try {
val currentUser = call.requireCurrentUser()
val req = call.receive<TaxRegisterUserRequest>()
val response = PTAuthService.registerUserFromPayload(req, currentUser)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
)
)
}
}
// =============================================
// 基础信息配置(本地 CRUD
// =============================================
// 1. 企业信息
get("/enterprise") {
try {
post("/register") {
call.respondPt("企业注册失败") {
val currentUser = call.requireCurrentUser()
val response = PTConfigService.getEnterpriseInfo(currentUser.id)
if (response == null) {
call.respond(ok(emptyMap<String, String>()))
} else {
call.respond(ok(response))
}
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "查询企业信息失败"))
PTAuthService.registerEnterprise(call.receive<TaxRegisterInfo>(), currentUser.id)
}
}
post("/registerUser") {
call.respondPt("用户注册失败") {
PTAuthService.registerUserFromPayload(
call.receive<TaxRegisterUserRequest>(),
call.requireCurrentUser()
)
}
}
get("/enterprise") {
call.respondPt("查询企业信息失败") {
PTConfigService.getEnterpriseInfo(call.requireCurrentUser().id) ?: emptyMap<String, String>()
}
}
put("/enterprise") {
try {
val currentUser = call.requireCurrentUser()
val req = call.receive<UpdateEnterpriseInfoRequest>()
val response = PTConfigService.updateEnterpriseInfo(currentUser.id, req)
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "保存企业信息失败"))
call.respondPt("保存企业信息失败") {
PTConfigService.updateEnterpriseInfo(
call.requireCurrentUser().id,
call.receive<UpdateEnterpriseInfoRequest>()
)
}
}
// 2. 登记数电账号
get("/digital-account") {
try {
call.respondPt("查询数电账号失败") {
val currentUser = call.requireCurrentUser()
if (currentUser.taxPayerNum == null) {
call.respond(fail(code = "-1", message = "请先完善用户信息"))
} else {
val response = PTConfigService.getDigitalAccount(currentUser.id)
if (response == null) {
call.respond(ok(emptyMap<String, String>()))
} else {
call.respond(ok(response))
}
throw BizException("-1", "请先完善用户信息", HttpStatusCode.OK)
}
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "查询数电账号失败"))
PTConfigService.getDigitalAccount(currentUser.id) ?: emptyMap<String, String>()
}
}
put("/digital-account") {
try {
val currentUser = call.requireCurrentUser()
val req = call.receive<UpdateDigitalAccountRequest>()
val response = PTConfigService.updateDigitalAccount(currentUser.id, req)
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "保存数电账号失败"))
call.respondPt("保存数电账号失败") {
PTConfigService.updateDigitalAccount(
call.requireCurrentUser().id,
call.receive<UpdateDigitalAccountRequest>()
)
}
}
// 3. 开票预设数据
get("/preset") {
try {
val currentUser = call.requireCurrentUser()
val response = PTConfigService.getPresetData(currentUser.id)
if (response == null) {
call.respond(ok(emptyMap<String, String>()))
} else {
call.respond(ok(response))
}
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "查询预设数据失败"))
call.respondPt("查询预设数据失败") {
PTConfigService.getPresetData(call.requireCurrentUser().id) ?: emptyMap<String, String>()
}
}
put("/preset") {
try {
val currentUser = call.requireCurrentUser()
val req = call.receive<UpdatePresetDataRequest>()
val response = PTConfigService.updatePresetData(currentUser.id, req)
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "保存预设数据失败"))
call.respondPt("保存预设数据失败") {
PTConfigService.updatePresetData(
call.requireCurrentUser().id,
call.receive<UpdatePresetDataRequest>()
)
}
}
get("/authentication") {
try {
val qrcodeType = call.request.queryParameters["qrcodeType"]
val currentUser = call.requireCurrentUser()
val profile = currentUser.requirePtProfile()
val response = PTAuthService.getAuthenticationQrcode(
call.respondPt("获取认证二维码失败") {
val profile = call.requireCurrentUser().requirePtProfile()
PTAuthService.getAuthenticationQrcode(
AuthQrcodeRequest(
taxpayerNum = profile.taxpayerNum,
account = profile.taxAccount,
qrcodeType = qrcodeType
)
)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
qrcodeType = call.request.queryParameters["qrcodeType"]
)
)
}
}
/**
* 查询认证二维码扫码状态
*/
post("/query-auth-status") {
try {
val req = call.receive<QueryRealNameAuthQrStatusRequest>()
val response = PTAuthService.queryAuthQrcodeScanStatus(req)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
)
)
call.respondPt("查询认证二维码扫码状态失败") {
PTAuthService.queryAuthQrcodeScanStatus(call.receive<QueryRealNameAuthQrStatusRequest>())
}
}
/**
* 发送登录短信验证码
*/
post("/send-sms-code") {
try {
val req = call.receive<GetLoginSmsCodeRequest>()
val response = PTAuthService.sendLoginSmsCode(req)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
)
)
call.respondPt("发送登录短信验证码失败") {
PTAuthService.sendLoginSmsCode(call.receive<GetLoginSmsCodeRequest>())
}
}
/**
* 短信验证码登录
*/
post("/sms-login") {
try {
val req = call.receive<SmsLoginRequest>()
val response = PTAuthService.smsLogin(req)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
)
)
call.respondPt("短信验证码登录失败") {
PTAuthService.smsLogin(call.receive<SmsLoginRequest>())
}
}
}
@@ -3,9 +3,6 @@
package com.bbit.ticket.route.piaotong
import com.bbit.ticket.entity.common.BizException
import com.bbit.ticket.entity.common.PTException
import com.bbit.ticket.entity.common.fail
import com.bbit.ticket.entity.common.ok
import com.bbit.ticket.entity.request.AskInvoiceRequest
import com.bbit.ticket.entity.request.QueryInvoiceRequest
import com.bbit.ticket.entity.request.RedCreateRequest
@@ -13,228 +10,114 @@ import com.bbit.ticket.service.piaotong.PTBlueService
import com.bbit.ticket.service.piaotong.PTRedService
import com.bbit.ticket.utils.requireCurrentUser
import com.bbit.ticket.utils.requirePtProfile
import io.ktor.http.ContentType
import io.ktor.http.HttpHeaders
import io.ktor.server.request.receive
import io.ktor.server.response.header
import io.ktor.server.response.respond
import io.ktor.server.response.respondBytes
import io.ktor.server.routing.Route
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import kotlin.uuid.ExperimentalUuidApi
/**
* 注册票通开票、历史、票样与状态查询接口。
*
* @receiver 当前 Ktor 路由节点。
*/
fun Route.registerPTInvoiceRoutes() {
/**
* 创建红票任务
*/
post("/invoiceRed") {
try {
val user = call.requireCurrentUser()
val req = call.receive<RedCreateRequest>()
val result = PTRedService.invoiceRed(user, req)
call.respond(ok(result))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
)
)
} catch (e: BizException) {
call.respond(e.status, fail(code = e.errorCode, message = e.message))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "红字任务创建失败"))
call.respondPt("红字任务创建失败") {
PTRedService.invoiceRed(call.requireCurrentUser(), call.receive<RedCreateRequest>())
}
}
post("/invoiceBlue") {
try {
val currentUser = call.requireCurrentUser()
val req = call.receive<AskInvoiceRequest>()
val response = PTBlueService.invoiceBlue(req, currentUser)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
)
)
call.respondPt("蓝票任务创建失败") {
PTBlueService.invoiceBlue(call.receive<AskInvoiceRequest>(), call.requireCurrentUser())
}
}
get("/invoiceBlueHistory") {
try {
call.respondPt("查询开票历史失败") {
val currentUser = call.requireCurrentUser()
val page = call.request.queryParameters["page"]?.toIntOrNull() ?: 1
val pageSize = call.request.queryParameters["pageSize"]?.toIntOrNull() ?: 20
val invoiceType = call.request.queryParameters["invoiceType"]
val isSuccess = call.request.queryParameters["isSuccess"]?.toBooleanStrictOrNull()
val batchNo = call.request.queryParameters["batchNo"]
val response = PTBlueService.getInvoiceBlueHistory(
currentUser.id, page, pageSize, invoiceType, isSuccess, batchNo
PTBlueService.getInvoiceBlueHistory(
userId = currentUser.id,
page = call.request.queryParameters["page"]?.toIntOrNull() ?: 1,
pageSize = call.request.queryParameters["pageSize"]?.toIntOrNull() ?: 20,
invoiceType = call.request.queryParameters["invoiceType"],
isSuccess = call.request.queryParameters["isSuccess"]?.toBooleanStrictOrNull(),
batchNo = call.request.queryParameters["batchNo"],
)
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "查询开票历史失败"))
}
}
get("/invoiceBatchNos") {
try {
val currentUser = call.requireCurrentUser()
val response = PTBlueService.listBatchNos(currentUser.id)
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "查询批次号列表失败"))
call.respondPt("查询批次号列表失败") {
PTBlueService.listBatchNos(call.requireCurrentUser().id)
}
}
get("/invoiceDetail") {
try {
val currentUser = call.requireCurrentUser()
val invoiceReqSerialNo = call.request.queryParameters["invoiceReqSerialNo"]
if (invoiceReqSerialNo.isNullOrBlank()) {
call.respond(fail(code = "-1", message = "请传入发票请求流水号"))
return@get
}
val response = PTBlueService.getInvoiceDetail(currentUser.id, invoiceReqSerialNo)
if (response == null) {
call.respond(fail(code = "-1", message = "未找到该发票记录"))
return@get
}
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "查询发票详情失败"))
val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号")
?: return@get
call.respondPt("查询发票详情失败") {
PTBlueService.getInvoiceDetail(call.requireCurrentUser().id, invoiceReqSerialNo)
?: throw BizException("-1", "未找到该发票记录")
}
}
get("/invoiceDownloadUrl") {
try {
val currentUser = call.requireCurrentUser()
val invoiceReqSerialNo = call.request.queryParameters["invoiceReqSerialNo"]
if (invoiceReqSerialNo.isNullOrBlank()) {
call.respond(fail(code = "-1", message = "璇蜂紶鍏ュ彂绁ㄨ姹傛祦姘村彿"))
return@get
}
val response = PTBlueService.getInvoiceDownloadUrl(currentUser.id, invoiceReqSerialNo)
if (response == null) {
call.respond(fail(code = "-1", message = "鏈壘鍒拌鍙戠エ璁板綍"))
return@get
}
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "鏌ヨ鍙戠エ涓嬭浇鍦板潃澶辫触"))
val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号")
?: return@get
call.respondPt("查询发票下载地址失败") {
PTBlueService.getInvoiceDownloadUrl(call.requireCurrentUser().id, invoiceReqSerialNo)
?: throw BizException("-1", "未找到该发票记录")
}
}
get("/invoicePreview") {
try {
val currentUser = call.requireCurrentUser()
val invoiceReqSerialNo = call.request.queryParameters["invoiceReqSerialNo"]
if (invoiceReqSerialNo.isNullOrBlank()) {
call.respond(fail(code = "-1", message = "璇蜂紶鍏ュ彂绁ㄨ姹傛祦姘村彿"))
return@get
}
val bytes = PTBlueService.getInvoicePreview(currentUser.id, invoiceReqSerialNo)
if (bytes == null) {
call.respond(fail(code = "-1", message = "鏈壘鍒扮エ鏍峰湴鍧€"))
return@get
}
call.response.header(
HttpHeaders.ContentDisposition,
"inline; filename=\"${invoiceReqSerialNo}.pdf\""
)
call.respondBytes(bytes, ContentType.Application.Pdf)
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "棰勮绁ㄦ牱澶辫触"))
val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号")
?: return@get
call.respondPtPdf("预览票样失败", "$invoiceReqSerialNo.pdf") {
PTBlueService.getInvoicePreview(call.requireCurrentUser().id, invoiceReqSerialNo)
?: throw BizException("-1", "未找到票样地址")
}
}
get("/redInvoiceDownloadUrl") {
try {
val currentUser = call.requireCurrentUser()
val invoiceReqSerialNo = call.request.queryParameters["invoiceReqSerialNo"]
if (invoiceReqSerialNo.isNullOrBlank()) {
call.respond(fail(code = "-1", message = "请传入发票请求流水号"))
return@get
}
val response = PTRedService.getRedInvoiceDownloadUrl(currentUser.id, invoiceReqSerialNo)
if (response == null) {
call.respond(fail(code = "-1", message = "未找到该红票记录"))
return@get
}
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "查询红票下载地址失败"))
val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号")
?: return@get
call.respondPt("查询红票下载地址失败") {
PTRedService.getRedInvoiceDownloadUrl(call.requireCurrentUser().id, invoiceReqSerialNo)
?: throw BizException("-1", "未找到该红票记录")
}
}
get("/redInvoicePreview") {
try {
val currentUser = call.requireCurrentUser()
val invoiceReqSerialNo = call.request.queryParameters["invoiceReqSerialNo"]
if (invoiceReqSerialNo.isNullOrBlank()) {
call.respond(fail(code = "-1", message = "请传入发票请求流水号"))
return@get
}
val bytes = PTRedService.getRedInvoicePreview(currentUser.id, invoiceReqSerialNo)
if (bytes == null) {
call.respond(fail(code = "-1", message = "未找到票样地址"))
return@get
}
call.response.header(
HttpHeaders.ContentDisposition,
"inline; filename=\"${invoiceReqSerialNo}.pdf\""
)
call.respondBytes(bytes, ContentType.Application.Pdf)
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "预览红票票样失败"))
val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号")
?: return@get
call.respondPtPdf("预览红票票样失败", "$invoiceReqSerialNo.pdf") {
PTRedService.getRedInvoicePreview(call.requireCurrentUser().id, invoiceReqSerialNo)
?: throw BizException("-1", "未找到票样地址")
}
}
get("/queryInvoice") {
try {
val currentUser = call.requireCurrentUser()
val invoiceReqSerialNo = call.request.queryParameters["invoiceReqSerialNo"]
if (invoiceReqSerialNo.isNullOrBlank()) {
call.respond(fail(code = "-1", message = "请传入发票请求流水号"))
return@get
}
val response = PTBlueService.queryInvoiceAllInfo(
val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号")
?: return@get
call.respondPt("刷新发票状态失败") {
PTBlueService.queryInvoiceAllInfo(
QueryInvoiceRequest(
taxpayerNum = currentUser.requirePtProfile().taxpayerNum,
invoiceReqSerialNo = invoiceReqSerialNo
)
)
call.respond(ok(response))
} catch (e: PTException) {
call.respond(
fail(
code = e.code,
message = e.message,
traceId = e.serialNo
taxpayerNum = call.requireCurrentUser().requirePtProfile().taxpayerNum,
invoiceReqSerialNo = invoiceReqSerialNo,
)
)
}
}
/**
* 查询红票申请信息(冲红原因、收票人信息)
*/
get("/invoiceRedInfo") {
try {
val currentUser = call.requireCurrentUser()
val invoiceReqSerialNo = call.request.queryParameters["invoiceReqSerialNo"]
if (invoiceReqSerialNo.isNullOrBlank()) {
call.respond(fail(code = "-1", message = "请传入发票请求流水号"))
return@get
}
val response = PTRedService.getRedInvoiceInfo(currentUser.id, invoiceReqSerialNo)
if (response == null) {
call.respond(fail(code = "-1", message = "未找到红票申请信息"))
return@get
}
call.respond(ok(response))
} catch (e: Exception) {
call.respond(fail(code = "-1", message = e.message ?: "查询红票申请信息失败"))
val invoiceReqSerialNo = call.requiredQueryParameter("invoiceReqSerialNo", "请传入发票请求流水号")
?: return@get
call.respondPt("查询红票申请信息失败") {
PTRedService.getRedInvoiceInfo(call.requireCurrentUser().id, invoiceReqSerialNo)
?: throw BizException("-1", "未找到红票申请信息")
}
}
}
@@ -4,6 +4,7 @@ package com.bbit.ticket.service.openapi
import com.bbit.ticket.database.piaotong.OpenInvoiceBatchItemTable
import com.bbit.ticket.database.piaotong.OpenInvoiceBatchTable
import com.bbit.ticket.database.piaotong.HistoryInvoiceBasicTable
import com.bbit.ticket.entity.common.BizException
import com.bbit.ticket.entity.common.ErrorCode
import com.bbit.ticket.entity.openapi.OpenBlueInvoiceBatchCreateRequest
@@ -16,6 +17,7 @@ import com.bbit.ticket.entity.openapi.OpenBlueInvoiceSampleResponse
import com.bbit.ticket.entity.request.AskInvoiceRequest
import com.bbit.ticket.service.piaotong.PTBlueService
import com.bbit.ticket.utils.OpenApiPrincipal
import com.bbit.ticket.utils.net.PTClient
import com.bbit.ticket.utils.plugins.dbQuery
import com.bbit.ticket.utils.plugins.myJson
import io.ktor.http.HttpStatusCode
@@ -24,6 +26,7 @@ import kotlinx.serialization.encodeToString
import org.jetbrains.exposed.v1.core.SortOrder
import org.jetbrains.exposed.v1.core.and
import org.jetbrains.exposed.v1.core.eq
import org.jetbrains.exposed.v1.core.inList
import org.jetbrains.exposed.v1.jdbc.insert
import org.jetbrains.exposed.v1.jdbc.selectAll
import org.jetbrains.exposed.v1.jdbc.update
@@ -39,20 +42,13 @@ object OpenBlueInvoiceService {
request: OpenBlueInvoiceCreateRequest,
): OpenBlueInvoiceCreateResponse {
validateCreateRequest(request)
val existing = PTBlueService.getInvoiceDetail(principal.userId, request.invoiceReqSerialNo)
if (existing != null) {
return OpenBlueInvoiceCreateResponse(
requestNo = request.requestNo,
invoiceReqSerialNo = request.invoiceReqSerialNo,
status = existing.status,
)
}
val createRequest = request.withGeneratedInvoiceReqSerialNo(principal.userId)
PTBlueService.createBlueInvoice(request.toAskInvoiceRequest(principal), principal.userId)
val detail = PTBlueService.getInvoiceDetail(principal.userId, request.invoiceReqSerialNo)
PTBlueService.createBlueInvoice(createRequest.toAskInvoiceRequest(principal), principal.userId)
val detail = PTBlueService.getInvoiceDetail(principal.userId, createRequest.requireInvoiceReqSerialNo())
return OpenBlueInvoiceCreateResponse(
requestNo = request.requestNo,
invoiceReqSerialNo = request.invoiceReqSerialNo,
requestNo = createRequest.requestNo,
invoiceReqSerialNo = createRequest.requireInvoiceReqSerialNo(),
status = detail?.status ?: "PROCESSING",
)
}
@@ -110,6 +106,11 @@ object OpenBlueInvoiceService {
throw BizException(ErrorCode.BAD_REQUEST.code, "批次号已存在,一个批次只能开票一次")
}
val reservedSerialNos = mutableSetOf<String>()
val createRequests = request.items.map {
it.withGeneratedInvoiceReqSerialNo(principal.userId, reservedSerialNos)
}
val now = OffsetDateTime.now()
dbQuery {
val batchId = OpenInvoiceBatchTable.insert {
@@ -124,11 +125,11 @@ object OpenBlueInvoiceService {
it[createdAt] = now
}[OpenInvoiceBatchTable.id]
request.items.forEach { item ->
createRequests.forEach { item ->
OpenInvoiceBatchItemTable.insert {
it[OpenInvoiceBatchItemTable.batchId] = batchId
it[requestNo] = item.requestNo
it[invoiceReqSerialNo] = item.invoiceReqSerialNo
it[invoiceReqSerialNo] = item.requireInvoiceReqSerialNo()
it[originalRequestBody] = myJson.encodeToString(item)
it[status] = "PENDING"
it[createdAt] = now
@@ -329,15 +330,9 @@ object OpenBlueInvoiceService {
throw BizException(ErrorCode.BAD_REQUEST.code, "单次批量最多支持 $MAX_BATCH_SIZE 条")
}
request.items.forEach(::validateCreateRequest)
if (request.items.map { it.invoiceReqSerialNo }.distinct().size != request.items.size) {
throw BizException(ErrorCode.BAD_REQUEST.code, "批量明细中 invoiceReqSerialNo 不能重复")
}
}
private fun validateCreateRequest(request: OpenBlueInvoiceCreateRequest) {
if (request.invoiceReqSerialNo.isBlank()) {
throw BizException(ErrorCode.BAD_REQUEST.code, "invoiceReqSerialNo 不能为空")
}
if (request.buyerName.isBlank()) {
throw BizException(ErrorCode.BAD_REQUEST.code, "buyerName 不能为空")
}
@@ -346,10 +341,54 @@ object OpenBlueInvoiceService {
}
}
private suspend fun OpenBlueInvoiceCreateRequest.withGeneratedInvoiceReqSerialNo(
userId: Uuid,
reservedSerialNos: MutableSet<String> = mutableSetOf(),
): OpenBlueInvoiceCreateRequest {
var invoiceReqSerialNo: String
do {
invoiceReqSerialNo = PTClient.ptDate()
} while (
invoiceReqSerialNo in reservedSerialNos ||
findUsedInvoiceReqSerialNos(userId, listOf(invoiceReqSerialNo)).isNotEmpty()
)
reservedSerialNos.add(invoiceReqSerialNo)
return copy(invoiceReqSerialNo = invoiceReqSerialNo)
}
private fun OpenBlueInvoiceCreateRequest.requireInvoiceReqSerialNo(): String =
invoiceReqSerialNo
?: throw BizException(ErrorCode.INTERNAL_SERVER_ERROR.code, "invoiceReqSerialNo 生成失败")
private suspend fun findUsedInvoiceReqSerialNos(userId: Uuid, invoiceReqSerialNos: List<String>): Set<String> {
val serialNos = invoiceReqSerialNos.distinct()
if (serialNos.isEmpty()) {
return emptySet()
}
return dbQuery {
val historySerialNos = HistoryInvoiceBasicTable.selectAll()
.where {
(HistoryInvoiceBasicTable.userId eq userId) and
(HistoryInvoiceBasicTable.invoiceReqSerialNo inList serialNos)
}
.map { it[HistoryInvoiceBasicTable.invoiceReqSerialNo] }
val batchSerialNos = (OpenInvoiceBatchItemTable innerJoin OpenInvoiceBatchTable)
.selectAll()
.where {
(OpenInvoiceBatchTable.userId eq userId) and
(OpenInvoiceBatchItemTable.invoiceReqSerialNo inList serialNos)
}
.map { it[OpenInvoiceBatchItemTable.invoiceReqSerialNo] }
(historySerialNos + batchSerialNos).toSet()
}
}
private fun OpenBlueInvoiceCreateRequest.toAskInvoiceRequest(principal: OpenApiPrincipal): AskInvoiceRequest =
AskInvoiceRequest(
taxpayerNum = principal.taxPayerNum,
invoiceReqSerialNo = invoiceReqSerialNo,
invoiceReqSerialNo = requireInvoiceReqSerialNo(),
invoiceIssueKindCode = invoiceIssueKindCode,
buyerName = buyerName,
purchaseInvSellerIdType = purchaseInvSellerIdType,
@@ -113,7 +113,16 @@ object PTBlueService {
val existing = dbQuery {
BlueInvoiceDao.findUserIdBySerialNo(invoiceReqSerialNo)
}
return syncInvoiceFromPT(existing, invoiceReqSerialNo, req.taxpayerNum)
val result = syncInvoiceFromPT(existing, invoiceReqSerialNo, req.taxpayerNum)
val relatedSerialNos = dbQuery {
BlueInvoiceDao.findRelatedInvoiceReqSerialNos(existing, invoiceReqSerialNo)
}
relatedSerialNos.forEach { relatedSerialNo ->
runCatching {
syncInvoiceFromPT(existing, relatedSerialNo, req.taxpayerNum)
}
}
return result
}
}
@@ -51,10 +51,8 @@ object PTRedService {
)
PTClient.ptPost<QuickRedInvoiceRequest, QuickRedInvoiceResponse>("invoiceRed.pt", req)
dbQuery { RedInvoiceDao.addRedInvoice(user.id, historyId, req) }
// 创建后立即同步一次(非关键,失败忽略)
try {
PTBlueService.syncInvoiceFromPT(user.id, invoiceReqSerialNo, profile.taxpayerNum)
} catch (_: Exception) { }
PTBlueService.syncInvoiceFromPT(user.id, his.invoiceReqSerialNo, profile.taxpayerNum)
PTBlueService.syncInvoiceFromPT(user.id, invoiceReqSerialNo, profile.taxpayerNum)
return "操作成功"
}
@@ -0,0 +1,47 @@
@file:OptIn(ExperimentalUuidApi::class)
package com.bbit.ticket.service.system
import com.bbit.ticket.dao.system.LogDao
import com.bbit.ticket.utils.plugins.dbQuery
import io.ktor.server.application.ApplicationCall
import kotlin.uuid.ExperimentalUuidApi
object ApiAccessLogService {
/**
* 保存开放接口访问日志。
*
* @param call 当前接口请求上下文。
* @param appKey 调用方应用密钥。
* @param appName 调用方应用名称。
* @param requestBody 请求体 JSON 文本。
* @param responseCode 响应业务状态码。
* @param responseBody 响应体 JSON 文本。
* @param status 日志状态,SUCCESS 或 FAILED。
* @param errorMessage 失败时的错误信息。
* @param costMs 接口耗时,单位毫秒。
*/
suspend fun save(
call: ApplicationCall,
appKey: String?,
appName: String?,
requestBody: String?,
responseCode: String?,
responseBody: String?,
status: String,
errorMessage: String?,
costMs: Long,
) = dbQuery {
LogDao.saveApiAccessLog(
call = call,
appKey = appKey,
appName = appName,
requestBody = requestBody,
responseCode = responseCode,
responseBody = responseBody,
status = status,
errorMessage = errorMessage,
costMs = costMs,
)
}
}
+554 -2
View File
@@ -1,9 +1,561 @@
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withTimeout
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class Test {
@Test
fun helloWorld() {
print("Hello World!")
fun simulateOpenInvoiceBatchScheduler() = runBlocking {
val store = InMemoryInvoiceStore()
val customerClient = FakeCustomerInvoiceClient(total = 123)
val ptClient = FakePtClient()
val scheduler = DemoOpenInvoiceBatchScheduler(
store = store,
customerClient = customerClient,
ptClient = ptClient,
pageSize = 5,
issueConcurrency = 3,
)
store.createBatch(batchNo = "BATCH-DEMO-001")
scheduler.start()
try {
withTimeout(20_000) {
while (!store.isBatchResolved("BATCH-DEMO-001")) {
delay(1_500)
println(store.batchSnapshot("BATCH-DEMO-001"))
}
}
} catch (e: TimeoutCancellationException) {
error("scheduler did not finish in time: ${store.batchSnapshot("BATCH-DEMO-001")}")
} finally {
scheduler.stop()
}
val batch = store.getBatch("BATCH-DEMO-001")
val items = store.listItems(batch.id)
println("final batch = $batch")
println("final items =")
items.forEach { println(it) }
assertEquals(23, batch.totalCount)
assertEquals(20, batch.successCount)
assertEquals(3, batch.failedCount)
assertEquals(0, batch.processingCount)
assertEquals(BatchStatus.PARTIAL_FAILED, batch.status)
assertTrue(batch.resolved)
}
}
/**
* 演示版调度器:
* - FetchWorker:按 batchNo 分页拉甲方数据,边拉边落“库”
* - IssueWorker:扫描 PENDING 明细,并发调用 PT 开票
* - QueryWorker:扫描 QUERY_PENDING 明细,按 nextQueryAt 轮询 PT 结果
* - SummaryWorker:持续汇总批次进度
*/
private class DemoOpenInvoiceBatchScheduler(
private val store: InMemoryInvoiceStore,
private val customerClient: FakeCustomerInvoiceClient,
private val ptClient: FakePtClient,
private val pageSize: Int,
issueConcurrency: Int,
) {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val issueSemaphore = Semaphore(issueConcurrency)
@Volatile
private var running = true
fun start() {
scope.launch { recoverInterruptedTasks() }
scope.launch { fetchLoop() }
scope.launch { issueLoop() }
scope.launch { queryLoop() }
scope.launch { summaryLoop() }
}
fun stop() {
running = false
}
private suspend fun recoverInterruptedTasks() {
// 真实项目里这里处理服务崩溃留下的 ISSUING / QUERYING / 过期锁。
store.recoverStuckItems()
}
private suspend fun fetchLoop() {
while (running) {
val batch = store.claimFetchBatch()
if (batch == null) {
delay(100)
continue
}
var pageNo = batch.fetchPageNo + 1
while (running) {
val page = customerClient.fetchItems(batch.batchNo, pageNo, pageSize)
store.saveFetchedItems(batch.id, page.items)
store.markPageFetched(
batchId = batch.id,
pageNo = pageNo,
fetchedCount = page.items.size,
total = page.total,
)
val reachedTail = !page.hasNext || page.items.isEmpty() || page.items.size < pageSize
if (reachedTail) {
store.markFetchFinished(batch.id)
break
}
pageNo++
}
}
}
private suspend fun issueLoop() {
while (running) {
val items = store.claimIssueItems(limit = 20)
if (items.isEmpty()) {
delay(100)
continue
}
coroutineScope {
items.forEach { item ->
launch {
issueSemaphore.withPermit {
issueOne(item)
}
}
}
}
}
}
private suspend fun issueOne(item: InvoiceItemRecord) {
runCatching {
ptClient.issueInvoice(item.invoiceReqSerialNo)
}.onSuccess {
store.markIssueSubmitted(
itemId = item.id,
nextQueryAtMs = nowMs() + 200,
)
}.onFailure { error ->
store.markItemFailed(item.id, "ISSUE_FAILED", error.message ?: "issue failed")
}
}
private suspend fun queryLoop() {
while (running) {
val items = store.claimQueryableItems(limit = 30, nowMs = nowMs())
if (items.isEmpty()) {
delay(100)
continue
}
items.forEach { item ->
queryOne(item)
}
}
}
private suspend fun queryOne(item: InvoiceItemRecord) {
val result = ptClient.queryInvoice(item.invoiceReqSerialNo)
when (result.status) {
PtInvoiceStatus.SUCCESS -> store.markItemSuccess(item.id, result.invoiceNo)
PtInvoiceStatus.FAILED -> store.markItemFailed(item.id, result.code, result.message)
PtInvoiceStatus.PROCESSING -> {
val nextDelayMs = when {
item.queryAttempts < 5 -> 200L
item.queryAttempts < 20 -> 500L
else -> 1_000L
}
store.markQueryPending(item.id, nowMs() + nextDelayMs)
}
}
}
private suspend fun summaryLoop() {
while (running) {
store.refreshAllBatchSummary()
delay(200)
}
}
private fun nowMs(): Long = System.currentTimeMillis()
}
private class FakeCustomerInvoiceClient(private val total: Int) {
suspend fun fetchItems(batchNo: String, pageNo: Int, pageSize: Int): CustomerInvoicePage {
delay(120)
val start = (pageNo - 1) * pageSize
if (start >= total) {
return CustomerInvoicePage(total = total, hasNext = false, items = emptyList())
}
val endExclusive = minOf(start + pageSize, total)
val items = (start until endExclusive).map { index ->
val no = index + 1
CustomerInvoiceItem(
sourceBizNo = "ORDER-${no.toString().padStart(3, '0')}",
invoiceReqSerialNo = "BL20260520${no.toString().padStart(8, '0')}",
buyerName = "测试客户$no",
amount = "100.00",
rawJson = """{"batchNo":"$batchNo","sourceBizNo":"ORDER-$no"}""",
)
}
return CustomerInvoicePage(
total = total,
hasNext = endExclusive < total,
items = items,
)
}
}
private class FakePtClient {
private val queryCountBySerialNo = mutableMapOf<String, Int>()
suspend fun issueInvoice(invoiceReqSerialNo: String) {
delay(150)
// 这里模拟 PT 开票接口只是“提交成功”,不代表最终开票成功。
println("PT issue submitted: $invoiceReqSerialNo")
}
suspend fun queryInvoice(invoiceReqSerialNo: String): PtQueryResult {
delay(80)
val queryCount = (queryCountBySerialNo[invoiceReqSerialNo] ?: 0) + 1
queryCountBySerialNo[invoiceReqSerialNo] = queryCount
val serialNumber = invoiceReqSerialNo.takeLast(8).toInt()
val shouldFail = serialNumber % 7 == 0
return when {
queryCount < 3 -> PtQueryResult(PtInvoiceStatus.PROCESSING, "7777", "开票处理中")
shouldFail -> PtQueryResult(PtInvoiceStatus.FAILED, "9999", "模拟开票失败")
else -> PtQueryResult(
status = PtInvoiceStatus.SUCCESS,
code = "0000",
message = "开票成功",
invoiceNo = "NO-${invoiceReqSerialNo.takeLast(8)}",
)
}
}
}
private class InMemoryInvoiceStore {
private val mutex = Mutex()
private val batches = linkedMapOf<Long, InvoiceBatchRecord>()
private val items = linkedMapOf<Long, InvoiceItemRecord>()
private var batchIdSequence = 1L
private var itemIdSequence = 1L
suspend fun createBatch(batchNo: String): InvoiceBatchRecord = mutex.withLock {
check(batches.values.none { it.batchNo == batchNo }) { "batch already exists: $batchNo" }
val batch = InvoiceBatchRecord(
id = batchIdSequence++,
batchNo = batchNo,
status = BatchStatus.CREATED,
)
batches[batch.id] = batch
batch
}
suspend fun claimFetchBatch(): InvoiceBatchRecord? = mutex.withLock {
val batch = batches.values.firstOrNull {
!it.fetchFinished && (it.status == BatchStatus.CREATED || it.status == BatchStatus.FETCHING)
} ?: return@withLock null
val claimed = batch.copy(status = BatchStatus.FETCHING)
batches[batch.id] = claimed
claimed
}
suspend fun saveFetchedItems(batchId: Long, fetchedItems: List<CustomerInvoiceItem>) = mutex.withLock {
fetchedItems.forEach { fetched ->
val exists = items.values.any {
it.batchId == batchId && it.sourceBizNo == fetched.sourceBizNo
}
if (!exists) {
val item = InvoiceItemRecord(
id = itemIdSequence++,
batchId = batchId,
sourceBizNo = fetched.sourceBizNo,
invoiceReqSerialNo = fetched.invoiceReqSerialNo,
buyerName = fetched.buyerName,
amount = fetched.amount,
originalRequestBody = fetched.rawJson,
status = ItemStatus.PENDING,
)
items[item.id] = item
}
}
}
suspend fun markPageFetched(batchId: Long, pageNo: Int, fetchedCount: Int, total: Int) = mutex.withLock {
val batch = batches.getValue(batchId)
batches[batchId] = batch.copy(
totalCount = total,
fetchedCount = batch.fetchedCount + fetchedCount,
fetchPageNo = pageNo,
updatedAtMs = nowMs(),
)
}
suspend fun markFetchFinished(batchId: Long) = mutex.withLock {
val batch = batches.getValue(batchId)
batches[batchId] = batch.copy(
status = BatchStatus.FETCHED,
fetchFinished = true,
updatedAtMs = nowMs(),
)
}
suspend fun claimIssueItems(limit: Int): List<InvoiceItemRecord> = mutex.withLock {
val claimed = items.values
.filter { it.status == ItemStatus.PENDING }
.take(limit)
.map { item ->
val updated = item.copy(
status = ItemStatus.ISSUING,
issueAttempts = item.issueAttempts + 1,
updatedAtMs = nowMs(),
)
items[item.id] = updated
updated
}
if (claimed.isNotEmpty()) {
setBatchesStatus(claimed.map { it.batchId }.toSet(), BatchStatus.ISSUING)
}
claimed
}
suspend fun markIssueSubmitted(itemId: Long, nextQueryAtMs: Long) = mutex.withLock {
val item = items.getValue(itemId)
items[itemId] = item.copy(
status = ItemStatus.QUERY_PENDING,
nextQueryAtMs = nextQueryAtMs,
updatedAtMs = nowMs(),
)
}
suspend fun claimQueryableItems(limit: Int, nowMs: Long): List<InvoiceItemRecord> = mutex.withLock {
val claimed = items.values
.filter { it.status == ItemStatus.QUERY_PENDING && it.nextQueryAtMs <= nowMs }
.take(limit)
.map { item ->
val updated = item.copy(
status = ItemStatus.QUERYING,
queryAttempts = item.queryAttempts + 1,
updatedAtMs = nowMs(),
)
items[item.id] = updated
updated
}
if (claimed.isNotEmpty()) {
setBatchesStatus(claimed.map { it.batchId }.toSet(), BatchStatus.QUERYING)
}
claimed
}
suspend fun markQueryPending(itemId: Long, nextQueryAtMs: Long) = mutex.withLock {
val item = items.getValue(itemId)
items[itemId] = item.copy(
status = ItemStatus.QUERY_PENDING,
nextQueryAtMs = nextQueryAtMs,
updatedAtMs = nowMs(),
)
}
suspend fun markItemSuccess(itemId: Long, invoiceNo: String?) = mutex.withLock {
val item = items.getValue(itemId)
items[itemId] = item.copy(
status = ItemStatus.SUCCESS,
invoiceNo = invoiceNo,
lastErrorCode = null,
lastErrorMessage = null,
updatedAtMs = nowMs(),
)
}
suspend fun markItemFailed(itemId: Long, code: String, message: String) = mutex.withLock {
val item = items.getValue(itemId)
items[itemId] = item.copy(
status = ItemStatus.FAILED,
lastErrorCode = code,
lastErrorMessage = message,
updatedAtMs = nowMs(),
)
}
suspend fun refreshAllBatchSummary() = mutex.withLock {
batches.values.forEach { batch ->
val batchItems = items.values.filter { it.batchId == batch.id }
val success = batchItems.count { it.status == ItemStatus.SUCCESS }
val failed = batchItems.count { it.status == ItemStatus.FAILED }
val processing = batchItems.size - success - failed
val resolved = batch.fetchFinished && batchItems.isNotEmpty() && processing == 0
val status = when {
!batch.fetchFinished -> BatchStatus.FETCHING
!resolved && batchItems.any { it.status == ItemStatus.PENDING || it.status == ItemStatus.ISSUING } ->
BatchStatus.ISSUING
!resolved -> BatchStatus.QUERYING
failed == 0 -> BatchStatus.FINISHED
success == 0 -> BatchStatus.FAILED
else -> BatchStatus.PARTIAL_FAILED
}
batches[batch.id] = batch.copy(
status = status,
submittedCount = batchItems.count {
it.status != ItemStatus.PENDING && it.status != ItemStatus.ISSUING
},
successCount = success,
failedCount = failed,
processingCount = processing,
resolved = resolved,
updatedAtMs = nowMs(),
)
}
}
suspend fun recoverStuckItems() = mutex.withLock {
items.values.forEach { item ->
when (item.status) {
ItemStatus.ISSUING -> items[item.id] = item.copy(status = ItemStatus.QUERY_PENDING)
ItemStatus.QUERYING -> items[item.id] = item.copy(status = ItemStatus.QUERY_PENDING)
else -> Unit
}
}
}
suspend fun isBatchResolved(batchNo: String): Boolean = mutex.withLock {
batches.values.first { it.batchNo == batchNo }.resolved
}
suspend fun getBatch(batchNo: String): InvoiceBatchRecord = mutex.withLock {
batches.values.first { it.batchNo == batchNo }
}
suspend fun listItems(batchId: Long): List<InvoiceItemRecord> = mutex.withLock {
items.values.filter { it.batchId == batchId }
}
suspend fun batchSnapshot(batchNo: String): String = mutex.withLock {
val batch = batches.values.first { it.batchNo == batchNo }
"batch=${batch.batchNo}, status=${batch.status}, fetched=${batch.fetchedCount}/${batch.totalCount}, " +
"success=${batch.successCount}, failed=${batch.failedCount}, processing=${batch.processingCount}"
}
private fun setBatchesStatus(batchIds: Set<Long>, status: BatchStatus) {
batchIds.forEach { batchId ->
val batch = batches.getValue(batchId)
if (!batch.resolved) {
batches[batchId] = batch.copy(status = status, updatedAtMs = nowMs())
}
}
}
private fun nowMs(): Long = System.currentTimeMillis()
}
private data class CustomerInvoicePage(
val total: Int,
val hasNext: Boolean,
val items: List<CustomerInvoiceItem>,
)
private data class CustomerInvoiceItem(
val sourceBizNo: String,
val invoiceReqSerialNo: String,
val buyerName: String,
val amount: String,
val rawJson: String,
)
private data class InvoiceBatchRecord(
val id: Long,
val batchNo: String,
val status: BatchStatus,
val totalCount: Int = 0,
val fetchedCount: Int = 0,
val submittedCount: Int = 0,
val successCount: Int = 0,
val failedCount: Int = 0,
val processingCount: Int = 0,
val fetchPageNo: Int = 0,
val fetchFinished: Boolean = false,
val resolved: Boolean = false,
val updatedAtMs: Long = System.currentTimeMillis(),
)
private data class InvoiceItemRecord(
val id: Long,
val batchId: Long,
val sourceBizNo: String,
val invoiceReqSerialNo: String,
val buyerName: String,
val amount: String,
val originalRequestBody: String,
val status: ItemStatus,
val issueAttempts: Int = 0,
val queryAttempts: Int = 0,
val nextQueryAtMs: Long = 0,
val invoiceNo: String? = null,
val lastErrorCode: String? = null,
val lastErrorMessage: String? = null,
val updatedAtMs: Long = System.currentTimeMillis(),
)
private data class PtQueryResult(
val status: PtInvoiceStatus,
val code: String,
val message: String,
val invoiceNo: String? = null,
)
private enum class BatchStatus {
CREATED,
FETCHING,
FETCHED,
ISSUING,
QUERYING,
FINISHED,
PARTIAL_FAILED,
FAILED,
}
private enum class ItemStatus {
PENDING,
ISSUING,
QUERY_PENDING,
QUERYING,
SUCCESS,
FAILED,
}
private enum class PtInvoiceStatus {
PROCESSING,
SUCCESS,
FAILED,
}