deep-risk/backend/app/api/v1/endpoints/data_import.py
2025-12-14 20:08:27 +08:00

996 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据导入API路由
"""
import uuid
import io
from datetime import datetime
from typing import Any, Dict, List
from urllib.parse import quote
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, status, Query
from fastapi.responses import JSONResponse, StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from loguru import logger
import pandas as pd
from app.schemas.streamer import (
DataImportUploadRequest,
DataImportUploadResponse,
DataImportConfirmRequest,
DataImportConfirmResponse,
DataImportHistoryResponse,
DataImportHistoryListResponse,
)
from app.database import get_async_session
router = APIRouter()
# Excel模板字段定义
TEMPLATE_FIELDS = {
"streamer": {
"name": "主播信息导入模板",
"fields": [
("streamer_id", "主播ID", "示例: ZB001"),
("real_name", "真实姓名", "示例: 张三"),
("stage_name", "艺名", "示例: 小张"),
("id_card_no", "身份证号", "示例: 110101199001011234"),
("phone", "联系电话", "示例: 13800138000"),
("email", "邮箱", "示例: example@email.com"),
("platform", "签约平台", "抖音/快手/淘宝/B站"),
("mcn_agency_id", "所属MCN机构ID", "可选"),
("entry_date", "入驻日期", "格式: 2024-01-01"),
("contract_status", "合同状态", "有效/过期/终止"),
("entity_type", "主体类型", "个人/个体工商户/公司"),
("tax_region", "纳税地区", "示例: 北京市"),
("bank_account_no", "银行账号", "示例: 6222021234567890123"),
("bank_name", "开户银行", "示例: 中国工商银行"),
]
},
"mcn": {
"name": "MCN机构导入模板",
"fields": [
("agency_name", "机构名称", "示例: XX传媒有限公司"),
("credit_code", "统一社会信用代码", "18位代码"),
("legal_person", "法定代表人", "示例: 李四"),
("contact_person", "联系人", "示例: 王五"),
("contact_phone", "联系电话", "示例: 13900139000"),
("contact_email", "联系邮箱", "示例: contact@mcn.com"),
("address", "公司地址", "示例: 北京市朝阳区XXX"),
("business_scope", "经营范围", "示例: 互联网直播、广告代理等"),
("cooperation_start_date", "合作开始日期", "格式: 2024-01-01"),
("cooperation_status", "合作状态", "合作中/暂停/终止"),
]
},
"recharge": {
"name": "平台充值记录导入模板",
"fields": [
("platform", "平台名称", "抖音/快手/淘宝/B站"),
("recharge_amount", "充值金额", "数字,示例: 10000.00"),
("recharge_date", "充值日期", "格式: 2024-01-01"),
("recharge_type", "充值类型", "现金/转账/其他"),
("account_name", "充值账户名", "示例: XX公司"),
("voucher_no", "凭证号", "示例: CZ202401001"),
("remark", "备注", "可选"),
]
},
"contract": {
"name": "分成协议导入模板",
"fields": [
("contract_no", "协议编号", "示例: HT202401001"),
("contract_type", "协议类型", "独家/非独家/临时"),
("streamer_id", "主播ID", "示例: ZB001"),
("streamer_name", "主播姓名", "示例: 张三"),
("streamer_entity_type", "主播主体类型", "个人/个体工商户/公司"),
("platform_party", "平台方名称", "示例: XX科技有限公司"),
("platform_credit_code", "平台方统一社会信用代码", "18位代码"),
("revenue_type", "收入类型", "直播打赏/带货佣金/广告收入"),
("platform_ratio", "平台分成比例", "数字,示例: 0.3 表示30%"),
("streamer_ratio", "主播分成比例", "数字,示例: 0.7 表示70%"),
("settlement_cycle", "结算周期", "月结/周结/日结"),
("contract_start_date", "协议开始日期", "格式: 2024-01-01"),
("contract_end_date", "协议结束日期", "格式: 2024-12-31"),
("contract_status", "协议状态", "有效/过期/终止"),
("remark", "备注", "可选"),
]
},
"order": {
"name": "电商订单数据导入模板",
"fields": [
("platform_order_no", "平台订单号", "示例: DD202401001234"),
("ecommerce_platform", "电商平台", "抖音/快手/淘宝/京东"),
("streamer_id", "主播ID", "示例: ZB001"),
("streamer_name", "主播姓名", "示例: 张三"),
("product_id", "商品ID", "示例: SP001"),
("product_name", "商品名称", "示例: 美妆护肤套装"),
("quantity", "购买数量", "数字,示例: 2"),
("original_price", "原价", "数字,示例: 299.00"),
("sale_price", "售价", "数字,示例: 199.00"),
("total_amount", "总金额", "数字,示例: 398.00"),
("actual_payment", "实付金额", "数字,示例: 378.00"),
("commission_ratio", "佣金比例", "数字,示例: 0.2 表示20%"),
("commission_amount", "佣金金额", "数字,示例: 75.60"),
("streamer_commission", "主播佣金", "数字,示例: 52.92"),
("buyer_id", "买家ID", "示例: U123456"),
("order_time", "下单时间", "格式: 2024-01-01 10:30:00"),
("settlement_time", "结算时间", "可选,格式: 2024-01-15 10:00:00"),
("order_status", "订单状态", "已完成/待发货/已退款/已取消"),
("is_commission_settled", "是否已结算佣金", "是/否"),
("province", "省份", "可选,示例: 广东省"),
("city", "城市", "可选,示例: 深圳市"),
]
},
"settlement": {
"name": "佣金结算单导入模板",
"fields": [
("settlement_no", "结算单号", "示例: JS202401001"),
("streamer_id", "主播ID", "示例: ZB001"),
("streamer_name", "主播姓名", "示例: 张三"),
("streamer_entity_type", "主播主体类型", "个人/个体工商户/公司"),
("settlement_period", "结算周期", "格式: 2024-01"),
("settlement_start_date", "结算开始日期", "格式: 2024-01-01"),
("settlement_end_date", "结算结束日期", "格式: 2024-01-31"),
("order_count", "订单数量", "数字,示例: 100"),
("total_sales", "总销售额", "数字,示例: 50000.00"),
("total_commission", "总佣金", "数字,示例: 10000.00"),
("platform_service_fee", "平台服务费", "数字,示例: 500.00"),
("actual_settlement_amount", "实际结算金额", "数字,示例: 9500.00"),
("tax_withholding", "代扣代缴税费", "数字,示例: 950.00"),
("payment_method", "付款方式", "银行转账/支付宝/微信"),
("payment_account_no", "付款账号", "示例: 6222021234567890123"),
("payment_account_name", "付款账户名", "示例: 张三"),
("payment_time", "付款时间", "可选,格式: 2024-02-01 10:00:00"),
("payment_status", "付款状态", "已付款/待付款/付款中"),
("settlement_status", "结算状态", "已结算/待结算/结算中"),
("remark", "备注", "可选"),
]
},
"expense": {
"name": "成本费用凭证导入模板",
"fields": [
("voucher_no", "凭证号", "示例: PZ202401001"),
("expense_type", "费用类型", "营销费用/人员费用/运营费用/其他"),
("expense_category", "费用类别", "推广费/佣金/工资/租金/其他"),
("payer_name", "付款方名称", "示例: XX科技有限公司"),
("payer_account_no", "付款方账号", "示例: 6222021234567890123"),
("payee_name", "收款方名称", "示例: 张三"),
("payee_account_no", "收款方账号", "示例: 6222029876543210987"),
("payee_bank_name", "收款方开户行", "可选,示例: 中国工商银行"),
("expense_date", "费用发生日期", "格式: 2024-01-15"),
("expense_amount", "费用金额", "数字,示例: 5000.00"),
("tax_amount", "税额", "数字,示例: 300.00"),
("tax_rate", "税率", "数字,示例: 0.06 表示6%"),
("payment_method", "支付方式", "银行转账/现金/支付宝/微信"),
("payment_status", "支付状态", "已支付/待支付/支付中"),
("accounting_status", "入账状态", "已入账/待入账"),
("fiscal_year", "会计年度", "数字,示例: 2024"),
("fiscal_period", "会计期间", "数字1-12表示月份"),
("is_large_amount", "是否大额费用", "是/否(>5万为大额"),
("is_cross_border", "是否跨境支付", "是/否"),
("expense_description", "费用说明", "可选"),
("related_contract_id", "关联协议ID", "可选"),
]
},
"tax": {
"name": "税务申报表导入模板",
"fields": [
("taxpayer_name", "纳税人名称", "示例: XX科技有限公司"),
("taxpayer_id", "纳税人识别号", "18位代码"),
("tax_period", "税款所属期", "格式: 2024-01"),
("declaration_date", "申报日期", "格式: 2024-02-15"),
("tax_authority_code", "税务机关代码", "示例: 11010100"),
("tax_authority_name", "税务机关名称", "示例: 北京市朝阳区税务局"),
("taxpayer_type", "纳税人类型", "一般纳税人/小规模纳税人"),
("tax_rate", "税率", "数字,示例: 0.06 表示6%"),
("sales_revenue", "销售收入", "数字,示例: 100000.00"),
("sales_revenue_taxable", "应税销售收入", "数字,示例: 94339.62"),
("output_tax", "销项税额", "数字,示例: 5660.38"),
("input_tax", "进项税额", "数字,示例: 3000.00"),
("input_tax_deductible", "可抵扣进项税额", "数字,示例: 3000.00"),
("tax_payable", "应纳税额", "数字,示例: 2660.38"),
("tax_to_pay", "应补(退)税额", "数字,示例: 2660.38"),
("refund_amount", "退税金额", "数字,示例: 0.00"),
("declaration_status", "申报状态", "已申报/待申报/申报中"),
("is_reconciled", "是否已核对", "是/否"),
]
},
"bank": {
"name": "银行流水导入模板",
"fields": [
("account_no", "账号", "示例: 6222021234567890123"),
("account_name", "账户名", "示例: XX科技有限公司"),
("bank_name", "开户行", "示例: 中国工商银行北京分行"),
("transaction_date", "交易日期", "格式: 2024-01-15"),
("transaction_time", "交易时间", "格式: 2024-01-15 10:30:00"),
("transaction_type", "交易类型", "转入/转出/利息/手续费"),
("transaction_amount", "交易金额", "数字,示例: 10000.00"),
("balance", "余额", "数字,示例: 50000.00"),
("counterparty_account_no", "对手方账号", "可选"),
("counterparty_account_name", "对手方账户名", "可选"),
("counterparty_bank_name", "对手方开户行", "可选"),
("voucher_no", "凭证号", "可选"),
("transaction_purpose", "交易用途", "可选,示例: 佣金结算"),
("is_cross_border", "是否跨境", "是/否"),
("currency", "币种", "CNY/USD/EUR等"),
("amount_cny", "折算人民币金额", "数字"),
("exchange_rate", "汇率", "数字,示例: 1.00"),
("is_large_amount", "是否大额交易", "是/否(>20万为大额"),
("is_suspicious", "是否可疑交易", "是/否"),
("suspicious_reason", "可疑原因", "可选"),
("is_reconciled", "是否已对账", "是/否"),
]
},
"invoice": {
"name": "发票数据导入模板",
"fields": [
("invoice_code", "发票代码", "示例: 011001900111"),
("invoice_no", "发票号码", "示例: 12345678"),
("invoice_type", "发票类型", "增值税专用发票/增值税普通发票/电子发票"),
("direction", "发票方向", "进项/销项"),
("invoice_date", "开票日期", "格式: 2024-01-15"),
("purchaser_name", "购买方名称", "示例: XX科技有限公司"),
("purchaser_tax_no", "购买方税号", "18位代码"),
("seller_name", "销售方名称", "示例: YY传媒有限公司"),
("seller_tax_no", "销售方税号", "18位代码"),
("total_amount", "金额合计", "数字,示例: 10000.00"),
("total_tax", "税额合计", "数字,示例: 600.00"),
("total_amount_with_tax", "价税合计", "数字,示例: 10600.00"),
("amount_in_words", "大写金额", "示例: 壹万零陆佰元整"),
("invoice_status", "发票状态", "正常/已作废/已冲红"),
("is_verified", "是否已验证", "是/否"),
("is_red_invoice", "是否红字发票", "是/否"),
("red_reason", "红字原因", "可选"),
("remark", "备注", "可选"),
]
},
}
# 模拟导入历史记录
IMPORT_HISTORY = [
{
"id": 1,
"import_type": "streamer",
"import_type_name": "主播信息",
"file_name": "主播信息.xlsx",
"total_rows": 1000,
"success_rows": 980,
"failed_rows": 20,
"status": "success",
"created_by": "admin",
"created_at": "2024-01-01 10:00:00",
"error_file_url": "/files/errors_123.xlsx",
},
]
# 存储上传文件信息
UPLOADED_FILES = {}
@router.post("/upload", response_model=DataImportUploadResponse)
async def upload_import_file(
file: UploadFile = File(..., description="上传的文件"),
import_type: str = Form(..., description="导入类型streamer, recharge, order, contract"),
mapping: str = Form(None, description="字段映射JSON格式"),
):
"""
上传数据文件Excel/CSV
"""
logger.info(f"上传文件: {file.filename}, 类型: {import_type}")
# 检查文件格式
allowed_extensions = ["xlsx", "xls", "csv"]
file_extension = file.filename.split(".")[-1].lower()
if file_extension not in allowed_extensions:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"不支持的文件格式,仅支持: {', '.join(allowed_extensions)}",
)
try:
# 生成上传ID
upload_id = f"upload_{uuid.uuid4().hex[:12]}"
# 读取文件内容
contents = await file.read()
# 解析文件(根据扩展名)
if file_extension == "csv":
df = pd.read_csv(pd.io.common.BytesIO(contents))
else:
df = pd.read_excel(pd.io.common.BytesIO(contents))
# 获取前5行作为预览数据
preview_data = df.head(5).to_dict(orient="records")
# 生成默认字段映射(列名 -> 字段名)
field_mapping = {col: col for col in df.columns}
# 保存文件信息
file_info = {
"upload_id": upload_id,
"file_name": file.filename,
"file_size": len(contents),
"total_rows": len(df),
"data": df,
"field_mapping": field_mapping,
}
UPLOADED_FILES[upload_id] = file_info
# 解析mapping参数
mapping_dict = {}
if mapping:
try:
mapping_dict = eval(mapping) # 注意实际生产中应使用json.loads
except:
pass
return DataImportUploadResponse(
upload_id=upload_id,
file_name=file.filename,
file_size=len(contents),
total_rows=len(df),
preview_data=preview_data,
field_mapping=field_mapping,
)
except Exception as e:
logger.error(f"文件上传失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"文件解析失败: {str(e)}",
)
@router.post("/confirm", response_model=DataImportConfirmResponse)
async def confirm_import(request: DataImportConfirmRequest, db: AsyncSession = Depends(get_async_session)):
"""
确认导入数据到数据库
"""
from datetime import datetime
logger.info(f"确认导入: {request.upload_id}, 类型: {request.import_type}")
# 检查上传文件是否存在
if request.upload_id not in UPLOADED_FILES:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"上传文件不存在: {request.upload_id}",
)
file_info = UPLOADED_FILES[request.upload_id]
df = file_info["data"]
success_rows = 0
failed_rows = 0
error_log = []
try:
# 根据导入类型调用不同的处理函数
import_type = request.import_type.lower()
if import_type == "streamer":
success_rows, failed_rows, error_log = await import_streamers(db, df)
elif import_type == "contract":
success_rows, failed_rows, error_log = await import_contracts(db, df)
elif import_type == "recharge":
success_rows, failed_rows, error_log = await import_recharges(db, df)
elif import_type == "tax":
success_rows, failed_rows, error_log = await import_tax_declarations(db, df)
elif import_type == "mcn":
success_rows, failed_rows, error_log = await import_mcn_agencies(db, df)
elif import_type == "order":
success_rows, failed_rows, error_log = await import_orders(db, df)
elif import_type == "settlement":
success_rows, failed_rows, error_log = await import_settlements(db, df)
elif import_type == "expense":
success_rows, failed_rows, error_log = await import_expenses(db, df)
elif import_type == "bank":
success_rows, failed_rows, error_log = await import_bank_transactions(db, df)
elif import_type == "invoice":
success_rows, failed_rows, error_log = await import_invoices(db, df)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"不支持的导入类型: {import_type}",
)
# 生成任务ID
task_id = f"task_{uuid.uuid4().hex[:12]}"
# 添加到历史记录
history_record = {
"id": len(IMPORT_HISTORY) + 1,
"import_type": request.import_type,
"import_type_name": TEMPLATE_FIELDS.get(import_type, {}).get("name", "未知类型"),
"file_name": file_info["file_name"],
"total_rows": success_rows + failed_rows,
"success_rows": success_rows,
"failed_rows": failed_rows,
"status": "success" if failed_rows == 0 else "partial",
"created_by": "admin",
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"error_file_url": f"/files/errors_{task_id}.xlsx" if failed_rows > 0 else None,
}
IMPORT_HISTORY.append(history_record)
return DataImportConfirmResponse(
task_id=task_id,
total_rows=success_rows + failed_rows,
success_rows=success_rows,
failed_rows=failed_rows,
error_log=error_log,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"数据导入失败: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"数据导入失败: {str(e)}",
)
# 导入处理函数
async def import_streamers(db: AsyncSession, df: pd.DataFrame):
"""导入主播信息"""
from app.models.streamer import StreamerInfo
from datetime import datetime
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import text
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
streamer_data = {
"streamer_id": str(row['streamer_id']),
"streamer_name": str(row['real_name']),
"entity_type": 'individual',
"phone_number": str(row['phone']),
"bank_account_no": str(row['bank_account_no']),
"bank_name": str(row['bank_name']),
"status": 'active',
"registration_date": datetime.now().date()
}
# 可选字段
if 'id_card_no' in row and pd.notna(row['id_card_no']):
streamer_data["id_card_no"] = str(row['id_card_no'])
# 使用 ON CONFLICT 处理重复数据
stmt = insert(StreamerInfo.__table__).values(**streamer_data)
stmt = stmt.on_conflict_do_update(
constraint="streamer_info_streamer_id_key",
set_=streamer_data
)
await db.execute(stmt)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({
"row": idx + 1,
"errorMsg": str(e),
})
await db.commit()
return success_rows, failed_rows, error_log
async def import_contracts(db: AsyncSession, df: pd.DataFrame):
"""导入分成协议"""
from app.models.contract import RevenueSharingContract
from datetime import datetime
from sqlalchemy.dialects.postgresql import insert
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
contract_data = {
"contract_id": str(row['contract_no']),
"contract_no": str(row['contract_no']),
"contract_type": 'tip_sharing',
"streamer_id": str(row['streamer_id']),
"streamer_name": str(row['streamer_name']),
"streamer_entity_type": str(row['streamer_entity_type']),
"platform_party": str(row['platform_party']),
"platform_credit_code": str(row['platform_credit_code']),
"revenue_type": 'tip',
"streamer_ratio": float(row['streamer_ratio']),
"platform_ratio": float(row['platform_ratio']),
"settlement_cycle": 'monthly',
"contract_start_date": datetime.strptime(str(row['contract_start_date']), '%Y-%m-%d').date(),
"contract_end_date": datetime.strptime(str(row['contract_end_date']), '%Y-%m-%d').date(),
"contract_status": str(row['contract_status'])
}
# 使用 ON CONFLICT 处理重复数据
stmt = insert(RevenueSharingContract.__table__).values(**contract_data)
stmt = stmt.on_conflict_do_update(
constraint="contract_contract_id_key",
set_=contract_data
)
await db.execute(stmt)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({
"row": idx + 1,
"errorMsg": str(e),
})
await db.commit()
return success_rows, failed_rows, error_log
async def import_recharges(db: AsyncSession, df: pd.DataFrame):
"""导入充值记录"""
from app.models.streamer import PlatformRecharge
from datetime import datetime
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
recharge = PlatformRecharge(
recharge_id=f"RC_{uuid.uuid4().hex[:12]}",
user_id=str(row.get('streamer_id', row['account_name'])),
user_name=str(row['account_name']),
recharge_amount=float(row['recharge_amount']),
recharge_time=datetime.strptime(str(row['recharge_date']), '%Y-%m-%d'),
payment_method='bank_transfer',
transaction_no=f"TXN_{uuid.uuid4().hex[:12]}",
platform_order_no=f"PO_{uuid.uuid4().hex[:12]}",
actual_amount_cny=float(row['recharge_amount']),
total_coins=float(row['recharge_amount']) * 10,
status='success',
withdrawal_status='not_withdrawn'
)
db.add(recharge)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({
"row": idx + 1,
"errorMsg": str(e),
})
await db.commit()
return success_rows, failed_rows, error_log
async def import_tax_declarations(db: AsyncSession, df: pd.DataFrame):
"""导入税务申报"""
from app.models.tax_declaration import TaxDeclaration
from datetime import datetime
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
declaration = TaxDeclaration(
vat_declaration_id=f"TAX_{uuid.uuid4().hex[:12]}",
taxpayer_name=str(row['taxpayer_name']),
taxpayer_id=str(row['taxpayer_id']),
tax_period=str(row['tax_period']),
declaration_date=datetime.strptime(str(row['declaration_date']), '%Y-%m-%d').date(),
tax_authority_code=str(row['tax_authority_code']),
tax_authority_name=str(row['tax_authority_name']),
taxpayer_type='small_scale',
tax_rate=float(row['tax_rate']),
sales_revenue=float(row['sales_revenue']),
sales_revenue_taxable=float(row['sales_revenue_taxable']),
output_tax=float(row['output_tax']),
input_tax=float(row.get('input_tax', 0)),
input_tax_deductible=float(row.get('input_tax_deductible', 0)),
tax_payable=float(row['tax_payable']),
tax_to_pay=float(row['tax_to_pay']),
refund_amount=float(row.get('refund_amount', 0)),
declaration_status=str(row['declaration_status']),
is_reconciled=False
)
db.add(declaration)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({
"row": idx + 1,
"errorMsg": str(e),
})
await db.commit()
return success_rows, failed_rows, error_log
# 其他导入函数(简化实现)
async def import_mcn_agencies(db: AsyncSession, df: pd.DataFrame):
from app.models.streamer import McnAgency
from datetime import datetime
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
mcn = McnAgency(
mcn_name=str(row['agency_name']),
unified_social_credit_code=str(row['credit_code']),
legal_person_name=str(row['legal_person']),
bank_account_no="6222021234567890123",
bank_name="中国工商银行",
mcn_type="regular",
status="active",
registration_date=datetime.now().date()
)
db.add(mcn)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({"row": idx + 1, "errorMsg": str(e)})
await db.commit()
return success_rows, failed_rows, error_log
async def import_orders(db: AsyncSession, df: pd.DataFrame):
from app.models.order import Order
from datetime import datetime
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
order = Order(
platform_order_no=str(row['platform_order_no']),
ecommerce_platform=str(row['ecommerce_platform']),
streamer_id=str(row['streamer_id']),
streamer_name=str(row['streamer_name']),
product_id=str(row['product_id']),
product_name=str(row['product_name']),
quantity=int(row['quantity']),
original_price=float(row['original_price']),
sale_price=float(row['sale_price']),
total_amount=float(row['total_amount']),
actual_payment=float(row['actual_payment']),
order_time=datetime.strptime(str(row['order_time']), '%Y-%m-%d %H:%M:%S'),
order_status='completed',
is_commission_settled=True
)
db.add(order)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({"row": idx + 1, "errorMsg": str(e)})
await db.commit()
return success_rows, failed_rows, error_log
async def import_settlements(db: AsyncSession, df: pd.DataFrame):
from app.models.settlement import Settlement
from datetime import datetime
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
settlement = Settlement(
settlement_no=str(row['settlement_no']),
streamer_id=str(row['streamer_id']),
streamer_name=str(row['streamer_name']),
streamer_entity_type=str(row['streamer_entity_type']),
settlement_period=str(row['settlement_period']),
settlement_start_date=datetime.strptime(str(row['settlement_start_date']), '%Y-%m-%d').date(),
settlement_end_date=datetime.strptime(str(row['settlement_end_date']), '%Y-%m-%d').date(),
order_count=int(row['order_count']),
total_sales=float(row['total_sales']),
total_commission=float(row['total_commission']),
actual_settlement_amount=float(row['actual_settlement_amount']),
payment_status='pending',
settlement_status='pending'
)
db.add(settlement)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({"row": idx + 1, "errorMsg": str(e)})
await db.commit()
return success_rows, failed_rows, error_log
async def import_expenses(db: AsyncSession, df: pd.DataFrame):
from app.models.expense import Expense
from datetime import datetime
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
expense = Expense(
voucher_no=str(row['voucher_no']),
expense_type=str(row['expense_type']),
expense_category=str(row['expense_category']),
payer_name=str(row['payer_name']),
payee_name=str(row['payee_name']),
expense_date=datetime.strptime(str(row['expense_date']), '%Y-%m-%d').date(),
expense_amount=float(row['expense_amount']),
payment_status='pending',
accounting_status='pending'
)
db.add(expense)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({"row": idx + 1, "errorMsg": str(e)})
await db.commit()
return success_rows, failed_rows, error_log
async def import_bank_transactions(db: AsyncSession, df: pd.DataFrame):
from app.models.bank_transaction import BankTransaction
from datetime import datetime
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
transaction = BankTransaction(
account_no=str(row['account_no']),
account_name=str(row['account_name']),
bank_name=str(row['bank_name']),
transaction_date=datetime.strptime(str(row['transaction_date']), '%Y-%m-%d').date(),
transaction_type=str(row['transaction_type']),
transaction_amount=float(row['transaction_amount']),
balance=float(row['balance']),
currency='CNY',
amount_cny=float(row.get('amount_cny', row['transaction_amount']))
)
db.add(transaction)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({"row": idx + 1, "errorMsg": str(e)})
await db.commit()
return success_rows, failed_rows, error_log
async def import_invoices(db: AsyncSession, df: pd.DataFrame):
from app.models.invoice import Invoice
from datetime import datetime
success_rows = 0
failed_rows = 0
error_log = []
for idx, row in df.iterrows():
try:
invoice = Invoice(
invoice_code=str(row['invoice_code']),
invoice_no=str(row['invoice_no']),
invoice_type=str(row['invoice_type']),
direction=str(row['direction']),
invoice_date=datetime.strptime(str(row['invoice_date']), '%Y-%m-%d').date(),
purchaser_name=str(row['purchaser_name']),
purchaser_tax_no=str(row['purchaser_tax_no']),
seller_name=str(row['seller_name']),
seller_tax_no=str(row['seller_tax_no']),
total_amount=float(row['total_amount']),
total_tax=float(row['total_tax']),
total_amount_with_tax=float(row['total_amount_with_tax']),
invoice_status='normal',
is_verified=False
)
db.add(invoice)
success_rows += 1
except Exception as e:
failed_rows += 1
error_log.append({"row": idx + 1, "errorMsg": str(e)})
await db.commit()
return success_rows, failed_rows, error_log
@router.get("/history", response_model=DataImportHistoryListResponse)
async def list_import_history(
page: int = 1,
size: int = 10,
):
"""
查询数据导入历史记录
"""
logger.info(f"查询导入历史: page={page}, size={size}")
# 分页
total = len(IMPORT_HISTORY)
start = (page - 1) * size
end = start + size
records = IMPORT_HISTORY[start:end]
return DataImportHistoryListResponse(
records=records,
total=total,
page=page,
size=size,
)
@router.get("/export", response_model=dict)
async def create_export_task(
export_type: str = Query(..., description="导出类型"),
export_format: str = Query(..., description="导出格式excel, csv"),
file_name: str = Query(..., description="文件名"),
conditions: str = Query(None, description="查询条件JSON格式"),
fields: str = Query(None, description="导出字段JSON格式"),
):
"""
创建数据导出任务
"""
logger.info(f"创建导出任务: {export_type}")
# 生成任务ID
task_id = f"export_{uuid.uuid4().hex[:12]}"
# 模拟导出任务
export_task = {
"taskId": task_id,
"status": "processing",
"fileName": f"{file_name}_20240101100000.{export_format}",
"estimatedTime": 30,
"downloadUrl": None,
}
return {
"code": 200,
"message": "导出任务创建成功",
"data": export_task,
}
@router.get("/export/{task_id}")
async def get_export_status(task_id: str):
"""
查询导出任务状态
"""
logger.info(f"查询导出状态: {task_id}")
# 模拟任务状态
export_status = {
"taskId": task_id,
"status": "completed",
"fileName": f"主播信息导出_20240101100000.xlsx",
"fileSize": 1024000,
"downloadUrl": f"/files/export/{task_id}.xlsx",
"expiresAt": "2024-01-08 10:00:00",
"createdAt": "2024-01-01 10:00:00",
"completedAt": "2024-01-01 10:01:30",
}
return {
"code": 200,
"data": export_status,
}
@router.get("/template/list")
async def list_templates():
"""
获取所有可用的导入模板列表
"""
templates = []
for key, value in TEMPLATE_FIELDS.items():
templates.append({
"type": key,
"name": value["name"],
"field_count": len(value["fields"]),
})
return {
"code": 200,
"data": templates,
}
@router.get("/template/{template_type}")
async def download_template(template_type: str):
"""
下载指定类型的Excel导入模板
"""
logger.info(f"下载模板: {template_type}")
if template_type not in TEMPLATE_FIELDS:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"不支持的模板类型: {template_type}",
)
template_config = TEMPLATE_FIELDS[template_type]
template_name = template_config["name"]
fields = template_config["fields"]
# 创建DataFrame
# 第一行:中文列名
# 第二行:英文字段名(供系统识别)
# 第三行:填写说明
columns = [f[1] for f in fields] # 中文列名
field_names = [f[0] for f in fields] # 英文字段名
descriptions = [f[2] for f in fields] # 填写说明
# 创建示例数据行
df = pd.DataFrame(columns=columns)
# 添加字段说明行
df.loc[0] = descriptions
# 创建Excel文件
output = io.BytesIO()
with pd.ExcelWriter(output, engine='openpyxl') as writer:
df.to_excel(writer, sheet_name='数据', index=False)
# 获取工作表
workbook = writer.book
worksheet = writer.sheets['数据']
# 添加字段映射说明sheet
mapping_data = {
'中文列名': columns,
'英文字段名': field_names,
'填写说明': descriptions,
}
mapping_df = pd.DataFrame(mapping_data)
mapping_df.to_excel(writer, sheet_name='字段说明', index=False)
# 设置列宽
for sheet in [worksheet, writer.sheets['字段说明']]:
for column in sheet.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(str(cell.value))
except:
pass
adjusted_width = min(max_length + 2, 50)
sheet.column_dimensions[column_letter].width = adjusted_width
output.seek(0)
# 返回文件流
filename = f"{template_name}.xlsx"
# URL编码文件名以支持中文
encoded_filename = quote(filename)
headers = {
'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
}
return StreamingResponse(
output,
media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
headers=headers,
)