txw/scripts/gen_gxnl_import_sql.py

141 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import openpyxl
import uuid
import re
from datetime import datetime
from pathlib import Path
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Source workbook and generated SQL file, both resolved relative to ROOT.
EXCEL_PATH = ROOT.joinpath('可信碳共性能力网站导航.xlsx')
OUT_SQL = ROOT.joinpath('txw-mhzc', 'sql', 'gxnl_wzxx_import_from_excel.sql')

# Category code -> category display name.
# Note: no entry of TYPE_TO_FL below maps to code '04'.
FL_NAMES = dict(
    [
        ('01', '碳核算平台'),
        ('02', '碳认证机构'),
        ('03', '碳交易平台'),
        ('04', '碳金融服务'),
        ('05', '碳技术咨询'),
    ]
)

# Spreadsheet "type" label -> category code, written grouped by code.
# The comprehension flattens the groups in the order they are listed.
TYPE_TO_FL = {
    excel_type: fl_dm
    for fl_dm, excel_types in (
        (
            '01',
            (
                '产品碳足迹',
                '企业碳管理平台',
                'CBAM',
                '碳核算/排放数据',
                '科研平台',
                '软件服务',
                '国家部委',
                '地方发改部门',
                '地方生态环境部门',
                '地方工信部门',
            ),
        ),
        (
            '02',
            (
                '国际碳标准/绿证',
                '核查机构',
                '行业标准/倡议',
                '普惠平台',
            ),
        ),
        ('03', ('交易机构',)),
        (
            '05',
            (
                '咨询机构',
                '国际组织',
                '国际能源/环保机构',
                '行业协会/平台',
            ),
        ),
    )
    for excel_type in excel_types
}
def _cell_text(value):
    """Return a cell value as stripped text, or '' for empty/falsy cells."""
    return str(value).strip() if value else ''


def parse_sheet(ws, skip_first_col=False):
    """Extract website entries from one worksheet.

    The first row is treated as a header and skipped. After optionally
    dropping the first column, the columns are read as: type, name, URL,
    tags. A blank type cell inherits the most recent non-blank type above
    it, so a type only has to be written on the first row of its group.

    Rows are skipped when the name is empty or the URL does not start with
    ``http://`` or ``https://`` (scheme compared case-insensitively).

    Args:
        ws: Object exposing ``iter_rows(values_only=True)`` that yields one
            sequence of cell values per row (e.g. an openpyxl worksheet).
        skip_first_col: When true, ignore the first column of every row.

    Returns:
        A list of dicts with keys ``excelType``, ``bt`` (name), ``wzLj``
        (URL) and ``bqjh`` (raw tag text), in sheet order.
    """
    rows = iter(ws.iter_rows(values_only=True))
    next(rows, None)  # Drop the header row without materialising the sheet.

    items = []
    cur_type = ''
    for row in rows:
        cells = list(row)
        if skip_first_col:
            cells = cells[1:]
        # Pad short rows so the four columns can be unpacked unconditionally.
        cells += [None] * (4 - len(cells))
        raw_type, raw_name, raw_url, raw_tags = cells[:4]

        # Only a cell with visible text starts a new group; a whitespace-only
        # cell must not wipe out the type carried over from the rows above.
        type_text = _cell_text(raw_type)
        if type_text:
            cur_type = type_text

        name = _cell_text(raw_name)
        url = _cell_text(raw_url)
        if not name:
            continue
        # URL schemes are case-insensitive, so accept e.g. "HTTPS://...".
        # This check also rejects empty URLs and a literal "None".
        if not url.lower().startswith(('http://', 'https://')):
            continue

        items.append({
            'excelType': cur_type,
            'bt': name,
            'wzLj': url,
            'bqjh': _cell_text(raw_tags),
        })
    return items
def esc(s):
    """Render *s* as a single-quoted SQL string literal.

    ``None`` becomes the bare keyword ``NULL``. Any other value is converted
    with ``str()``; each backslash is doubled and each single quote is
    doubled before the result is wrapped in single quotes.
    """
    if s is None:
        return 'NULL'
    # One pass over the text; the two substitutions cannot affect each other.
    escaped = str(s).translate(str.maketrans({'\\': '\\\\', "'": "''"}))
    return f"'{escaped}'"
# Tag separators: ASCII comma, full-width comma (U+FF0C), ideographic comma
# (U+3001), ASCII semicolon, full-width semicolon (U+FF1B), pipe and slash.
# The non-ASCII ones are written as escapes so they cannot be confused with
# (or silently normalised into) their ASCII look-alikes.
_TAG_SEPARATORS = re.compile(r'[,\uff0c\u3001;\uff1b|/]')
# Upper bound on the number of tags kept per record.
_MAX_TAGS = 10


