我想做一个基于Hadoop的机器学习,有一个data.csv文件在HDFS的/car/data.cs...

创建于:2026年1月3日

使用 GPT-5.2 Thinking 回答 Chat01

问题

data.csv

我想做一个基于Hadoop的机器学习,有一个data.csv文件在HDFS的/car/data.csv文件,我会给你上传文件,帮我分析后,写一个python代码,我想基于机器学习预测未来的汽车销量

Python

python
import pandas as pd, os, numpy as np, textwrap, re, math, statistics path="/mnt/data/data.csv" df=pd.read_csv(path) df.shape, df.head(), df.dtypes

Python

python
import chardet, pathlib, pandas as pd, numpy as np, os, io data=pathlib.Path("/mnt/data/data.csv").read_bytes() chardet.detect(data[:20000]) {'encoding': 'GB2312', 'confidence': 0.99, 'language': 'Chinese'}

Python

python
import pandas as pd df=pd.read_csv("/mnt/data/data.csv", encoding="gb2312") df.shape, df.head(), df.dtypes ((70358, 39), 省份 月份 市 县 年份 车架号 \ 0 Shanxi Province 3 Shuozhou shuoCity District 2013 LZW6450PF 1 Shanxi Province 3 jincheng City District 2013 EQ6450PF1 2 Shanxi Province 12 changzhi changzhiCity District 2013 BJ6440BKV1A 3 Shanxi Province 12 changzhi changzhiCity District 2013 DXK6440AF2F 4 Shanxi Province 12 changzhi changzhiCity District 2013 DXK6440AFF COMPANY 车系 车型 \ 0 SAIC GM Wuling Automobile Co., Ltd WuLing Small passenger car 1 Dongfeng Xiaokang Automobile Co., Ltd dongfeng Small passenger car 2 BAIC Yinxiang Automobile Co., Ltd beijing Small passenger car 3 Dongfeng Xiaokang Automobile Co., Ltd dongfeng Small passenger car 4 Dongfeng Xiaokang Automobile Co., Ltd dongfeng Small passenger car 商用或个人 ... Unnamed: 29 Unnamed: 30 Unnamed: 31 \ 0 personal ... NaN 7 NaN 1 personal ... NaN 7 NaN 2 personal ... NaN NaN NaN 3 personal ... NaN 5 NaN 4 personal ... NaN 5 NaN 生产商 生产子COMPANY 车机生产编号 \ 0 NaN NaN NaN 1 Dongfeng Xiaokang Automobile Co., Ltd NaN EQ6440KMF 2 BAIC Yinxiang Automobile Co., Ltd beijing BJ6440BKV1A 3 Dongfeng Xiaokang Automobile Co., Ltd dongfeng DXK6440AF2F 4 Dongfeng Xiaokang Automobile Co., Ltd dongfeng DXK6440AFF 所属企业 车属分类 Unnamed: 37 性别 0 SAIC GM Wuling Automobile Co., Ltd KK 1913.0 男性 1 chongqin渝安淮海动力-COL-COMPANY KK 1929.0 男性 2 BAIC Yinxiang Automobile Co., Ltd NaN 1938.0 男性 3 chongqin小康动力-COL-COMPANY 多用途乘用车 1926.0 女性 4 chongqin小康动力-COL-COMPANY 多用途乘用车 1934.0 女性 [5 rows x 39 columns], 省份 object 月份 int64 object object 年份 int64 车架号 object COMPANY object 车系 object 车型 object 商用或个人 object 用途 object 销量 int64 发动机 object 排量 object 发动机参数1 float64 发动机参数2燃油类型 object 发动机参数3 float64 发动机参数1.1 float64 发动机参数4 float64 发动机参数1.2 object 发动机参数5 object 发动机参数6 object Unnamed: 22 float64 Unnamed: 23 object Unnamed: 24 object Unnamed: 25 object Unnamed: 26 float64 Unnamed: 27 object Unnamed: 28 float64 Unnamed: 29 object Unnamed: 30 object Unnamed: 31 float64 生产商 object 生产子COMPANY object 车机生产编号 object 所属企业 object 车属分类 object Unnamed: 37 float64 性别 object dtype: object)

