996 lines
39 KiB
Python
996 lines
39 KiB
Python
"""
|
||
数据导入API路由
|
||
"""
|
||
import uuid
|
||
import io
|
||
from datetime import datetime
|
||
from typing import Any, Dict, List
|
||
from urllib.parse import quote
|
||
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, status, Query
|
||
from fastapi.responses import JSONResponse, StreamingResponse
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
from loguru import logger
|
||
import pandas as pd
|
||
|
||
from app.schemas.streamer import (
|
||
DataImportUploadRequest,
|
||
DataImportUploadResponse,
|
||
DataImportConfirmRequest,
|
||
DataImportConfirmResponse,
|
||
DataImportHistoryResponse,
|
||
DataImportHistoryListResponse,
|
||
)
|
||
from app.database import get_async_session
|
||
|
||
router = APIRouter()
|
||
|
||
# Excel模板字段定义
|
||
TEMPLATE_FIELDS = {
|
||
"streamer": {
|
||
"name": "主播信息导入模板",
|
||
"fields": [
|
||
("streamer_id", "主播ID", "示例: ZB001"),
|
||
("real_name", "真实姓名", "示例: 张三"),
|
||
("stage_name", "艺名", "示例: 小张"),
|
||
("id_card_no", "身份证号", "示例: 110101199001011234"),
|
||
("phone", "联系电话", "示例: 13800138000"),
|
||
("email", "邮箱", "示例: example@email.com"),
|
||
("platform", "签约平台", "抖音/快手/淘宝/B站"),
|
||
("mcn_agency_id", "所属MCN机构ID", "可选"),
|
||
("entry_date", "入驻日期", "格式: 2024-01-01"),
|
||
("contract_status", "合同状态", "有效/过期/终止"),
|
||
("entity_type", "主体类型", "个人/个体工商户/公司"),
|
||
("tax_region", "纳税地区", "示例: 北京市"),
|
||
("bank_account_no", "银行账号", "示例: 6222021234567890123"),
|
||
("bank_name", "开户银行", "示例: 中国工商银行"),
|
||
]
|
||
},
|
||
"mcn": {
|
||
"name": "MCN机构导入模板",
|
||
"fields": [
|
||
("agency_name", "机构名称", "示例: XX传媒有限公司"),
|
||
("credit_code", "统一社会信用代码", "18位代码"),
|
||
("legal_person", "法定代表人", "示例: 李四"),
|
||
("contact_person", "联系人", "示例: 王五"),
|
||
("contact_phone", "联系电话", "示例: 13900139000"),
|
||
("contact_email", "联系邮箱", "示例: contact@mcn.com"),
|
||
("address", "公司地址", "示例: 北京市朝阳区XXX"),
|
||
("business_scope", "经营范围", "示例: 互联网直播、广告代理等"),
|
||
("cooperation_start_date", "合作开始日期", "格式: 2024-01-01"),
|
||
("cooperation_status", "合作状态", "合作中/暂停/终止"),
|
||
]
|
||
},
|
||
"recharge": {
|
||
"name": "平台充值记录导入模板",
|
||
"fields": [
|
||
("platform", "平台名称", "抖音/快手/淘宝/B站"),
|
||
("recharge_amount", "充值金额", "数字,示例: 10000.00"),
|
||
("recharge_date", "充值日期", "格式: 2024-01-01"),
|
||
("recharge_type", "充值类型", "现金/转账/其他"),
|
||
("account_name", "充值账户名", "示例: XX公司"),
|
||
("voucher_no", "凭证号", "示例: CZ202401001"),
|
||
("remark", "备注", "可选"),
|
||
]
|
||
},
|
||
"contract": {
|
||
"name": "分成协议导入模板",
|
||
"fields": [
|
||
("contract_no", "协议编号", "示例: HT202401001"),
|
||
("contract_type", "协议类型", "独家/非独家/临时"),
|
||
("streamer_id", "主播ID", "示例: ZB001"),
|
||
("streamer_name", "主播姓名", "示例: 张三"),
|
||
("streamer_entity_type", "主播主体类型", "个人/个体工商户/公司"),
|
||
("platform_party", "平台方名称", "示例: XX科技有限公司"),
|
||
("platform_credit_code", "平台方统一社会信用代码", "18位代码"),
|
||
("revenue_type", "收入类型", "直播打赏/带货佣金/广告收入"),
|
||
("platform_ratio", "平台分成比例", "数字,示例: 0.3 表示30%"),
|
||
("streamer_ratio", "主播分成比例", "数字,示例: 0.7 表示70%"),
|
||
("settlement_cycle", "结算周期", "月结/周结/日结"),
|
||
("contract_start_date", "协议开始日期", "格式: 2024-01-01"),
|
||
("contract_end_date", "协议结束日期", "格式: 2024-12-31"),
|
||
("contract_status", "协议状态", "有效/过期/终止"),
|
||
("remark", "备注", "可选"),
|
||
]
|
||
},
|
||
"order": {
|
||
"name": "电商订单数据导入模板",
|
||
"fields": [
|
||
("platform_order_no", "平台订单号", "示例: DD202401001234"),
|
||
("ecommerce_platform", "电商平台", "抖音/快手/淘宝/京东"),
|
||
("streamer_id", "主播ID", "示例: ZB001"),
|
||
("streamer_name", "主播姓名", "示例: 张三"),
|
||
("product_id", "商品ID", "示例: SP001"),
|
||
("product_name", "商品名称", "示例: 美妆护肤套装"),
|
||
("quantity", "购买数量", "数字,示例: 2"),
|
||
("original_price", "原价", "数字,示例: 299.00"),
|
||
("sale_price", "售价", "数字,示例: 199.00"),
|
||
("total_amount", "总金额", "数字,示例: 398.00"),
|
||
("actual_payment", "实付金额", "数字,示例: 378.00"),
|
||
("commission_ratio", "佣金比例", "数字,示例: 0.2 表示20%"),
|
||
("commission_amount", "佣金金额", "数字,示例: 75.60"),
|
||
("streamer_commission", "主播佣金", "数字,示例: 52.92"),
|
||
("buyer_id", "买家ID", "示例: U123456"),
|
||
("order_time", "下单时间", "格式: 2024-01-01 10:30:00"),
|
||
("settlement_time", "结算时间", "可选,格式: 2024-01-15 10:00:00"),
|
||
("order_status", "订单状态", "已完成/待发货/已退款/已取消"),
|
||
("is_commission_settled", "是否已结算佣金", "是/否"),
|
||
("province", "省份", "可选,示例: 广东省"),
|
||
("city", "城市", "可选,示例: 深圳市"),
|
||
]
|
||
},
|
||
"settlement": {
|
||
"name": "佣金结算单导入模板",
|
||
"fields": [
|
||
("settlement_no", "结算单号", "示例: JS202401001"),
|
||
("streamer_id", "主播ID", "示例: ZB001"),
|
||
("streamer_name", "主播姓名", "示例: 张三"),
|
||
("streamer_entity_type", "主播主体类型", "个人/个体工商户/公司"),
|
||
("settlement_period", "结算周期", "格式: 2024-01"),
|
||
("settlement_start_date", "结算开始日期", "格式: 2024-01-01"),
|
||
("settlement_end_date", "结算结束日期", "格式: 2024-01-31"),
|
||
("order_count", "订单数量", "数字,示例: 100"),
|
||
("total_sales", "总销售额", "数字,示例: 50000.00"),
|
||
("total_commission", "总佣金", "数字,示例: 10000.00"),
|
||
("platform_service_fee", "平台服务费", "数字,示例: 500.00"),
|
||
("actual_settlement_amount", "实际结算金额", "数字,示例: 9500.00"),
|
||
("tax_withholding", "代扣代缴税费", "数字,示例: 950.00"),
|
||
("payment_method", "付款方式", "银行转账/支付宝/微信"),
|
||
("payment_account_no", "付款账号", "示例: 6222021234567890123"),
|
||
("payment_account_name", "付款账户名", "示例: 张三"),
|
||
("payment_time", "付款时间", "可选,格式: 2024-02-01 10:00:00"),
|
||
("payment_status", "付款状态", "已付款/待付款/付款中"),
|
||
("settlement_status", "结算状态", "已结算/待结算/结算中"),
|
||
("remark", "备注", "可选"),
|
||
]
|
||
},
|
||
"expense": {
|
||
"name": "成本费用凭证导入模板",
|
||
"fields": [
|
||
("voucher_no", "凭证号", "示例: PZ202401001"),
|
||
("expense_type", "费用类型", "营销费用/人员费用/运营费用/其他"),
|
||
("expense_category", "费用类别", "推广费/佣金/工资/租金/其他"),
|
||
("payer_name", "付款方名称", "示例: XX科技有限公司"),
|
||
("payer_account_no", "付款方账号", "示例: 6222021234567890123"),
|
||
("payee_name", "收款方名称", "示例: 张三"),
|
||
("payee_account_no", "收款方账号", "示例: 6222029876543210987"),
|
||
("payee_bank_name", "收款方开户行", "可选,示例: 中国工商银行"),
|
||
("expense_date", "费用发生日期", "格式: 2024-01-15"),
|
||
("expense_amount", "费用金额", "数字,示例: 5000.00"),
|
||
("tax_amount", "税额", "数字,示例: 300.00"),
|
||
("tax_rate", "税率", "数字,示例: 0.06 表示6%"),
|
||
("payment_method", "支付方式", "银行转账/现金/支付宝/微信"),
|
||
("payment_status", "支付状态", "已支付/待支付/支付中"),
|
||
("accounting_status", "入账状态", "已入账/待入账"),
|
||
("fiscal_year", "会计年度", "数字,示例: 2024"),
|
||
("fiscal_period", "会计期间", "数字,1-12表示月份"),
|
||
("is_large_amount", "是否大额费用", "是/否(>5万为大额)"),
|
||
("is_cross_border", "是否跨境支付", "是/否"),
|
||
("expense_description", "费用说明", "可选"),
|
||
("related_contract_id", "关联协议ID", "可选"),
|
||
]
|
||
},
|
||
"tax": {
|
||
"name": "税务申报表导入模板",
|
||
"fields": [
|
||
("taxpayer_name", "纳税人名称", "示例: XX科技有限公司"),
|
||
("taxpayer_id", "纳税人识别号", "18位代码"),
|
||
("tax_period", "税款所属期", "格式: 2024-01"),
|
||
("declaration_date", "申报日期", "格式: 2024-02-15"),
|
||
("tax_authority_code", "税务机关代码", "示例: 11010100"),
|
||
("tax_authority_name", "税务机关名称", "示例: 北京市朝阳区税务局"),
|
||
("taxpayer_type", "纳税人类型", "一般纳税人/小规模纳税人"),
|
||
("tax_rate", "税率", "数字,示例: 0.06 表示6%"),
|
||
("sales_revenue", "销售收入", "数字,示例: 100000.00"),
|
||
("sales_revenue_taxable", "应税销售收入", "数字,示例: 94339.62"),
|
||
("output_tax", "销项税额", "数字,示例: 5660.38"),
|
||
("input_tax", "进项税额", "数字,示例: 3000.00"),
|
||
("input_tax_deductible", "可抵扣进项税额", "数字,示例: 3000.00"),
|
||
("tax_payable", "应纳税额", "数字,示例: 2660.38"),
|
||
("tax_to_pay", "应补(退)税额", "数字,示例: 2660.38"),
|
||
("refund_amount", "退税金额", "数字,示例: 0.00"),
|
||
("declaration_status", "申报状态", "已申报/待申报/申报中"),
|
||
("is_reconciled", "是否已核对", "是/否"),
|
||
]
|
||
},
|
||
"bank": {
|
||
"name": "银行流水导入模板",
|
||
"fields": [
|
||
("account_no", "账号", "示例: 6222021234567890123"),
|
||
("account_name", "账户名", "示例: XX科技有限公司"),
|
||
("bank_name", "开户行", "示例: 中国工商银行北京分行"),
|
||
("transaction_date", "交易日期", "格式: 2024-01-15"),
|
||
("transaction_time", "交易时间", "格式: 2024-01-15 10:30:00"),
|
||
("transaction_type", "交易类型", "转入/转出/利息/手续费"),
|
||
("transaction_amount", "交易金额", "数字,示例: 10000.00"),
|
||
("balance", "余额", "数字,示例: 50000.00"),
|
||
("counterparty_account_no", "对手方账号", "可选"),
|
||
("counterparty_account_name", "对手方账户名", "可选"),
|
||
("counterparty_bank_name", "对手方开户行", "可选"),
|
||
("voucher_no", "凭证号", "可选"),
|
||
("transaction_purpose", "交易用途", "可选,示例: 佣金结算"),
|
||
("is_cross_border", "是否跨境", "是/否"),
|
||
("currency", "币种", "CNY/USD/EUR等"),
|
||
("amount_cny", "折算人民币金额", "数字"),
|
||
("exchange_rate", "汇率", "数字,示例: 1.00"),
|
||
("is_large_amount", "是否大额交易", "是/否(>20万为大额)"),
|
||
("is_suspicious", "是否可疑交易", "是/否"),
|
||
("suspicious_reason", "可疑原因", "可选"),
|
||
("is_reconciled", "是否已对账", "是/否"),
|
||
]
|
||
},
|
||
"invoice": {
|
||
"name": "发票数据导入模板",
|
||
"fields": [
|
||
("invoice_code", "发票代码", "示例: 011001900111"),
|
||
("invoice_no", "发票号码", "示例: 12345678"),
|
||
("invoice_type", "发票类型", "增值税专用发票/增值税普通发票/电子发票"),
|
||
("direction", "发票方向", "进项/销项"),
|
||
("invoice_date", "开票日期", "格式: 2024-01-15"),
|
||
("purchaser_name", "购买方名称", "示例: XX科技有限公司"),
|
||
("purchaser_tax_no", "购买方税号", "18位代码"),
|
||
("seller_name", "销售方名称", "示例: YY传媒有限公司"),
|
||
("seller_tax_no", "销售方税号", "18位代码"),
|
||
("total_amount", "金额合计", "数字,示例: 10000.00"),
|
||
("total_tax", "税额合计", "数字,示例: 600.00"),
|
||
("total_amount_with_tax", "价税合计", "数字,示例: 10600.00"),
|
||
("amount_in_words", "大写金额", "示例: 壹万零陆佰元整"),
|
||
("invoice_status", "发票状态", "正常/已作废/已冲红"),
|
||
("is_verified", "是否已验证", "是/否"),
|
||
("is_red_invoice", "是否红字发票", "是/否"),
|
||
("red_reason", "红字原因", "可选"),
|
||
("remark", "备注", "可选"),
|
||
]
|
||
},
|
||
}
|
||
|
||
# 模拟导入历史记录
|
||
IMPORT_HISTORY = [
|
||
{
|
||
"id": 1,
|
||
"import_type": "streamer",
|
||
"import_type_name": "主播信息",
|
||
"file_name": "主播信息.xlsx",
|
||
"total_rows": 1000,
|
||
"success_rows": 980,
|
||
"failed_rows": 20,
|
||
"status": "success",
|
||
"created_by": "admin",
|
||
"created_at": "2024-01-01 10:00:00",
|
||
"error_file_url": "/files/errors_123.xlsx",
|
||
},
|
||
]
|
||
|
||
# 存储上传文件信息
|
||
UPLOADED_FILES = {}
|
||
|
||
|
||
@router.post("/upload", response_model=DataImportUploadResponse)
|
||
async def upload_import_file(
|
||
file: UploadFile = File(..., description="上传的文件"),
|
||
import_type: str = Form(..., description="导入类型:streamer, recharge, order, contract"),
|
||
mapping: str = Form(None, description="字段映射(JSON格式)"),
|
||
):
|
||
"""
|
||
上传数据文件(Excel/CSV)
|
||
"""
|
||
logger.info(f"上传文件: {file.filename}, 类型: {import_type}")
|
||
|
||
# 检查文件格式
|
||
allowed_extensions = ["xlsx", "xls", "csv"]
|
||
file_extension = file.filename.split(".")[-1].lower()
|
||
|
||
if file_extension not in allowed_extensions:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail=f"不支持的文件格式,仅支持: {', '.join(allowed_extensions)}",
|
||
)
|
||
|
||
try:
|
||
# 生成上传ID
|
||
upload_id = f"upload_{uuid.uuid4().hex[:12]}"
|
||
|
||
# 读取文件内容
|
||
contents = await file.read()
|
||
|
||
# 解析文件(根据扩展名)
|
||
if file_extension == "csv":
|
||
df = pd.read_csv(pd.io.common.BytesIO(contents))
|
||
else:
|
||
df = pd.read_excel(pd.io.common.BytesIO(contents))
|
||
|
||
# 获取前5行作为预览数据
|
||
preview_data = df.head(5).to_dict(orient="records")
|
||
|
||
# 生成默认字段映射(列名 -> 字段名)
|
||
field_mapping = {col: col for col in df.columns}
|
||
|
||
# 保存文件信息
|
||
file_info = {
|
||
"upload_id": upload_id,
|
||
"file_name": file.filename,
|
||
"file_size": len(contents),
|
||
"total_rows": len(df),
|
||
"data": df,
|
||
"field_mapping": field_mapping,
|
||
}
|
||
UPLOADED_FILES[upload_id] = file_info
|
||
|
||
# 解析mapping参数
|
||
mapping_dict = {}
|
||
if mapping:
|
||
try:
|
||
mapping_dict = eval(mapping) # 注意:实际生产中应使用json.loads
|
||
except:
|
||
pass
|
||
|
||
return DataImportUploadResponse(
|
||
upload_id=upload_id,
|
||
file_name=file.filename,
|
||
file_size=len(contents),
|
||
total_rows=len(df),
|
||
preview_data=preview_data,
|
||
field_mapping=field_mapping,
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"文件上传失败: {str(e)}")
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail=f"文件解析失败: {str(e)}",
|
||
)
|
||
|
||
|
||
@router.post("/confirm", response_model=DataImportConfirmResponse)
|
||
async def confirm_import(request: DataImportConfirmRequest, db: AsyncSession = Depends(get_async_session)):
|
||
"""
|
||
确认导入数据到数据库
|
||
"""
|
||
from datetime import datetime
|
||
|
||
logger.info(f"确认导入: {request.upload_id}, 类型: {request.import_type}")
|
||
|
||
# 检查上传文件是否存在
|
||
if request.upload_id not in UPLOADED_FILES:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail=f"上传文件不存在: {request.upload_id}",
|
||
)
|
||
|
||
file_info = UPLOADED_FILES[request.upload_id]
|
||
df = file_info["data"]
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
try:
|
||
# 根据导入类型调用不同的处理函数
|
||
import_type = request.import_type.lower()
|
||
|
||
if import_type == "streamer":
|
||
success_rows, failed_rows, error_log = await import_streamers(db, df)
|
||
elif import_type == "contract":
|
||
success_rows, failed_rows, error_log = await import_contracts(db, df)
|
||
elif import_type == "recharge":
|
||
success_rows, failed_rows, error_log = await import_recharges(db, df)
|
||
elif import_type == "tax":
|
||
success_rows, failed_rows, error_log = await import_tax_declarations(db, df)
|
||
elif import_type == "mcn":
|
||
success_rows, failed_rows, error_log = await import_mcn_agencies(db, df)
|
||
elif import_type == "order":
|
||
success_rows, failed_rows, error_log = await import_orders(db, df)
|
||
elif import_type == "settlement":
|
||
success_rows, failed_rows, error_log = await import_settlements(db, df)
|
||
elif import_type == "expense":
|
||
success_rows, failed_rows, error_log = await import_expenses(db, df)
|
||
elif import_type == "bank":
|
||
success_rows, failed_rows, error_log = await import_bank_transactions(db, df)
|
||
elif import_type == "invoice":
|
||
success_rows, failed_rows, error_log = await import_invoices(db, df)
|
||
else:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
detail=f"不支持的导入类型: {import_type}",
|
||
)
|
||
|
||
# 生成任务ID
|
||
task_id = f"task_{uuid.uuid4().hex[:12]}"
|
||
|
||
# 添加到历史记录
|
||
history_record = {
|
||
"id": len(IMPORT_HISTORY) + 1,
|
||
"import_type": request.import_type,
|
||
"import_type_name": TEMPLATE_FIELDS.get(import_type, {}).get("name", "未知类型"),
|
||
"file_name": file_info["file_name"],
|
||
"total_rows": success_rows + failed_rows,
|
||
"success_rows": success_rows,
|
||
"failed_rows": failed_rows,
|
||
"status": "success" if failed_rows == 0 else "partial",
|
||
"created_by": "admin",
|
||
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
"error_file_url": f"/files/errors_{task_id}.xlsx" if failed_rows > 0 else None,
|
||
}
|
||
IMPORT_HISTORY.append(history_record)
|
||
|
||
return DataImportConfirmResponse(
|
||
task_id=task_id,
|
||
total_rows=success_rows + failed_rows,
|
||
success_rows=success_rows,
|
||
failed_rows=failed_rows,
|
||
error_log=error_log,
|
||
)
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"数据导入失败: {str(e)}", exc_info=True)
|
||
raise HTTPException(
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
detail=f"数据导入失败: {str(e)}",
|
||
)
|
||
|
||
|
||
# 导入处理函数
|
||
async def import_streamers(db: AsyncSession, df: pd.DataFrame):
|
||
"""导入主播信息"""
|
||
from app.models.streamer import StreamerInfo
|
||
from datetime import datetime
|
||
from sqlalchemy.dialects.postgresql import insert
|
||
from sqlalchemy import text
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
streamer_data = {
|
||
"streamer_id": str(row['streamer_id']),
|
||
"streamer_name": str(row['real_name']),
|
||
"entity_type": 'individual',
|
||
"phone_number": str(row['phone']),
|
||
"bank_account_no": str(row['bank_account_no']),
|
||
"bank_name": str(row['bank_name']),
|
||
"status": 'active',
|
||
"registration_date": datetime.now().date()
|
||
}
|
||
|
||
# 可选字段
|
||
if 'id_card_no' in row and pd.notna(row['id_card_no']):
|
||
streamer_data["id_card_no"] = str(row['id_card_no'])
|
||
|
||
# 使用 ON CONFLICT 处理重复数据
|
||
stmt = insert(StreamerInfo.__table__).values(**streamer_data)
|
||
stmt = stmt.on_conflict_do_update(
|
||
constraint="streamer_info_streamer_id_key",
|
||
set_=streamer_data
|
||
)
|
||
await db.execute(stmt)
|
||
success_rows += 1
|
||
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({
|
||
"row": idx + 1,
|
||
"errorMsg": str(e),
|
||
})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
async def import_contracts(db: AsyncSession, df: pd.DataFrame):
|
||
"""导入分成协议"""
|
||
from app.models.contract import RevenueSharingContract
|
||
from datetime import datetime
|
||
from sqlalchemy.dialects.postgresql import insert
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
contract_data = {
|
||
"contract_id": str(row['contract_no']),
|
||
"contract_no": str(row['contract_no']),
|
||
"contract_type": 'tip_sharing',
|
||
"streamer_id": str(row['streamer_id']),
|
||
"streamer_name": str(row['streamer_name']),
|
||
"streamer_entity_type": str(row['streamer_entity_type']),
|
||
"platform_party": str(row['platform_party']),
|
||
"platform_credit_code": str(row['platform_credit_code']),
|
||
"revenue_type": 'tip',
|
||
"streamer_ratio": float(row['streamer_ratio']),
|
||
"platform_ratio": float(row['platform_ratio']),
|
||
"settlement_cycle": 'monthly',
|
||
"contract_start_date": datetime.strptime(str(row['contract_start_date']), '%Y-%m-%d').date(),
|
||
"contract_end_date": datetime.strptime(str(row['contract_end_date']), '%Y-%m-%d').date(),
|
||
"contract_status": str(row['contract_status'])
|
||
}
|
||
|
||
# 使用 ON CONFLICT 处理重复数据
|
||
stmt = insert(RevenueSharingContract.__table__).values(**contract_data)
|
||
stmt = stmt.on_conflict_do_update(
|
||
constraint="contract_contract_id_key",
|
||
set_=contract_data
|
||
)
|
||
await db.execute(stmt)
|
||
success_rows += 1
|
||
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({
|
||
"row": idx + 1,
|
||
"errorMsg": str(e),
|
||
})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
async def import_recharges(db: AsyncSession, df: pd.DataFrame):
|
||
"""导入充值记录"""
|
||
from app.models.streamer import PlatformRecharge
|
||
from datetime import datetime
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
recharge = PlatformRecharge(
|
||
recharge_id=f"RC_{uuid.uuid4().hex[:12]}",
|
||
user_id=str(row.get('streamer_id', row['account_name'])),
|
||
user_name=str(row['account_name']),
|
||
recharge_amount=float(row['recharge_amount']),
|
||
recharge_time=datetime.strptime(str(row['recharge_date']), '%Y-%m-%d'),
|
||
payment_method='bank_transfer',
|
||
transaction_no=f"TXN_{uuid.uuid4().hex[:12]}",
|
||
platform_order_no=f"PO_{uuid.uuid4().hex[:12]}",
|
||
actual_amount_cny=float(row['recharge_amount']),
|
||
total_coins=float(row['recharge_amount']) * 10,
|
||
status='success',
|
||
withdrawal_status='not_withdrawn'
|
||
)
|
||
|
||
db.add(recharge)
|
||
success_rows += 1
|
||
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({
|
||
"row": idx + 1,
|
||
"errorMsg": str(e),
|
||
})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
async def import_tax_declarations(db: AsyncSession, df: pd.DataFrame):
|
||
"""导入税务申报"""
|
||
from app.models.tax_declaration import TaxDeclaration
|
||
from datetime import datetime
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
declaration = TaxDeclaration(
|
||
vat_declaration_id=f"TAX_{uuid.uuid4().hex[:12]}",
|
||
taxpayer_name=str(row['taxpayer_name']),
|
||
taxpayer_id=str(row['taxpayer_id']),
|
||
tax_period=str(row['tax_period']),
|
||
declaration_date=datetime.strptime(str(row['declaration_date']), '%Y-%m-%d').date(),
|
||
tax_authority_code=str(row['tax_authority_code']),
|
||
tax_authority_name=str(row['tax_authority_name']),
|
||
taxpayer_type='small_scale',
|
||
tax_rate=float(row['tax_rate']),
|
||
sales_revenue=float(row['sales_revenue']),
|
||
sales_revenue_taxable=float(row['sales_revenue_taxable']),
|
||
output_tax=float(row['output_tax']),
|
||
input_tax=float(row.get('input_tax', 0)),
|
||
input_tax_deductible=float(row.get('input_tax_deductible', 0)),
|
||
tax_payable=float(row['tax_payable']),
|
||
tax_to_pay=float(row['tax_to_pay']),
|
||
refund_amount=float(row.get('refund_amount', 0)),
|
||
declaration_status=str(row['declaration_status']),
|
||
is_reconciled=False
|
||
)
|
||
|
||
db.add(declaration)
|
||
success_rows += 1
|
||
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({
|
||
"row": idx + 1,
|
||
"errorMsg": str(e),
|
||
})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
# 其他导入函数(简化实现)
|
||
async def import_mcn_agencies(db: AsyncSession, df: pd.DataFrame):
|
||
from app.models.streamer import McnAgency
|
||
from datetime import datetime
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
mcn = McnAgency(
|
||
mcn_name=str(row['agency_name']),
|
||
unified_social_credit_code=str(row['credit_code']),
|
||
legal_person_name=str(row['legal_person']),
|
||
bank_account_no="6222021234567890123",
|
||
bank_name="中国工商银行",
|
||
mcn_type="regular",
|
||
status="active",
|
||
registration_date=datetime.now().date()
|
||
)
|
||
db.add(mcn)
|
||
success_rows += 1
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({"row": idx + 1, "errorMsg": str(e)})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
async def import_orders(db: AsyncSession, df: pd.DataFrame):
|
||
from app.models.order import Order
|
||
from datetime import datetime
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
order = Order(
|
||
platform_order_no=str(row['platform_order_no']),
|
||
ecommerce_platform=str(row['ecommerce_platform']),
|
||
streamer_id=str(row['streamer_id']),
|
||
streamer_name=str(row['streamer_name']),
|
||
product_id=str(row['product_id']),
|
||
product_name=str(row['product_name']),
|
||
quantity=int(row['quantity']),
|
||
original_price=float(row['original_price']),
|
||
sale_price=float(row['sale_price']),
|
||
total_amount=float(row['total_amount']),
|
||
actual_payment=float(row['actual_payment']),
|
||
order_time=datetime.strptime(str(row['order_time']), '%Y-%m-%d %H:%M:%S'),
|
||
order_status='completed',
|
||
is_commission_settled=True
|
||
)
|
||
db.add(order)
|
||
success_rows += 1
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({"row": idx + 1, "errorMsg": str(e)})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
async def import_settlements(db: AsyncSession, df: pd.DataFrame):
|
||
from app.models.settlement import Settlement
|
||
from datetime import datetime
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
settlement = Settlement(
|
||
settlement_no=str(row['settlement_no']),
|
||
streamer_id=str(row['streamer_id']),
|
||
streamer_name=str(row['streamer_name']),
|
||
streamer_entity_type=str(row['streamer_entity_type']),
|
||
settlement_period=str(row['settlement_period']),
|
||
settlement_start_date=datetime.strptime(str(row['settlement_start_date']), '%Y-%m-%d').date(),
|
||
settlement_end_date=datetime.strptime(str(row['settlement_end_date']), '%Y-%m-%d').date(),
|
||
order_count=int(row['order_count']),
|
||
total_sales=float(row['total_sales']),
|
||
total_commission=float(row['total_commission']),
|
||
actual_settlement_amount=float(row['actual_settlement_amount']),
|
||
payment_status='pending',
|
||
settlement_status='pending'
|
||
)
|
||
db.add(settlement)
|
||
success_rows += 1
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({"row": idx + 1, "errorMsg": str(e)})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
async def import_expenses(db: AsyncSession, df: pd.DataFrame):
|
||
from app.models.expense import Expense
|
||
from datetime import datetime
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
expense = Expense(
|
||
voucher_no=str(row['voucher_no']),
|
||
expense_type=str(row['expense_type']),
|
||
expense_category=str(row['expense_category']),
|
||
payer_name=str(row['payer_name']),
|
||
payee_name=str(row['payee_name']),
|
||
expense_date=datetime.strptime(str(row['expense_date']), '%Y-%m-%d').date(),
|
||
expense_amount=float(row['expense_amount']),
|
||
payment_status='pending',
|
||
accounting_status='pending'
|
||
)
|
||
db.add(expense)
|
||
success_rows += 1
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({"row": idx + 1, "errorMsg": str(e)})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
async def import_bank_transactions(db: AsyncSession, df: pd.DataFrame):
|
||
from app.models.bank_transaction import BankTransaction
|
||
from datetime import datetime
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
transaction = BankTransaction(
|
||
account_no=str(row['account_no']),
|
||
account_name=str(row['account_name']),
|
||
bank_name=str(row['bank_name']),
|
||
transaction_date=datetime.strptime(str(row['transaction_date']), '%Y-%m-%d').date(),
|
||
transaction_type=str(row['transaction_type']),
|
||
transaction_amount=float(row['transaction_amount']),
|
||
balance=float(row['balance']),
|
||
currency='CNY',
|
||
amount_cny=float(row.get('amount_cny', row['transaction_amount']))
|
||
)
|
||
db.add(transaction)
|
||
success_rows += 1
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({"row": idx + 1, "errorMsg": str(e)})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
async def import_invoices(db: AsyncSession, df: pd.DataFrame):
|
||
from app.models.invoice import Invoice
|
||
from datetime import datetime
|
||
|
||
success_rows = 0
|
||
failed_rows = 0
|
||
error_log = []
|
||
|
||
for idx, row in df.iterrows():
|
||
try:
|
||
invoice = Invoice(
|
||
invoice_code=str(row['invoice_code']),
|
||
invoice_no=str(row['invoice_no']),
|
||
invoice_type=str(row['invoice_type']),
|
||
direction=str(row['direction']),
|
||
invoice_date=datetime.strptime(str(row['invoice_date']), '%Y-%m-%d').date(),
|
||
purchaser_name=str(row['purchaser_name']),
|
||
purchaser_tax_no=str(row['purchaser_tax_no']),
|
||
seller_name=str(row['seller_name']),
|
||
seller_tax_no=str(row['seller_tax_no']),
|
||
total_amount=float(row['total_amount']),
|
||
total_tax=float(row['total_tax']),
|
||
total_amount_with_tax=float(row['total_amount_with_tax']),
|
||
invoice_status='normal',
|
||
is_verified=False
|
||
)
|
||
db.add(invoice)
|
||
success_rows += 1
|
||
except Exception as e:
|
||
failed_rows += 1
|
||
error_log.append({"row": idx + 1, "errorMsg": str(e)})
|
||
|
||
await db.commit()
|
||
return success_rows, failed_rows, error_log
|
||
|
||
|
||
@router.get("/history", response_model=DataImportHistoryListResponse)
|
||
async def list_import_history(
|
||
page: int = 1,
|
||
size: int = 10,
|
||
):
|
||
"""
|
||
查询数据导入历史记录
|
||
"""
|
||
logger.info(f"查询导入历史: page={page}, size={size}")
|
||
|
||
# 分页
|
||
total = len(IMPORT_HISTORY)
|
||
start = (page - 1) * size
|
||
end = start + size
|
||
records = IMPORT_HISTORY[start:end]
|
||
|
||
return DataImportHistoryListResponse(
|
||
records=records,
|
||
total=total,
|
||
page=page,
|
||
size=size,
|
||
)
|
||
|
||
|
||
@router.get("/export", response_model=dict)
|
||
async def create_export_task(
|
||
export_type: str = Query(..., description="导出类型"),
|
||
export_format: str = Query(..., description="导出格式:excel, csv"),
|
||
file_name: str = Query(..., description="文件名"),
|
||
conditions: str = Query(None, description="查询条件(JSON格式)"),
|
||
fields: str = Query(None, description="导出字段(JSON格式)"),
|
||
):
|
||
"""
|
||
创建数据导出任务
|
||
"""
|
||
logger.info(f"创建导出任务: {export_type}")
|
||
|
||
# 生成任务ID
|
||
task_id = f"export_{uuid.uuid4().hex[:12]}"
|
||
|
||
# 模拟导出任务
|
||
export_task = {
|
||
"taskId": task_id,
|
||
"status": "processing",
|
||
"fileName": f"{file_name}_20240101100000.{export_format}",
|
||
"estimatedTime": 30,
|
||
"downloadUrl": None,
|
||
}
|
||
|
||
return {
|
||
"code": 200,
|
||
"message": "导出任务创建成功",
|
||
"data": export_task,
|
||
}
|
||
|
||
|
||
@router.get("/export/{task_id}")
|
||
async def get_export_status(task_id: str):
|
||
"""
|
||
查询导出任务状态
|
||
"""
|
||
logger.info(f"查询导出状态: {task_id}")
|
||
|
||
# 模拟任务状态
|
||
export_status = {
|
||
"taskId": task_id,
|
||
"status": "completed",
|
||
"fileName": f"主播信息导出_20240101100000.xlsx",
|
||
"fileSize": 1024000,
|
||
"downloadUrl": f"/files/export/{task_id}.xlsx",
|
||
"expiresAt": "2024-01-08 10:00:00",
|
||
"createdAt": "2024-01-01 10:00:00",
|
||
"completedAt": "2024-01-01 10:01:30",
|
||
}
|
||
|
||
return {
|
||
"code": 200,
|
||
"data": export_status,
|
||
}
|
||
|
||
|
||
@router.get("/template/list")
|
||
async def list_templates():
|
||
"""
|
||
获取所有可用的导入模板列表
|
||
"""
|
||
templates = []
|
||
for key, value in TEMPLATE_FIELDS.items():
|
||
templates.append({
|
||
"type": key,
|
||
"name": value["name"],
|
||
"field_count": len(value["fields"]),
|
||
})
|
||
return {
|
||
"code": 200,
|
||
"data": templates,
|
||
}
|
||
|
||
|
||
@router.get("/template/{template_type}")
|
||
async def download_template(template_type: str):
|
||
"""
|
||
下载指定类型的Excel导入模板
|
||
"""
|
||
logger.info(f"下载模板: {template_type}")
|
||
|
||
if template_type not in TEMPLATE_FIELDS:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_404_NOT_FOUND,
|
||
detail=f"不支持的模板类型: {template_type}",
|
||
)
|
||
|
||
template_config = TEMPLATE_FIELDS[template_type]
|
||
template_name = template_config["name"]
|
||
fields = template_config["fields"]
|
||
|
||
# 创建DataFrame
|
||
# 第一行:中文列名
|
||
# 第二行:英文字段名(供系统识别)
|
||
# 第三行:填写说明
|
||
columns = [f[1] for f in fields] # 中文列名
|
||
field_names = [f[0] for f in fields] # 英文字段名
|
||
descriptions = [f[2] for f in fields] # 填写说明
|
||
|
||
# 创建示例数据行
|
||
df = pd.DataFrame(columns=columns)
|
||
|
||
# 添加字段说明行
|
||
df.loc[0] = descriptions
|
||
|
||
# 创建Excel文件
|
||
output = io.BytesIO()
|
||
with pd.ExcelWriter(output, engine='openpyxl') as writer:
|
||
df.to_excel(writer, sheet_name='数据', index=False)
|
||
|
||
# 获取工作表
|
||
workbook = writer.book
|
||
worksheet = writer.sheets['数据']
|
||
|
||
# 添加字段映射说明sheet
|
||
mapping_data = {
|
||
'中文列名': columns,
|
||
'英文字段名': field_names,
|
||
'填写说明': descriptions,
|
||
}
|
||
mapping_df = pd.DataFrame(mapping_data)
|
||
mapping_df.to_excel(writer, sheet_name='字段说明', index=False)
|
||
|
||
# 设置列宽
|
||
for sheet in [worksheet, writer.sheets['字段说明']]:
|
||
for column in sheet.columns:
|
||
max_length = 0
|
||
column_letter = column[0].column_letter
|
||
for cell in column:
|
||
try:
|
||
if len(str(cell.value)) > max_length:
|
||
max_length = len(str(cell.value))
|
||
except:
|
||
pass
|
||
adjusted_width = min(max_length + 2, 50)
|
||
sheet.column_dimensions[column_letter].width = adjusted_width
|
||
|
||
output.seek(0)
|
||
|
||
# 返回文件流
|
||
filename = f"{template_name}.xlsx"
|
||
# URL编码文件名以支持中文
|
||
encoded_filename = quote(filename)
|
||
headers = {
|
||
'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
|
||
}
|
||
|
||
return StreamingResponse(
|
||
output,
|
||
media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||
headers=headers,
|
||
)
|