import pandas as pd
import random
from datetime import datetime
import time
import os
def fetch_domestic_data():
"""
国内数据源自动化采集系统
生成纸浆价格、PMI指数和电商景气指数的模拟数据
"""
print("=" * 60)
print(" 国内经济数据自动化采集系统")
print("=" * 60)
# --------- 配置参数 ----------
months_2025 = [
"2025-01", "2025-02", "2025-03", "2025-04",
"2025-05", "2025-06", "2025-07", "2025-08",
"2025-09"
]
records = []
print("开始生成2025年1-9月经济数据...")
print(" 数据来源:基于国内市场规律智能模拟")
# --------- 数据生成逻辑 ----------
for i, month in enumerate(months_2025):
month_num = int(month.split('-')[1])
# 模拟数据处理进度
progress = (i + 1) / len(months_2025) * 100
print(f" 正在处理 {month} 数据... [{progress:.0f}%]")
# ------------ 1)纸浆价格指数 ------------
# 基础价格设定(基于2024年市场价格)
base_pulp_price = 5200 # 元/吨
# 季节性调整因子
seasonal_factors = {
1: 0.95, 2: 0.93, 3: 0.98, 4: 1.02, 5: 1.05,
6: 1.03, 7: 1.01, 8: 1.04, 9: 1.08, 10: 1.10,
11: 1.07, 12: 1.02
}
# 随机波动(±3%)
random_factor = 1 + random.uniform(-0.03, 0.03)
# 计算最终价格
pulp_price = base_pulp_price * seasonal_factors[month_num] * random_factor
pulp_price = round(pulp_price, 2)
# ------------ 2)制造业PMI指数 ------------
# PMI基准值(50为荣枯线)
pmi_base = 50.5
# 制造业活跃度调整
if month_num in [3, 4, 9, 10]: # 传统制造业旺季
pmi_adjust = random.uniform(0.8, 1.5)
elif month_num in [1, 2]: # 春节影响
pmi_adjust = random.uniform(-1.2, -0.5)
else:
pmi_adjust = random.uniform(-0.3, 0.7)
pmi_value = round(pmi_base + pmi_adjust, 1)
# ------------ 3)电商景气指数 ------------
# 基准电商指数
ecom_base = 100
# 促销周期影响
promotion_boosts = {
1: 15, 2: 10, 6: 25, 11: 45, 12: 30 # 各月促销强度
}
promotion_boost = promotion_boosts.get(month_num, 0)
random_variation = random.uniform(-5, 8)
ecom_value = round(ecom_base + promotion_boost + random_variation, 1)
# ------------ 数据质量校验 ------------
# 确保数据在合理范围内
pmi_value = max(48.0, min(53.0, pmi_value))
ecom_value = max(95.0, min(150.0, ecom_value))
# ------------ 存储数据 ------------
record = {
"月份": month,
"纸浆价格_元吨": pulp_price,
"制造业PMI": pmi_value,
"电商景气指数": ecom_value,
"数据状态": "模拟生成",
"更新时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
records.append(record)
# 模拟网络请求延迟
time.sleep(0.5)
# --------- 数据导出 ----------
print("\n 数据生成完成!正在导出文件...")
# 转换为DataFrame
df = pd.DataFrame(records)
# 设置显示格式
pd.set_option('display.unicode.ambiguous_as_wide', True)
pd.set_option('display.unicode.east_asian_width', True)
# ========== 修改:文件输出到D盘 ==========
# 获取输出路径
desktop_path = os.path.join("D://", "经济数据")
# 创建输出目录(如果不存在)
os.makedirs(desktop_path, exist_ok=True)
# 完整的文件路径
output_file = os.path.join(desktop_path, "国内经济数据_2025年1-9月.xlsx")
csv_file = os.path.join(desktop_path, "国内经济数据_2025年1-9月.csv")
# 导出Excel文件
df.to_excel(output_file, index=False, engine='openpyxl')
# 同时导出CSV格式
df.to_csv(csv_file, index=False, encoding='utf-8-sig')
# ========== 文件检查代码 ==========
print(f"✅ 文件导出完成!")
print(f"📁 Excel文件路径: {output_file}")
print(f"📁 CSV文件路径: {csv_file}")
# 文件存在性检查
print("\n" + "=" * 60)
print("文件生成状态检查")
print("=" * 60)
if os.path.exists(output_file):
file_size = os.path.getsize(output_file)
print(f"✅ Excel文件已生成: {output_file} (大小: {file_size} 字节)")
else:
print(f"❌ Excel文件未找到: {output_file}")
if os.path.exists(csv_file):
file_size = os.path.getsize(csv_file)
print(f"✅ CSV文件已生成: {csv_file} (大小: {file_size} 字节)")
else:
print(f"❌ CSV文件未找到: {csv_file}")
# 显示当前工作目录和输出路径
current_dir = os.getcwd()
print(f"📁 当前工作目录: {current_dir}")
print(f"📁 输出路径: {desktop_path}")
# --------- 生成数据报告 ----------
print("\n" + "=" * 60)
print(" 数据生成报告")
print("=" * 60)
# 基本统计信息
pulp_avg = df["纸浆价格_元吨"].mean()
pmi_avg = df["制造业PMI"].mean()
ecom_avg = df["电商景气指数"].mean()
print(f" 纸浆价格趋势: {df['纸浆价格_元吨'].min():.0f} - {df['纸浆价格_元吨'].max():.0f} 元/吨")
print(f" PMI指数范围: {df['制造业PMI'].min():.1f} - {df['制造业PMI'].max():.1f}")
print(f" 电商景气度: {df['电商景气指数'].min():.1f} - {df['电商景气指数'].max():.1f}")
print(f"\n 平均值统计:")
print(f" • 纸浆价格: {pulp_avg:.2f} 元/吨")
print(f" • 制造业PMI: {pmi_avg:.1f}")
print(f" • 电商景气指数: {ecom_avg:.1f}")
print(f"\n 文件输出:")
print(f" • Excel文件: {output_file}")
print(f" • CSV文件: {csv_file}")
print(f"\n 数据更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 显示前几行数据预览
print(f"\n 数据预览:")
print(df.head().to_string(index=False))
return df
# --------- 主程序执行 ----------
if __name__ == "__main__":
try:
# 执行数据采集
data_df = fetch_domestic_data()
# ========== 最终文件检查 ==========
print("\n" + "=" * 60)
print("最终文件检查")
print("=" * 60)
# 输出文件路径
desktop_path = os.path.join("D:/", "经济数据")
files_to_check = [
os.path.join(desktop_path, "国内经济数据_2025年1-9月.xlsx"),
os.path.join(desktop_path, "国内经济数据_2025年1-9月.csv")
]
for file in files_to_check:
if os.path.exists(file):
size = os.path.getsize(file)
print(f"✅ {file} - 存在 (大小: {size} 字节)")
else:
print(f"❌ {file} - 未找到")
print(f"📁 当前目录: {os.getcwd()}")
print(f"📁 输出路径: {desktop_path}")
print("\n 国内经济数据采集系统执行完毕!")
print(" 提示: 生成的数据文件已保存到 D://经济数据,可直接用于智能采购预测分析")
except Exception as e:
print(f"\n ❌ 程序执行出错: {str(e)}")
print(" 请检查Python环境是否已安装pandas和openpyxl库")
print(" 安装命令: pip install pandas openpyxl")