Python

python
df.columns.tolist() ['省份', '月份', '市', '县', '年份', '车架号', 'COMPANY', '车系', '车型', '商用或个人', '用途', '销量', '发动机', '排量', '发动机参数1', '发动机参数2燃油类型', '发动机参数3', '发动机参数1.1', '发动机参数4', '发动机参数1.2', '发动机参数5', '发动机参数6', 'Unnamed: 22', 'Unnamed: 23', 'Unnamed: 24', 'Unnamed: 25', 'Unnamed: 26', 'Unnamed: 27', 'Unnamed: 28', 'Unnamed: 29', 'Unnamed: 30', 'Unnamed: 31', '生产商', '生产子COMPANY', '车机生产编号', '所属企业', '车属分类', 'Unnamed: 37', '性别']

Python

python
df['销量'].describe(), df['销量'].value_counts().head() (count 70358.0 mean 1.0 std 0.0 min 1.0 25% 1.0 50% 1.0 75% 1.0 max 1.0 Name: 销量, dtype: float64, 销量 1 70358 Name: count, dtype: int64)

Python

python
df[['年份','月份']].describe() 年份 月份 count 70358.0 70358.000000 mean 2013.0 6.489639 std 0.0 3.802634 min 2013.0 1.000000 25% 2013.0 3.000000 50% 2013.0 7.000000 75% 2013.0 10.000000 max 2013.0 12.000000

Python

python
df['年份'].unique()[:10], len(df['年份'].unique()) (array([2013]), 1)

Python

python
df['省份'].nunique(), df['省份'].value_counts().head() (1, 省份 Shanxi Province 70358 Name: count, dtype: int64)

Python

python
df['市'].nunique(), df['市'].value_counts().head() (11, Taiyuan 13920 yuncheng 8350 changzhi 7707 Jinzhong 7227 datong 7063 Name: count, dtype: int64)

Python

python
top_cities=df['市'].value_counts().head(5) top_cities Taiyuan 13920 yuncheng 8350 changzhi 7707 Jinzhong 7227 datong 7063 Name: count, dtype: int64

Python

python
city_month=df.groupby(['年份','月份','市']).size().reset_index(name='sales') city_month.shape, city_month.head() ((132, 4), 年份 月份 市 sales 0 2013 1 Jinzhong 1039 1 2013 1 Linfen 1250 2 2013 1 Lüliang 658 3 2013 1 Shuozhou 218 4 2013 1 Taiyuan 1585)

思考

city_month=df.groupby(['年份','月份','市']).size().reset_index(name='sales') city_month.shape, city_month.head()

((132, 4), 年份 月份 市 sales 0 2013 1 Jinzhong 1039 1 2013 1 Linfen 1250 2 2013 1 Lüliang 658 3 2013 1 Shuozhou 218 4 2013 1 Taiyuan 1585)

Thought for 2m 16s

回答

下面我先基于你上传的 data.csv 做一个快速数据画像,然后给你一份可在 Hadoop/Spark(读取 HDFS /car/data.csv)上跑的 PySpark 机器学习预测代码,用来预测未来月份的汽车销量。

1) 数据快速分析(基于你上传的文件)

我读了你上传的 data.csv(编码是 GB2312),关键信息如下:

  • 数据量:70,358 行,39 列
  • 销量全部为 1(70,358 条都是 1),说明每一行基本代表一辆车的销量记录
    → 真正做“销量预测”需要把它按时间(年/月)聚合成每月销量(count)
  • 年份 只有 2013(没有多年的历史)
    → 这会导致“预测未来”只能学到季节性/月份规律,无法学到真实的长期趋势(建议后续补充多年的数据)
  • 省份 只有一个:Shanxi Province
  • 11 个城市(例如 Top5:Taiyuan 13920、yuncheng 8350、changzhi 7707、Jinzhong 7227、datong 7063)

