""" 收入完整性检测算法测试用例 覆盖场景: 1. 正常场景:充值与申报一致 2. 少报收入场景:存在隐瞒收入 3. 多报收入场景:申报收入超过充值 4. 无分成协议场景 5. 边界值测试 6. 异常情况处理 """ import pytest from datetime import datetime, date, timedelta from decimal import Decimal from unittest.mock import AsyncMock, MagicMock, patch from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.services.risk_detection.algorithms.revenue_integrity import RevenueIntegrityAlgorithm from app.services.risk_detection.algorithms.base import DetectionContext from app.models.risk_detection import RiskLevel from app.models.streamer import StreamerInfo, PlatformRecharge from app.models.tax_declaration import TaxDeclaration class TestRevenueIntegrityAlgorithm: """收入完整性检测算法测试类""" @pytest.fixture def algorithm(self): """创建算法实例""" return RevenueIntegrityAlgorithm() @pytest.fixture def mock_db_session(self): """创建Mock数据库会话""" session = AsyncMock(spec=AsyncSession) # Mock执行方法 session.execute = AsyncMock() return session @pytest.fixture def streamer_info(self): """主播信息Mock数据""" return { "streamer_id": "ZB_TEST_001", "streamer_name": "测试主播", "entity_type": "individual", "tax_registration_no": "TAX123456789", "unified_social_credit_code": None, "id_card_no": "110101199001011234", } @pytest.fixture def mock_recharge_data(self): """Mock充值数据""" return [ { "recharge_id": "RC001", "user_name": "测试用户", "amount": 10000.0, "time": datetime(2024, 1, 15, 10, 30, 0), "payment_method": "bank_transfer", }, { "recharge_id": "RC002", "user_name": "测试用户", "amount": 20000.0, "time": datetime(2024, 1, 20, 14, 20, 0), "payment_method": "bank_transfer", }, { "recharge_id": "RC003", "user_name": "测试用户", "amount": 15000.0, "time": datetime(2024, 1, 25, 9, 15, 0), "payment_method": "bank_transfer", }, ] @pytest.fixture def mock_declaration_data(self): """Mock税务申报数据""" return [ { "declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 45000.0, "declaration_date": date(2024, 2, 15), }, ] @pytest.fixture def mock_contract_data(self): """Mock分成协议数据""" return { "contract_id": "CT001", "streamer_ratio": 70.0, # 主播分成70% "platform_ratio": 30.0, # 平台分成30% "contract_start_date": date(2024, 1, 1), "contract_end_date": date(2024, 12, 31), } class TestRevenueIntegrityNormal(TestRevenueIntegrityAlgorithm): """正常场景测试""" @pytest.mark.asyncio async def test_revenue_match_with_contract(self, algorithm, mock_db_session, streamer_info, mock_recharge_data, mock_declaration_data, mock_contract_data): """测试场景1:收入一致(有分成协议)""" # 设置:充值总额45000元,分成70%,申报45000元 # Mock数据库查询结果 mock_db_session.execute.side_effect = [ # 查询主播信息 self._create_streamer_result(streamer_info), # 查询充值数据 self._create_recharge_result(mock_recharge_data), # 查询申报数据 self._create_declaration_result(mock_declaration_data), # 查询分成协议 self._create_contract_result(mock_contract_data), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言 assert result.risk_level == RiskLevel.NONE assert result.risk_score < 20.0 assert "收入完整性检查通过" in result.description assert result.risk_data["recharge_total"] == 45000.0 assert result.risk_data["declared_revenue"] == 45000.0 @pytest.mark.asyncio async def test_revenue_match_without_contract(self, algorithm, mock_db_session, streamer_info, mock_recharge_data, mock_declaration_data): """测试场景2:收入一致(无分成协议)""" # 设置:充值总额45000元,无分成协议,申报45000元 # Mock数据库查询(无分成协议) mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(mock_recharge_data), self._create_declaration_result(mock_declaration_data), # 无分成协议查询,返回None MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言 assert result.risk_level == RiskLevel.NONE assert result.risk_score < 20.0 assert "收入完整性检查通过" in result.description @pytest.mark.asyncio async def test_slight_difference_low_risk(self, algorithm, mock_db_session, streamer_info): """测试场景3:轻微差异(低风险)""" # 充值45000元,申报44000元,差异1000元(差异率2.22%) recharge_data = [ {"recharge_id": f"RC{i}", "user_name": "测试", "amount": 15000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} for i in range(1, 4) ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 44000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:差异在可接受范围内 assert result.risk_level == RiskLevel.NONE assert 0 <= result.risk_score <= 20.0 class TestRevenueIntegrityUnderReporting(TestRevenueIntegrityAlgorithm): """少报收入场景测试""" @pytest.mark.asyncio async def test_medium_under_reporting(self, algorithm, mock_db_session, streamer_info, mock_contract_data): """测试场景4:中度少报收入(Medium风险)""" # 充值100000元,分成70%,预期70000元,申报50000元,差异20000元(差异率28.57%) recharge_data = [ {"recharge_id": f"RC{i}", "user_name": "测试", "amount": 20000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} for i in range(1, 6) ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 50000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), self._create_contract_result(mock_contract_data), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言 assert result.risk_level == RiskLevel.MEDIUM assert 50.0 <= result.risk_score < 70.0 assert "隐瞒收入" in result.description or "少报" in result.description assert result.risk_data["difference"] == 20000.0 assert 25.0 < result.risk_data["difference_rate"] < 30.0 @pytest.mark.asyncio async def test_high_under_reporting(self, algorithm, mock_db_session, streamer_info, mock_contract_data): """测试场景5:高度少报收入(High风险)""" # 充值200000元,分成70%,预期140000元,申报80000元,差异60000元(差异率42.86%) recharge_data = [ {"recharge_id": f"RC{i}", "user_name": "测试", "amount": 40000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} for i in range(1, 6) ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 80000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), self._create_contract_result(mock_contract_data), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言 assert result.risk_level == RiskLevel.HIGH assert 70.0 <= result.risk_score < 85.0 assert "隐瞒收入" in result.description @pytest.mark.asyncio async def test_critical_under_reporting(self, algorithm, mock_db_session, streamer_info, mock_contract_data): """测试场景6:严重少报收入(Critical风险)""" # 充值500000元,分成70%,预期350000元,申报100000元,差异250000元(差异率71.43%) recharge_data = [ {"recharge_id": f"RC{i}", "user_name": "测试", "amount": 100000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} for i in range(1, 6) ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 100000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), self._create_contract_result(mock_contract_data), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言 assert result.risk_level == RiskLevel.CRITICAL assert result.risk_score >= 85.0 assert "严重风险" in result.description assert result.risk_data["difference"] >= 100000.0 class TestRevenueIntegrityOverReporting(TestRevenueIntegrityAlgorithm): """多报收入场景测试""" @pytest.mark.asyncio async def test_over_reporting_low_risk(self, algorithm, mock_db_session, streamer_info): """测试场景7:轻微多报收入(低风险)""" # 充值40000元,申报41000元,多报1000元 recharge_data = [ {"recharge_id": f"RC{i}", "user_name": "测试", "amount": 20000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} for i in range(1, 3) ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 41000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:申报超过充值,但风险较低 assert result.risk_level == RiskLevel.LOW assert "虚报" in result.description or "超过" in result.description class TestRevenueIntegrityEdgeCases(TestRevenueIntegrityAlgorithm): """边界值和异常场景测试""" @pytest.mark.asyncio async def test_no_recharge_data(self, algorithm, mock_db_session, streamer_info): """测试场景8:无充值数据""" # 无充值数据,有申报数据 recharge_data = [] # 空充值 declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 10000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:无充值但有申报,风险很高 assert result.risk_level in [RiskLevel.HIGH, RiskLevel.CRITICAL] assert "申报收入超过充值" in result.description @pytest.mark.asyncio async def test_no_declaration_data(self, algorithm, mock_db_session, streamer_info): """测试场景9:无申报数据""" # 有充值数据,无申报数据 recharge_data = [ {"recharge_id": "RC001", "user_name": "测试", "amount": 10000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} ] declaration_data = [] # 空申报 mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:有充值但无申报,严重风险 assert result.risk_level == RiskLevel.CRITICAL assert "隐瞒收入" in result.description @pytest.mark.asyncio async def test_zero_difference(self, algorithm, mock_db_session, streamer_info): """测试场景10:完全一致(零差异)""" # 充值与申报完全一致 recharge_data = [ {"recharge_id": "RC001", "user_name": "测试", "amount": 30000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 30000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:完全一致,无风险 assert result.risk_level == RiskLevel.NONE assert result.risk_score == 0.0 assert "基本一致" in result.description @pytest.mark.asyncio async def test_large_amount_boundary(self, algorithm, mock_db_session, streamer_info): """测试场景11:大额边界值(刚好10万元差异)""" # 充值200000元,申报100000元,差异100000元,差异率50% recharge_data = [ {"recharge_id": "RC001", "user_name": "测试", "amount": 200000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 100000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:刚好达到Critical阈值 assert result.risk_level == RiskLevel.CRITICAL assert result.risk_data["difference"] == 100000.0 @pytest.mark.asyncio async def test_rate_boundary_50_percent(self, algorithm, mock_db_session, streamer_info): """测试场景12:差异率边界值(刚好50%)""" # 充值200000元,申报100000元,差异100000元,差异率50% recharge_data = [ {"recharge_id": "RC001", "user_name": "测试", "amount": 200000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 100000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:50%差异率应达到Critical assert result.risk_level == RiskLevel.CRITICAL assert result.risk_data["difference_rate"] == 50.0 class TestRevenueIntegrityErrorHandling(TestRevenueIntegrityAlgorithm): """错误处理测试""" @pytest.mark.asyncio async def test_missing_streamer_id(self, algorithm, mock_db_session): """测试场景13:缺少主播ID""" context = self._create_context("", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:返回错误结果 assert result.risk_level == RiskLevel.UNKNOWN assert "失败" in result.description @pytest.mark.asyncio async def test_missing_period(self, algorithm, mock_db_session): """测试场景14:缺少期间参数""" context = DetectionContext( task_id="task_001", rule_id="rule_001", parameters={"streamer_id": "ZB_TEST_001"}, db_session=mock_db_session ) result = await algorithm.detect(context) # 断言:返回错误结果 assert result.risk_level == RiskLevel.UNKNOWN assert "失败" in result.description @pytest.mark.asyncio async def test_invalid_period_format(self, algorithm, mock_db_session): """测试场景15:无效的期间格式""" context = self._create_context("ZB_TEST_001", "2024-13", mock_db_session) # 13月无效 result = await algorithm.detect(context) # 断言:返回错误结果 assert result.risk_level == RiskLevel.UNKNOWN @pytest.mark.asyncio async def test_streamer_not_found(self, algorithm, mock_db_session): """测试场景16:主播不存在""" # Mock主播查询返回None mock_db_session.execute.side_effect = [ MagicMock(scalar_one_or_none=lambda: None), # 主播不存在 ] context = self._create_context("ZB_NONEXIST", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:返回错误结果 assert result.risk_level == RiskLevel.UNKNOWN assert "找不到主播信息" in result.description class TestRevenueIntegrityBusinessScenarios(TestRevenueIntegrityAlgorithm): """真实业务场景测试""" @pytest.mark.asyncio async def test_monthly_reconciliation(self, algorithm, mock_db_session, streamer_info): """测试场景17:月度对账场景""" # 模拟真实月度对账:充值45万,申报30万,差异15万,需要核查 recharge_data = [ { "recharge_id": f"RC{i:03d}", "user_name": "测试主播", "amount": 15000.0, "time": datetime(2024, 1, 1) + timedelta(days=i), "payment_method": "bank_transfer" } for i in range(1, 31) # 每天1笔,共30笔 ] declaration_data = [ { "declaration_id": "TAX001", "taxpayer_name": "测试主播", "tax_period": "2024-01", "sales_revenue": 300000.0, "declaration_date": date(2024, 2, 15) } ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_TEST_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言 assert result.risk_level in [RiskLevel.HIGH, RiskLevel.CRITICAL] assert result.risk_data["recharge_count"] == 30 assert result.risk_data["difference"] == 150000.0 @pytest.mark.asyncio async def test_new_streamer_no_history(self, algorithm, mock_db_session, streamer_info): """测试场景18:新主播无历史数据""" # 新主播,只有少量充值和申报 recharge_data = [ {"recharge_id": "RC001", "user_name": "新主播", "amount": 5000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "新主播", "tax_period": "2024-01", "sales_revenue": 4800.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_NEW_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:新主播轻微差异也属正常 assert result.risk_level in [RiskLevel.LOW, RiskLevel.NONE] @pytest.mark.asyncio async def test_enterprise_streamer(self, algorithm, mock_db_session): """测试场景19:企业主播""" # 企业主播,统一社会信用代码 streamer_info = { "streamer_id": "ZB_ENT_001", "streamer_name": "XX传媒有限公司", "entity_type": "enterprise", "tax_registration_no": None, "unified_social_credit_code": "91110000000000000X", "id_card_no": None, } recharge_data = [ {"recharge_id": "RC001", "user_name": "企业主播", "amount": 100000.0, "time": datetime(2024, 1, 15), "payment_method": "bank_transfer"} ] declaration_data = [ {"declaration_id": "TAX001", "taxpayer_name": "XX传媒有限公司", "tax_period": "2024-01", "sales_revenue": 100000.0, "declaration_date": date(2024, 2, 15)} ] mock_db_session.execute.side_effect = [ self._create_streamer_result(streamer_info), self._create_recharge_result(recharge_data), self._create_declaration_result(declaration_data), MagicMock(scalar_one_or_none=lambda: None), ] # 执行检测 context = self._create_context("ZB_ENT_001", "2024-01", mock_db_session) result = await algorithm.detect(context) # 断言:企业主播数据一致 assert result.risk_level == RiskLevel.NONE # ==================== 辅助方法 ==================== def _create_context(self, streamer_id: str, period: str, db_session: AsyncSession) -> DetectionContext: """创建检测上下文""" return DetectionContext( task_id="task_test_001", rule_id="rule_test_001", parameters={ "streamer_id": streamer_id, "period": period, }, db_session=db_session ) def _create_streamer_result(self, streamer_info: dict) -> MagicMock: """创建主播查询结果""" mock_streamer = MagicMock() mock_streamer.streamer_id = streamer_info["streamer_id"] mock_streamer.streamer_name = streamer_info["streamer_name"] mock_streamer.entity_type = streamer_info["entity_type"] mock_streamer.tax_registration_no = streamer_info["tax_registration_no"] mock_streamer.unified_social_credit_code = streamer_info["unified_social_credit_code"] mock_streamer.id_card_no = streamer_info["id_card_no"] result = MagicMock() result.scalar_one_or_none.return_value = mock_streamer return result def _create_recharge_result(self, recharge_data: list) -> MagicMock: """创建充值查询结果""" mock_recharges = [] for data in recharge_data: mock_recharge = MagicMock() mock_recharge.recharge_id = data["recharge_id"] mock_recharge.user_name = data["user_name"] mock_recharge.actual_amount_cny = data["amount"] mock_recharge.recharge_time = data["time"] mock_recharge.payment_method = data["payment_method"] mock_recharge.status = "success" mock_recharges.append(mock_recharge) # Mock汇总查询 summary = MagicMock() summary.count = len(recharge_data) summary.total = sum(r["amount"] for r in recharge_data) result = MagicMock() result.one.return_value = summary result.scalars.return_value.all.return_value = mock_recharges return result def _create_declaration_result(self, declaration_data: list) -> MagicMock: """创建税务申报查询结果""" mock_declarations = [] for data in declaration_data: mock_decl = MagicMock() mock_decl.vat_declaration_id = data["declaration_id"] mock_decl.taxpayer_name = data["taxpayer_name"] mock_decl.tax_period = data["tax_period"] mock_decl.sales_revenue = data["sales_revenue"] mock_decl.declaration_date = data["declaration_date"] mock_declarations.append(mock_decl) result = MagicMock() result.scalars.return_value.all.return_value = mock_declarations return result def _create_contract_result(self, contract_data: dict) -> MagicMock: """创建分成协议查询结果""" mock_contract = MagicMock() mock_contract.streamer_ratio = contract_data["streamer_ratio"] result = MagicMock() result.scalar_one_or_none.return_value = mock_contract return result # ==================== 运行测试 ==================== if __name__ == "__main__": # 运行测试 pytest.main([__file__, "-v", "--tb=short"])