I want to do Hadoop-based machine learning; there is a data.csv file on HDFS at /car/data.cs...
Created: January 3, 2026
Model used for the answer: GPT-5.2 Thinking by Chat01
I want to do machine learning on Hadoop. There is a data.csv file on HDFS at /car/data.csv; I will upload the file for you. Please analyze it and then write Python code for me: I want to use machine learning to predict future car sales.
```python
import pandas as pd

path = "/mnt/data/data.csv"
df = pd.read_csv(path)
df.shape, df.head(), df.dtypes
```
```python
import chardet
import pathlib

data = pathlib.Path("/mnt/data/data.csv").read_bytes()
chardet.detect(data[:20000])
```

```
{'encoding': 'GB2312', 'confidence': 0.99, 'language': 'Chinese'}
```
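As a side note (my own suggestion, not something the analysis above depends on): `gb18030` is a superset of GB2312/GBK, so reading with it decodes the same bytes while also tolerating stray characters outside the GB2312 range. A minimal, self-contained sketch using a tiny synthetic file:

```python
import pandas as pd

# Write a tiny GB2312-encoded CSV, then read it back with gb18030,
# which is a superset of GB2312/GBK. This fallback helps when
# chardet's guess is slightly off (e.g. the file is really GBK).
sample = "省份,销量\nShanxi Province,1\n".encode("gb2312")
with open("sample_gb.csv", "wb") as f:
    f.write(sample)

df_sample = pd.read_csv("sample_gb.csv", encoding="gb18030")
print(df_sample.shape)  # (1, 2)
```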
```python
df = pd.read_csv("/mnt/data/data.csv", encoding="gb2312")
df.shape, df.head(), df.dtypes
```

Output (abridged): shape is `(70358, 39)`. Each row is a single vehicle record for Shanxi Province in 2013, with columns such as 省份, 月份, 市, 县, 年份, 车架号, COMPANY, 车系, 车型, 商用或个人, 用途, 销量, several engine-related columns (排量, 发动机参数1–6), manufacturer columns (生产商, 生产子COMPANY, 所属企业, 车属分类), 性别, and a number of `Unnamed: NN` columns. Most columns are `object`; 月份, 年份, and 销量 are `int64`.
```python
df.columns.tolist()
```

```
['省份', '月份', '市', '县', '年份', '车架号', 'COMPANY', '车系', '车型', '商用或个人',
 '用途', '销量', '发动机', '排量', '发动机参数1', '发动机参数2燃油类型', '发动机参数3',
 '发动机参数1.1', '发动机参数4', '发动机参数1.2', '发动机参数5', '发动机参数6',
 'Unnamed: 22', 'Unnamed: 23', 'Unnamed: 24', 'Unnamed: 25', 'Unnamed: 26',
 'Unnamed: 27', 'Unnamed: 28', 'Unnamed: 29', 'Unnamed: 30', 'Unnamed: 31',
 '生产商', '生产子COMPANY', '车机生产编号', '所属企业', '车属分类', 'Unnamed: 37', '性别']
```
```python
df['销量'].describe(), df['销量'].value_counts().head()
```

```
(count    70358.0
 mean         1.0
 std          0.0
 min          1.0
 25%          1.0
 50%          1.0
 75%          1.0
 max          1.0
 Name: 销量, dtype: float64,
 销量
 1    70358
 Name: count, dtype: int64)
```
```python
df[['年份', '月份']].describe()
```

```
           年份            月份
count  70358.0  70358.000000
mean    2013.0      6.489639
std        0.0      3.802634
min     2013.0      1.000000
25%     2013.0      3.000000
50%     2013.0      7.000000
75%     2013.0     10.000000
max     2013.0     12.000000
```
```python
df['年份'].unique()[:10], len(df['年份'].unique())
# (array([2013]), 1)
```
```python
df['省份'].nunique(), df['省份'].value_counts().head()
```

```
(1,
 省份
 Shanxi Province    70358
 Name: count, dtype: int64)
```
```python
df['市'].nunique(), df['市'].value_counts().head()
```

```
(11,
 市
 Taiyuan     13920
 yuncheng     8350
 changzhi     7707
 Jinzhong     7227
 datong       7063
 Name: count, dtype: int64)
```
```python
city_month = df.groupby(['年份', '月份', '市']).size().reset_index(name='sales')
city_month.shape, city_month.head()
```

```
((132, 4),
     年份  月份        市  sales
 0  2013   1  Jinzhong   1039
 1  2013   1    Linfen   1250
 2  2013   1   Lüliang    658
 3  2013   1  Shuozhou    218
 4  2013   1   Taiyuan   1585)
```
Below I first build a quick profile of the data.csv you uploaded, then give you a PySpark machine-learning script that runs on Hadoop/Spark (reading /car/data.csv from HDFS) to predict car sales for future months.
I read the data.csv you uploaded (the encoding is GB2312). The key findings:
- The 销量 (sales) column is always 1 (all 70,358 rows), so each row essentially records one vehicle sold.
- 年份 (year) contains only 2013, so there is no multi-year history.
- 省份 (province) has a single value: Shanxi Province.
- 市 (city) has 11 cities (top 5: Taiyuan 13,920; yuncheng 8,350; changzhi 7,707; Jinzhong 7,227; datong 7,063).
- Aggregated at the province level, monthly sales for 2013 (i.e. the record count per month) show clear month-to-month fluctuation.
Because there is only the single year 2013, if you directly predict 2014 the model will most likely just repeat this monthly up-and-down pattern.
Since you want this to run "on Hadoop", the best fit is Spark on YARN with MLlib.
The plan: read /car/data.csv from HDFS, aggregate year, month, city → sales (each row has sales = 1, so sales is the row count), train a model, and predict future sales. Save the script below as:
car_sales_forecast_spark.py
Then run it on the cluster with spark-submit.
```python
# car_sales_forecast_spark.py
# -*- coding: utf-8 -*-
import argparse
from datetime import date

from dateutil.relativedelta import relativedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, lpad, concat_ws, lit, to_date, sum as Fsum, when
)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator


def add_months(y: int, m: int, k: int):
    """Shift (y, m) forward by k months and return (new_y, new_m)."""
    d = date(y, m, 1) + relativedelta(months=k)
    return d.year, d.month


def build_future_months(last_year: int, last_month: int, horizon: int):
    """Build a list of (year, month) tuples for the next `horizon` months."""
    out = []
    for i in range(1, horizon + 1):
        y, m = add_months(last_year, last_month, i)
        out.append((y, m))
    return out


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--hdfs_path", type=str, default="hdfs:///car/data.csv",
                        help="CSV path on HDFS, e.g. hdfs:///car/data.csv or /car/data.csv")
    parser.add_argument("--encoding", type=str, default="GB2312",
                        help="CSV encoding, default GB2312")
    parser.add_argument("--horizon", type=int, default=12,
                        help="How many months ahead to forecast")
    parser.add_argument("--out_path", type=str, default="hdfs:///car/predictions_monthly",
                        help="HDFS output directory for predictions (written as parquet)")
    args = parser.parse_args()

    spark = (
        SparkSession.builder
        .appName("CarSalesForecast-MLlib")
        .getOrCreate()
    )

    # 1) Read the CSV from HDFS (your file is GB2312)
    raw = (
        spark.read
        .option("header", True)
        .option("encoding", args.encoding)
        .csv(args.hdfs_path)
    )

    # 2) Keep only the columns we need, with type casts
    base = (
        raw.select(
            col("年份").cast("int").alias("year"),
            col("月份").cast("int").alias("month"),
            col("市").alias("city")
        )
        .where(col("year").isNotNull() & col("month").isNotNull() & col("city").isNotNull())
    )

    # 3) Aggregate: city-month sales = row count (each row has 销量 = 1)
    city_month = (
        base.groupBy("year", "month", "city")
        .agg(count(lit(1)).alias("sales"))
    )

    # Build a date column (1st of each month) for the time-based split
    city_month = city_month.withColumn(
        "ds",
        to_date(concat_ws("-", col("year"), lpad(col("month").cast("string"), 2, "0"), lit("01")))
    )

    # 4) Time split: hold out the last 2 months as a test set
    #    (fall back to training on everything if there are too few months)
    distinct_ds = [r["ds"] for r in city_month.select("ds").distinct().orderBy("ds").collect()]
    if len(distinct_ds) >= 4:
        cutoff = distinct_ds[-3]  # test months: distinct_ds[-2], distinct_ds[-1]
        train_df = city_month.where(col("ds") <= lit(cutoff))
        test_df = city_month.where(col("ds") > lit(cutoff))
    else:
        train_df = city_month
        test_df = city_month.limit(0)

    # 5) Features + model (city as a categorical feature, year/month numeric)
    city_indexer = StringIndexer(inputCol="city", outputCol="city_idx", handleInvalid="keep")
    city_ohe = OneHotEncoder(inputCols=["city_idx"], outputCols=["city_vec"], handleInvalid="keep")
    assembler = VectorAssembler(
        inputCols=["year", "month", "city_vec"],
        outputCol="features"
    )

    # GBT regression: fairly robust even on small samples
    # (you can swap in RandomForestRegressor / LinearRegression)
    reg = GBTRegressor(
        labelCol="sales",
        featuresCol="features",
        maxIter=200,
        maxDepth=5
    )

    pipeline = Pipeline(stages=[city_indexer, city_ohe, assembler, reg])
    model = pipeline.fit(train_df)

    # 6) Evaluate (if we have a test set)
    if test_df.count() > 0:
        pred_test = model.transform(test_df)
        evaluator = RegressionEvaluator(labelCol="sales", predictionCol="prediction",
                                        metricName="rmse")
        rmse = evaluator.evaluate(pred_test)
        print(f"[INFO] Test RMSE = {rmse:.3f}")

    # 7) Build features for the next `horizon` months and predict.
    #    First find the last (year, month) present in the data.
    last_row = (city_month.select("year", "month")
                .orderBy(col("year").desc(), col("month").desc()).first())
    last_year, last_month = int(last_row["year"]), int(last_row["month"])
    future_months = build_future_months(last_year, last_month, args.horizon)
    cities = [r["city"] for r in city_month.select("city").distinct().collect()]

    future_rows = []
    for (y, m) in future_months:
        for c in cities:
            future_rows.append((y, m, c))

    future_df = spark.createDataFrame(future_rows, ["year", "month", "city"]).withColumn(
        "ds",
        to_date(concat_ws("-", col("year"), lpad(col("month").cast("string"), 2, "0"), lit("01")))
    )

    pred_future_city = model.transform(future_df)

    # Clean predictions: sales cannot be negative (rounding is optional)
    pred_future_city = pred_future_city.withColumn(
        "pred_sales_city",
        when(col("prediction") < 0, lit(0.0)).otherwise(col("prediction"))
    )

    # 8) Roll up to province-level monthly totals
    pred_future_monthly = (
        pred_future_city.groupBy("year", "month", "ds")
        .agg(Fsum(col("pred_sales_city")).alias("pred_sales_total"))
        .orderBy("ds")
    )

    # 9) Write to HDFS (parquet suits big data; switch to .csv if you prefer)
    pred_future_city.write.mode("overwrite").parquet(args.out_path.rstrip("/") + "/city_level")
    pred_future_monthly.write.mode("overwrite").parquet(args.out_path.rstrip("/") + "/monthly_total")
    print(f"[INFO] Saved predictions to: {args.out_path}")

    spark.stop()


if __name__ == "__main__":
    main()
```
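If you want to sanity-check the month arithmetic locally without installing `dateutil`, the same first-of-month stepping can be done with plain integer math (a standalone sketch; `add_months_no_dateutil` is my own helper name, not part of the script above):

```python
def add_months_no_dateutil(y: int, m: int, k: int):
    """Shift (y, m) forward by k months using integer arithmetic only."""
    idx = y * 12 + (m - 1) + k   # months since year 0
    return idx // 12, idx % 12 + 1

# Year rollover behaves the same as the dateutil version:
print(add_months_no_dateutil(2013, 12, 1))   # (2014, 1)
print(add_months_no_dateutil(2013, 11, 14))  # (2015, 1)
```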
```bash
spark-submit \
  --master yarn \
  --deploy-mode client \
  car_sales_forecast_spark.py \
  --hdfs_path hdfs:///car/data.csv \
  --horizon 12 \
  --out_path hdfs:///car/predictions_monthly
```
The output is written to:
- `hdfs:///car/predictions_monthly/city_level` (city-level predictions)
- `hdfs:///car/predictions_monthly/monthly_total` (province-level monthly totals)

One caveat: your current data covers only 2013 and only Shanxi Province.
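Step 8's roll-up is just a group-by sum; the same reduction in pandas, on hypothetical city-level predictions (all numbers below are invented for illustration), looks like:

```python
import pandas as pd

# Hypothetical city-level predictions (values invented for illustration)
pred = pd.DataFrame({
    "year":  [2014, 2014, 2014, 2014],
    "month": [1, 1, 2, 2],
    "city":  ["Taiyuan", "datong", "Taiyuan", "datong"],
    "pred_sales_city": [1500.0, 700.0, 1400.0, 650.0],
})

# Roll city-level predictions up to monthly totals
monthly = (pred.groupby(["year", "month"], as_index=False)["pred_sales_city"]
               .sum()
               .rename(columns={"pred_sales_city": "pred_sales_total"}))
print(monthly["pred_sales_total"].tolist())  # [2200.0, 2050.0]
```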
So any machine-learning model will struggle to learn a real trend (growth or decline); at best it learns the 2013 month-to-month pattern and the relative differences between cities.
If you want a more trustworthy forecast of future sales, you should at least add more years of historical data.
If you like, once you have added data for more years I can upgrade the model into something closer to a "real" forecaster: lag features (lag 1/3/6/12), rolling means, and holiday/promotion features will noticeably improve accuracy.
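As a preview of what those features look like, here is a minimal pandas sketch of lag and rolling-mean columns on a hypothetical monthly series (the sales numbers are invented):

```python
import pandas as pd

# Hypothetical monthly totals (values invented for illustration)
s = pd.DataFrame({
    "ds": pd.date_range("2013-01-01", periods=12, freq="MS"),
    "sales": [4000, 3500, 5200, 5600, 5900, 6100,
              6300, 6000, 6400, 6800, 6500, 7000],
})
s["lag_1"] = s["sales"].shift(1)             # last month's sales
s["lag_3"] = s["sales"].shift(3)             # sales 3 months ago
s["roll_3"] = s["sales"].shift(1).rolling(3).mean()  # trailing 3-month mean
```

Note the `shift(1)` before `rolling(3)`: it keeps the current month's own value out of its rolling-mean feature, which would otherwise leak the label into the features.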