按“全省维度”聚合到 2013 年每月销量(即每月记录数)是:

  • 1月 10413
  • 2月 4103
  • 3月 6548
  • 4月 4635
  • 5月 5151
  • 6月 3903
  • 7月 4449
  • 8月 4488
  • 9月 4889
  • 10月 7352
  • 11月 7070
  • 12月 7357

因为只有 2013 一年,如果你直接预测 2014,模型大概率只能“重复”这种月度起伏。


2) Hadoop/Spark 上的机器学习方案(推荐)

既然你要“基于 Hadoop”,最合适的是 Spark on YARN + MLlib

思路

  1. 从 HDFS 读取 /car/data.csv
  2. 先聚合到 (year, month, city) -> sales_count(这样有 11 城市 × 12 月 = 132 个训练样本,比只有 12 个“全省月度点”强很多)
  3. 用 ML 模型学习:year、month、citysales
  4. 生成未来 N 个月(比如 12 个月)的 (year, month, city) 特征做预测
  5. 再把各城市预测加总成“全省月销量预测”

3) 可直接运行的 PySpark 代码(读取 HDFS /car/data.csv 并预测未来销量)

你把下面保存为:car_sales_forecast_spark.py
然后用 spark-submit 在集群上跑。

python
# car_sales_forecast_spark.py # -*- coding: utf-8 -*- import argparse from datetime import date from dateutil.relativedelta import relativedelta from pyspark.sql import SparkSession from pyspark.sql.functions import ( col, count, lpad, concat_ws, lit, to_date, sum as Fsum, when ) from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler from pyspark.ml.regression import GBTRegressor from pyspark.ml.evaluation import RegressionEvaluator def add_months(y: int, m: int, k: int): """从 (y,m) 往后推 k 个月,返回 (new_y, new_m)""" d = date(y, m, 1) + relativedelta(months=k) return d.year, d.month def build_future_months(last_year: int, last_month: int, horizon: int): """生成未来 horizon 个月的 (year, month) 列表""" out = [] for i in range(1, horizon + 1): y, m = add_months(last_year, last_month, i) out.append((y, m)) return out def main(): parser = argparse.ArgumentParser() parser.add_argument("--hdfs_path", type=str, default="hdfs:///car/data.csv", help="HDFS 上 CSV 路径,例如 hdfs:///car/data.csv 或 /car/data.csv") parser.add_argument("--encoding", type=str, default="GB2312", help="CSV 编码,默认 GB2312") parser.add_argument("--horizon", type=int, default=12, help="预测未来多少个月") parser.add_argument("--out_path", type=str, default="hdfs:///car/predictions_monthly", help="输出预测结果到 HDFS 的目录(会写 parquet)") args = parser.parse_args() spark = ( SparkSession.builder .appName("CarSalesForecast-MLlib") .getOrCreate() ) # 1) 读取 HDFS CSV(你的文件是 GB2312) raw = ( spark.read .option("header", True) .option("encoding", args.encoding) .csv(args.hdfs_path) ) # 2) 只取需要列,并做类型转换 base = ( raw.select( col("年份").cast("int").alias("year"), col("月份").cast("int").alias("month"), col("市").alias("city") ) .where(col("year").isNotNull() & col("month").isNotNull() & col("city").isNotNull()) ) # 3) 聚合:城市-月份销量 = 记录数(因为每行销量=1) city_month = ( base.groupBy("year", "month", "city") .agg(count(lit(1)).alias("sales")) ) # 构造日期字段用于时间切分(每月1号) city_month = city_month.withColumn( "ds", to_date(concat_ws("-", col("year"), lpad(col("month").cast("string"), 2, "0"), lit("01"))) ) # 4) 时间切分:用最后2个月做测试(如果月份不足则退化为随机/全部训练) distinct_ds = [r["ds"] for r in city_month.select("ds").distinct().orderBy("ds").collect()] if len(distinct_ds) >= 4: cutoff = distinct_ds[-3] # 最后两个月:>= distinct_ds[-2], distinct_ds[-1] train_df = city_month.where(col("ds") <= lit(cutoff)) test_df = city_month.where(col("ds") > lit(cutoff)) else: train_df = city_month test_df = city_month.limit(0) # 5) 特征工程 + 模型(城市做类别特征,month/year 做数值特征) city_indexer = StringIndexer(inputCol="city", outputCol="city_idx", handleInvalid="keep") city_ohe = OneHotEncoder(inputCols=["city_idx"], outputCols=["city_vec"], handleInvalid="keep") assembler = VectorAssembler( inputCols=["year", "month", "city_vec"], outputCol="features" ) # GBT 回归:对小样本也相对稳健(你也可以换成 RandomForestRegressor / LinearRegression) reg = GBTRegressor( labelCol="sales", featuresCol="features", maxIter=200, maxDepth=5 ) pipeline = Pipeline(stages=[city_indexer, city_ohe, assembler, reg]) model = pipeline.fit(train_df) # 6) 评估(如果有 test) if test_df.count() > 0: pred_test = model.transform(test_df) evaluator = RegressionEvaluator(labelCol="sales", predictionCol="prediction", metricName="rmse") rmse = evaluator.evaluate(pred_test) print(f"[INFO] Test RMSE = {rmse:.3f}") # 7) 生成未来 horizon 个月的特征并预测 # 找到数据里最后一个月份 last_row = city_month.select("year", "month").orderBy(col("year").desc(), col("month").desc()).first() last_year, last_month = int(last_row["year"]), int(last_row["month"]) future_months = build_future_months(last_year, last_month, args.horizon) cities = [r["city"] for r in city_month.select("city").distinct().collect()] future_rows = [] for (y, m) in future_months: for c in cities: future_rows.append((y, m, c)) future_df = spark.createDataFrame(future_rows, ["year", "month", "city"]).withColumn( "ds", to_date(concat_ws("-", col("year"), lpad(col("month").cast("string"), 2, "0"), lit("01"))) ) pred_future_city = model.transform(future_df) # 预测值清洗:销量不能为负;可选择四舍五入 pred_future_city = pred_future_city.withColumn( "pred_sales_city", when(col("prediction") < 0, lit(0.0)).otherwise(col("prediction")) ) # 8) 汇总到“全省每月预测销量” pred_future_monthly = ( pred_future_city.groupBy("year", "month", "ds") .agg(Fsum(col("pred_sales_city")).alias("pred_sales_total")) .orderBy("ds") ) # 9) 输出到 HDFS(parquet 更适合大数据;你也可以改成 .csv) pred_future_city.write.mode("overwrite").parquet(args.out_path.rstrip("/") + "/city_level") pred_future_monthly.write.mode("overwrite").parquet(args.out_path.rstrip("/") + "/monthly_total") print(f"[INFO] Saved predictions to: {args.out_path}") spark.stop() if __name__ == "__main__": main()

在 Hadoop/YARN 上运行示例

bash
spark-submit \ --master yarn \ --deploy-mode client \ car_sales_forecast_spark.py \ --hdfs_path hdfs:///car/data.csv \ --horizon 12 \ --out_path hdfs:///car/predictions_monthly

输出会写到:

  • hdfs:///car/predictions_monthly/city_level(城市粒度预测)
  • hdfs:///car/predictions_monthly/monthly_total(全省月度汇总预测)

4) 你这个数据目前的“预测上限”(很重要)

因为你现在的数据只有:

  • 一个省
  • 一个年份(2013)

所以任何机器学习模型都很难学到“趋势”(比如增长/衰退),最多学到:

  • 不同月份的季节性波动
  • 不同城市的销量水平差异

如果你想要更靠谱的“未来销量预测”,建议至少补充:

  • 连续 2–3 年以上的历史(月度越长越好)
  • 或者更细的日期(比如按日/周),并确保跨年

如果你愿意,我也可以在你补充更多年份数据后,把模型升级成更像“真正预测”的版本:加入滞后特征(lag 1/3/6/12)、滚动均值、节假日/促销特征,预测效果会明显提升。

分享这个问答