def norm_tags(excel_type, raw_tags):
    """Build the normalised, comma-joined tag string for one record.

    *raw_tags* is split on the separators in ``_TAG_SEPARATORS``; each piece
    is stripped, empty pieces are dropped and duplicates are removed while
    keeping first-seen order. If *excel_type* is non-empty and not already
    one of the tags, it is placed first. At most ``_MAX_TAGS`` tags are kept.

    Args:
        excel_type: Type label of the record; may be empty or ``None``.
        raw_tags: Raw tag text from the spreadsheet; may be empty or ``None``.

    Returns:
        The tags joined with ASCII commas, or ``''`` when there are none.
    """
    tags = []
    if raw_tags:
        for part in _TAG_SEPARATORS.split(raw_tags):
            part = part.strip()
            if part and part not in tags:
                tags.append(part)
    if excel_type and excel_type not in tags:
        tags.insert(0, excel_type)
    return ','.join(tags[:_MAX_TAGS])
def _dedupe_by_url(items):
    """Return *items* without entries whose URL repeats an earlier one.

    URLs are compared lower-cased and with trailing slashes removed; the
    first occurrence wins and the original order is kept.
    """
    seen = set()
    unique = []
    for it in items:
        key = it['wzLj'].rstrip('/').lower()
        if key in seen:
            continue
        seen.add(key)
        unique.append(it)
    return unique


def main():
    """Read the navigation workbook and write the SQL import script.

    Parses ``Sheet1`` (first column ignored) and ``Sheet2`` of EXCEL_PATH,
    removes duplicate URLs, then writes a comment header, ``SET NAMES`` and
    one ``INSERT`` statement per remaining record to OUT_SQL. The record
    count, the per-type count of types missing from TYPE_TO_FL, and the
    output path are printed at the end.
    """
    wb = openpyxl.load_workbook(EXCEL_PATH, read_only=True)
    try:
        # parse_sheet returns plain lists, so nothing read here depends on
        # the workbook staying open afterwards.
        items = parse_sheet(wb['Sheet1'], True) + parse_sheet(wb['Sheet2'], False)
    finally:
        # A read-only workbook keeps the file open until closed explicitly.
        wb.close()

    unique = _dedupe_by_url(items)
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    lines = [
        '-- ============================================================',
        '-- 共性能力网站导航 Excel 导入数据',
        '-- 来源: 可信碳共性能力网站导航.xlsx',
        f'-- 生成时间: {now}',
        f'-- 记录数: {len(unique)}',
        '-- ============================================================',
        'SET NAMES utf8mb4;',
        '',
    ]

    unmapped = {}  # type label -> number of records that fell back to '05'
    for idx, it in enumerate(unique, 1):
        excel_type = it['excelType']
        fl = TYPE_TO_FL.get(excel_type)
        if not fl:
            unmapped[excel_type] = unmapped.get(excel_type, 0) + 1
            fl = '05'  # Unknown types fall back to category '05'.
        fl_mc = FL_NAMES[fl]
        bqjh = norm_tags(excel_type, it['bqjh'])
        # Summary text: raw tags, else the type, else the title; 40 chars max.
        jj = (it['bqjh'] or excel_type or it['bt'])[:40]
        wz_uuid = uuid.uuid4().hex
        # Decreasing value, so earlier records get a larger pxh
        # (presumably a sort order -- confirm against the consumer).
        pxh = 1000 - idx
        lines.append(
            'INSERT INTO `txw_mhzc_gxnl_slxxb` '
            '(`wz_uuid`,`bt`,`wz_lj`,`jj`,`gxnl_fl_dm`,`gxnl_fl_mc`,`bqjh`,`zt`,`sjzt`,`lyqd_dm`,`qymc`,`pxh`,`lrrq`,`yxbz`) VALUES ('
            f"{esc(wz_uuid)}, {esc(it['bt'])}, {esc(it['wzLj'])}, {esc(jj)}, {esc(fl)}, {esc(fl_mc)}, {esc(bqjh)}, "
            f"'2', 'Y', 'import', {esc(excel_type)}, {pxh}, '{now}', 'Y');"
        )

    OUT_SQL.parent.mkdir(parents=True, exist_ok=True)
    OUT_SQL.write_text('\n'.join(lines) + '\n', encoding='utf-8')
    print('records', len(unique))
    print('unmapped', unmapped)
    print('written', OUT_SQL)


if __name__ == '__main__':
    main()