sparkStatementMeta(bdbc6c25-e1fb-4fd1-9ce0-b2c60e73c096, 34, 6, Finished, Available)
SparkSession - hive
Azure setup
sparkStatementMeta(bdbc6c25-e1fb-4fd1-9ce0-b2c60e73c096, 34, 6, Finished, Available)
SparkSession - hive
#import package
import json
import numpy as np
import pandas as pd
# import sparknlp
# from sparknlp.base import *
# from sparknlp.annotator import *
import pyspark.sql.functions as F
from pyspark.sql.functions import sum as _sum, mean, stddev, max as _max, min as _min, count, percentile_approx, year, month, dayofmonth, ceil, col, dayofweek, hour, explode, date_format, lower, size, split, regexp_replace, isnan, when, log10, log
import pyspark.sql.types as T
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import plotly.graph_objects as go
import plotly.subplots as sp
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "plotly_mimetype+notebook_connected"
import seaborn as snsblob_account_name = "marckvnonprodblob"
blob_container_name = "bigdata"
# read only
blob_sas_token = "?sv=2021-10-04&st=2023-10-04T01%3A42%3A59Z&se=2024-01-02T02%3A42%3A00Z&sr=c&sp=rlf&sig=w3CH9MbCOpwO7DtHlrahc7AlRPxSZZb8MOgS6TaXLzI%3D"
wasbs_base_url = (
f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/"
)
spark.conf.set(
f"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net",
blob_sas_token,
)submissions_path = "reddit-parquet/submissions/"
submissions_df = spark.read.parquet(f"{wasbs_base_url}{submissions_path}")
# Single subreddit
submissions = submissions_df.filter(submissions_df.subreddit == "anime")
# sub = submissions.select("subreddit", "author", "author_flair_text", "created_utc", "title", "selftext", "num_comments", "num_crossposts", "over_18", "score", "stickied", "id") #"promoted", "pinned", "is_video", "gilded", "is_self"
sub = submissions.select("subreddit", "author", "author_flair_text", "created_utc", "title", "selftext", "num_comments", "num_crossposts", "over_18", "score", "stickied", "id", "promoted", "pinned", "is_video", "gilded", "is_self")sub.printSchema()sub = sub.filter(
(col('title')!='') & \
(col('title')!='[deleted]') & \
(col('title')!='[removed]') & \
(col('selftext')!='') & \
(col('selftext')!='[deleted]') & \
(col('selftext')!='[removed]') & \
(col('author')!='[deleted]') & \
(col('author')!='[removed]')
)
sub_cleaned = (
sub
.withColumn("created_date", date_format("created_utc", "yyyy-MM-dd")) # create date column
.withColumn("created_hour", hour("created_utc")) # create hour column
.withColumn("created_week", dayofweek("created_utc")) # create day of the week column
.withColumn("created_month", month("created_utc")) # create month of the year column
.withColumn("created_year", year("created_utc")) # create the year column
.withColumn("title", lower(col('title'))) # text cleaning: lowercase
.withColumn("selftext", lower(col('selftext'))) # text cleaning: lowercase
.withColumn("cleaned_title", regexp_replace(col('title'), r'[^a-zA-Z0-9\s]', '')) # text cleaning: only contain words or number
.withColumn("cleaned_title", regexp_replace(col('cleaned_title'), r'\s+', ' ')) # text cleaning: remove extra space in text
.withColumn('title_wordCount', size(split(col('cleaned_title'), ' '))) # word count
.withColumn("cleaned_selftext", regexp_replace(col('selftext'), r'[^a-zA-Z0-9\s]', '')) # text cleaning: only contain words or number
.withColumn("cleaned_selftext", regexp_replace(col('cleaned_selftext'), r'\s+', ' ')) # text cleaning: remove extra space in text
.withColumn('selftext_wordCount', size(split(col('cleaned_selftext'), ' '))) # word count
.withColumn('contain_pokemon', col("cleaned_title").rlike("""(?i)pokemon|(?i)pokémon""")) # create dummy variable column on title
)datastore = 'azureml://datastores/workspaceblobstore/paths'
# https://<STORAGE-ACCOUNT>.blob.core.windows.net/<CONTAINER-NAME>
Storage_URI="https://group09astorage08f5ea16c.blob.core.windows.net/azureml-blobstore-600c08e7-3c4d-4e17-a310-86a7327468a9"
workspace_default_storage_account = "group09astorage08f5ea16c"
workspace_default_container = "azureml-blobstore-600c08e7-3c4d-4e17-a310-86a7327468a9"
workspace_wasbs_base_url = (
f"wasbs://{workspace_default_container}@{workspace_default_storage_account}.blob.core.windows.net/")# sub_cleaned.write.mode("overwrite").parquet(f"{datastore}/anime_sub_cleaned.parquet")
sub_cleaned.write.mode("overwrite").parquet(f"{workspace_wasbs_base_url}/anime_sub_cleaned.parquet")datastore = 'azureml://datastores/workspaceblobstore/paths'
# https://<STORAGE-ACCOUNT>.blob.core.windows.net/<CONTAINER-NAME>
Storage_URI="https://group09astorage08f5ea16c.blob.core.windows.net/azureml-blobstore-600c08e7-3c4d-4e17-a310-86a7327468a9"
workspace_default_storage_account = "group09astorage08f5ea16c"
workspace_default_container = "azureml-blobstore-600c08e7-3c4d-4e17-a310-86a7327468a9"
workspace_wasbs_base_url = (
f"wasbs://{workspace_default_container}@{workspace_default_storage_account}.blob.core.windows.net/")StatementMeta(bdbc6c25-e1fb-4fd1-9ce0-b2c60e73c096, 34, 7, Finished, Available)
AWS Set up
# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.3.0
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")Collecting package metadata (current_repodata.json): done
Solving environment: done
==> WARNING: A newer version of conda exists. <==
current version: 23.3.1
latest version: 23.10.0
Please update conda by running
$ conda update -n base -c defaults conda
Or to minimize the number of packages updated during conda update use
conda install conda=23.10.0
# All requested packages already installed.
Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: pyspark==3.3.0 in /opt/conda/lib/python3.10/site-packages (3.3.0)
Requirement already satisfied: py4j==0.10.9.5 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.3.0) (0.10.9.5)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("PySparkApp")
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider",
)
.getOrCreate()
)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
print(spark.version)Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4f91a17b-2e80-4706-8198-2f07151c0914;1.0
confs: [default]
found org.apache.hadoop#hadoop-aws;3.2.2 in central
found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 443ms :: artifacts dl 25ms
:: modules in use:
com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 0 | 0 | 0 || 2 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-4f91a17b-2e80-4706-8198-2f07151c0914
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/19ms)
23/11/28 20:20:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
3.3.0
#import package
import json
import numpy as np
import pandas as pd
# import sparknlp
# from sparknlp.base import *
# from sparknlp.annotator import *
import pyspark.sql.functions as F
from pyspark.sql.functions import sum as _sum, mean, stddev, max as _max, min as _min, count, percentile_approx, year, month, dayofmonth, ceil, col, dayofweek, hour, explode, date_format, lower, size, split, regexp_replace, isnan, when, log10, log
import pyspark.sql.types as T
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import plotly.graph_objects as go
import plotly.subplots as sp
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "plotly_mimetype+notebook_connected"
import seaborn as snsAzure file read
datastore = 'azureml://datastores/workspaceblobstore/paths'
sub = spark.read.parquet(f"{datastore}/anime_sub_cleaned.parquet")
# com = spark.read.parquet(f"{datastore}/anime_com_cleaned.parquet")StatementMeta(bdbc6c25-e1fb-4fd1-9ce0-b2c60e73c096, 34, 9, Finished, Available)
AWS file read
# Read cleaned data from parquet
import sagemaker
session = sagemaker.Session()
# bucket = session.default_bucket()
bucket = 'sagemaker-us-east-1-315969085594'
sub_bucket_path = f"s3a://{bucket}/project/cleaned/sub"
# com_bucket_path = f"s3a://{bucket}/project/cleaned/com"
print(f"reading submissions from {sub_bucket_path}")
sub = spark.read.parquet(sub_bucket_path, header=True)
print(f"shape of the sub dataframe is {sub.count():,}x{len(sub.columns)}")
# print(f"reading comments from {com_bucket_path}")
# com = spark.read.parquet(com_bucket_path, header=True)
# print(f"shape of the com dataframe is {com.count():,}x{len(com.columns)}")sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading submissions from s3a://sagemaker-us-east-1-315969085594/project/cleaned/sub
23/11/28 20:20:16 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
[Stage 1:============================================> (3 + 1) / 4]
shape of the sub dataframe is 110,247x22
sub.printSchema()root
|-- subreddit: string (nullable = true)
|-- author: string (nullable = true)
|-- author_flair_text: string (nullable = true)
|-- created_utc: timestamp (nullable = true)
|-- title: string (nullable = true)
|-- selftext: string (nullable = true)
|-- num_comments: long (nullable = true)
|-- num_crossposts: long (nullable = true)
|-- over_18: boolean (nullable = true)
|-- score: long (nullable = true)
|-- stickied: boolean (nullable = true)
|-- id: string (nullable = true)
|-- created_date: string (nullable = true)
|-- created_hour: integer (nullable = true)
|-- created_week: integer (nullable = true)
|-- created_month: integer (nullable = true)
|-- created_year: integer (nullable = true)
|-- cleaned_title: string (nullable = true)
|-- title_wordCount: integer (nullable = true)
|-- cleaned_selftext: string (nullable = true)
|-- selftext_wordCount: integer (nullable = true)
|-- contain_pokemon: boolean (nullable = true)
Business Goal: Determine the score of submissions in Anime subreddit
Technical Proposal: Convert the numeric post scores into categorical labels, such as “Low,” “Medium,” and “High,” based on predefined score thresholds. Create a classification model using features like created time, word counts, number of comments, etc. Evaluate the model’s performance using metrics like accuracy and confusion matrices to determine the effectiveness of score categorization. Predict the level of score for each new post.
sub.count()
110247
# Select columns
df = sub.select(['num_comments', 'num_crossposts', 'over_18', 'score', 'stickied', 'title_wordCount', 'selftext_wordCount']) #"promoted", "pinned", "is_video", "gilded", "is_self"df.show()[Stage 8:> (0 + 1) / 1]
+------------+--------------+-------+-----+--------+---------------+------------------+
|num_comments|num_crossposts|over_18|score|stickied|title_wordCount|selftext_wordCount|
+------------+--------------+-------+-----+--------+---------------+------------------+
| 12| 0| false| 0| false| 4| 64|
| 7| 0| false| 0| false| 15| 42|
| 9| 0| false| 0| false| 10| 18|
| 91| 1| false| 153| false| 11| 22|
| 4| 0| false| 1| false| 5| 78|
| 10| 0| false| 0| false| 12| 62|
| 5| 0| false| 0| false| 14| 30|
| 9| 0| false| 7| false| 5| 15|
| 8| 0| false| 0| false| 7| 42|
| 4| 0| false| 1| false| 4| 28|
| 11| 0| false| 1| false| 7| 27|
| 3| 0| false| 2| false| 17| 130|
| 4| 0| false| 0| false| 7| 10|
| 139| 0| false| 17| false| 11| 132|
| 1| 0| false| 0| false| 6| 83|
| 41| 2| false| 165| false| 9| 5549|
| 8| 2| false| 102| false| 9| 5441|
| 9| 0| false| 0| false| 4| 234|
| 3| 0| false| 7| false| 5| 92|
| 24| 0| false| 6| false| 15| 151|
+------------+--------------+-------+-----+--------+---------------+------------------+
only showing top 20 rows
df.printSchema()root
|-- num_comments: long (nullable = true)
|-- num_crossposts: long (nullable = true)
|-- over_18: boolean (nullable = true)
|-- score: long (nullable = true)
|-- stickied: boolean (nullable = true)
|-- title_wordCount: integer (nullable = true)
|-- selftext_wordCount: integer (nullable = true)
df.count()
110247
The score in a submission is an important indicator of its popularity and the intensity of discussion. Understanding which content features can affect or are relevant to the score allows us to predict the potential score of a post based on its other content features, contributing to the achievement of our project goal. This capability aids in better maintaining community dynamics and engagement. Moreover, it helps stakeholders understand which content features are more highly discussed and welcomed. Providing insights enables them to make informed decisions when marketing their anime or avoiding inefficient submissions.
limited_df = df.limit(500).toPandas()
fig = px.scatter(limited_df, x='score', title="Score Distribution")
fig.update_layout(title_text="Score Distribution", title_x=0.5, paper_bgcolor='white')
# fig.savefig(f"../../img/ml_topic9_score_scatter.png")
fig.show()StatementMeta(bdbc6c25-e1fb-4fd1-9ce0-b2c60e73c096, 50, 26, Finished, Available)
As for our target variable score, due to the non-linearity observed in the score distribution histogram above, a model trained without addressing this non-linearity may lack precision during prediction. Consequently, we apply a logarithmic function to linearize our target variable score and restrict predictions to be positive.
In line with our previous analysis of the distribution of selftext word counts in the submissions dataset during the EDA section, it is notable that selftext provides additional context to the posts’ titles, enabling clearer expression of their thoughts or issues. Unlike titles, selftext typically has a higher word count, and the variability in word counts is substantial. In order to utilize the full potential of the selftext word count feature, we perform data transformation on selftext’s word count feature. Specifically, we apply the Bucketizer approach to map the column of continuous features, in this case, selftext’s word counts, into discrete feature buckets. Examining the word count distribution of selftext in the following plot, we observe a varied but relatively evenly distributed word count range. Based on this distribution, we choose the following buckets: [1,30], [31,60], [61,100], [100,∞], enabling effective bucketization of the selftext word count feature.
# df selftext_wordCount
selftext_length = df \
.select("selftext_wordCount") \
.withColumn("text_length",F.when(sub.selftext_wordCount<=30,"<=030") \
.when(sub.selftext_wordCount.between(31,60),"<=060") \
.when(sub.selftext_wordCount.between(61,100),"<=100") \
.otherwise(">100"))
length_selftext = selftext_length.groupBy("text_length").count().sort(F.asc("text_length")).toPandas()
plt.rcParams['figure.dpi'] = 360
plt.figure(figsize=(12, 8))
plt.bar(length_selftext['text_length'], length_selftext['count'], color='#d13a47') # Updated color
plt.xlabel('Length')
plt.ylabel('Counts')
plt.title("Selftext Length Distribution")
plt.show()
# Create pipline to transform data
from pyspark.ml import Pipeline, Model
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler, StandardScaler, Bucketizer
# target value lof transform
df = df.withColumn("log_score", log(col("score")+1))
# Bucketizer: [x,y) except the last bucket, which also includes y
bucketizer = Bucketizer(splits=[0, 31, 61, 101, float("inf")], inputCol="selftext_wordCount", outputCol="bucked_selftext_wordCount")
# oneHotEncoder = OneHotEncoder(inputCol="bucked_selftext_wordCount", outputCol="selftext_wordCount_vec")
assembler = VectorAssembler(
inputCols=['num_comments', 'num_crossposts', 'over_18', 'stickied', 'title_wordCount', 'bucked_selftext_wordCount'],
outputCol= "features")In addition, we performed the VectorAssembler, a data transformation step on all features, including the bucketed selftext_word count we transformed in previous steps. This step merges all the selected features into a single feature vector for further ML model training. We also apply the VectorIndex transformation step to identify categorical features from the raw features generated from previous VectorAssembler steps. This step indexes them and creates a new final “features” column for modeling.
To avoid rerunning large parts of the code multiple times on the dataset for comparing hyperparameters, we apply a pipeline to assemble all data transformation steps.
# scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
# withStd=True, withMean=False)
transform_Pipeline = Pipeline(
stages=[
bucketizer,
assembler
])pipelineTransform = transform_Pipeline.fit(df)
df_trans = pipelineTransform.transform(df)
df_trans.printSchema()
# final_data = data.select("features", "medv")root
|-- num_comments: long (nullable = true)
|-- num_crossposts: long (nullable = true)
|-- over_18: boolean (nullable = true)
|-- score: long (nullable = true)
|-- stickied: boolean (nullable = true)
|-- title_wordCount: integer (nullable = true)
|-- selftext_wordCount: integer (nullable = true)
|-- log_score: double (nullable = true)
|-- bucked_selftext_wordCount: double (nullable = true)
|-- features: vector (nullable = true)
df_trans.select("features").show(1,truncate=False)
+--------------------------+
|features |
+--------------------------+
|[12.0,0.0,0.0,0.0,4.0,2.0]|
+--------------------------+
only showing top 1 row
df_miss = df_trans.select([count(when(((col(c).isNull())), c)).alias(c) for c in df_trans.columns])
df_miss.show()[Stage 5:=============================> (2 + 2) / 4]
+------------+--------------+-------+-----+--------+---------------+------------------+---------+-------------------------+--------+
|num_comments|num_crossposts|over_18|score|stickied|title_wordCount|selftext_wordCount|log_score|bucked_selftext_wordCount|features|
+------------+--------------+-------+-----+--------+---------------+------------------+---------+-------------------------+--------+
| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
+------------+--------------+-------+-----+--------+---------------+------------------+---------+-------------------------+--------+
df_trans.select("score","log_score","selftext_wordCount","bucked_selftext_wordCount").show(5)+-----+------------------+------------------+-------------------------+
|score| log_score|selftext_wordCount|bucked_selftext_wordCount|
+-----+------------------+------------------+-------------------------+
| 0| 0.0| 64| 2.0|
| 0| 0.0| 42| 1.0|
| 0| 0.0| 18| 0.0|
| 153|5.0369526024136295| 22| 0.0|
| 1|0.6931471805599453| 78| 2.0|
+-----+------------------+------------------+-------------------------+
only showing top 5 rows
df_trans.select("features").show(3)+--------------------+
| features|
+--------------------+
|[12.0,0.0,0.0,0.0...|
|[7.0,0.0,0.0,0.0,...|
|(6,[0,4],[9.0,10.0])|
+--------------------+
only showing top 3 rows
train_data, test_data, val_data = df_trans.randomSplit([0.7, 0.2, 0.1], seed=32)train_data.cache()
val_data.cache()
test_data.cache()DataFrame[num_comments: bigint, num_crossposts: bigint, over_18: boolean, score: bigint, stickied: boolean, title_wordCount: int, selftext_wordCount: int, log_score: double, bucked_selftext_wordCount: double, features: vector]
print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
print("Number of validation records : " + str(val_data.count()))
Number of training records: 77071
Number of testing records : 22091
[Stage 18:===========================================> (3 + 1) / 4]
Number of validation records : 11085
Since our goal is to predict the score based on other features in Reddit submissions, we have chosen to apply Gradient Boosted Regression Trees (GBRT) with hyperparameter tuning to build and fine-tune the model. We selected this model because it is based on the idea of an ensemble method derived from a decision tree. Unlike linear models, it maps non-linear relationships quite well, aligning with the characteristics of our target variable score.
The data has been split into a training set (70%), testing set (20%), and validation set (10%). We use multiple evaluation metrics on the model and hyperparameter options to compare their performance in prediction. The metrics include Mean Squared Error (MSE), Mean Absolute Error (MAE), Root Mean Squared Error (RMSE), and R-squared (\(R^2\)).
# Gradient-boosted trees (GBTs)
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
# Train a GBT model.
gbtr1 = GBTRegressor(featuresCol='features', labelCol='log_score', maxIter=10, maxDepth=10)
gbtr1 = gbtr1.fit(train_data)
val_pred1 = gbtr1.transform(val_data)WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.util.SizeEstimator$ (file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/spark-core_2.12-3.3.0.jar) to field java.nio.charset.Charset.name
WARNING: Please consider reporting this to the maintainers of org.apache.spark.util.SizeEstimator$
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
# Select example rows to display
# val_pred1.select("prediction", "score", "features").show(5)
val_pred1.select("prediction", "log_score", "features").show(5)[Stage 214:> (0 + 1) / 1]
23/11/28 20:22:02 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/28 20:22:02 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
+------------------+-----+-------------------+
| prediction|score| features|
+------------------+-----+-------------------+
|0.6568067401986687| 0| (6,[4],[4.0])|
|0.6568067401986687| 0| (6,[4],[4.0])|
|0.5089705808854492| 0|(6,[4,5],[4.0,1.0])|
|0.5089705808854492| 0|(6,[4,5],[4.0,1.0])|
|0.9764843125821158| 0| (6,[4],[5.0])|
+------------------+-----+-------------------+
only showing top 5 rows
mse = RegressionEvaluator(labelCol="log_score", predictionCol="prediction", metricName="mse")
rmse = RegressionEvaluator(labelCol="log_score", predictionCol="prediction", metricName="rmse")
mae = RegressionEvaluator(labelCol="log_score", predictionCol="prediction", metricName="mae")
r2 = RegressionEvaluator(labelCol="log_score", predictionCol="prediction", metricName="r2")val_mse1 = mse.evaluate(val_pred1)
val_rmse1 = rmse.evaluate(val_pred1)
val_mae1 = mae.evaluate(val_pred1)
val_r21 = r2.evaluate(val_pred1)
print("Mean Squared Error (MSE) on val data = %g" % val_mse1)
print("Mean Absolute Error (MAE) on val data = %g" % val_mae1)
print("Root Mean Squared Error (RMSE) on val data = %g" % val_rmse1)
print("R squared on val data = %g" % val_r21)
GBT_vals = {
'Model': ['GBT 1'],
'MSE': [val_mse1],
'MAE': [val_mae1],
'RMSE': [val_rmse1],
'R^2': [val_r21]
}
Mean Squared Error (MSE) on val data = 1.44883
Mean Absolute Error (MAE) on val data = 0.92386
Root Mean Squared Error (RMSE) on val data = 1.20367
R squared on val data = 0.533944
val_pred_plot1 = val_pred1.limit(1000)
x_v1 = list(range(0, val_pred_plot1.count()))
yv_pred1=val_pred_plot1.select("prediction").collect()
yv1=val_pred_plot1.select("log_score").collect()plt.rcParams['figure.dpi'] = 360
plt.figure(figsize=(12, 6))
plt.plot(x_v1, yv1, label="original", color="#42a1b9")
plt.plot(x_v1, yv_pred1, label="predicted", color = "#f7c200")
plt.title("Submission scores vs. Predicted validation data")
plt.xlabel('X-axis')
plt.ylabel('Submission Scores')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.savefig("../../img/GBT_val_1.png")
plt.show()Second GBT model with different hyperparameter set.
gbtr2 = GBTRegressor(featuresCol='features', labelCol='log_score', maxIter=30, maxDepth=5, maxBins=64, stepSize=0.01) #
gbtr2 = gbtr2.fit(train_data)
val_pred2 = gbtr2.transform(val_data)# Select example rows to display
val_pred2.select("prediction", "log_score", "features").show(5)+------------------+---------+-------------------+
| prediction|log_score| features|
+------------------+---------+-------------------+
|0.7318397453989006| 0.0| (6,[4],[4.0])|
|0.7318397453989006| 0.0| (6,[4],[4.0])|
|0.7397837786561348| 0.0|(6,[4,5],[4.0,1.0])|
|0.7397837786561348| 0.0|(6,[4,5],[4.0,1.0])|
|0.8275420802171086| 0.0| (6,[4],[5.0])|
+------------------+---------+-------------------+
only showing top 5 rows
val_mse2 = mse.evaluate(val_pred2)
val_rmse2 = rmse.evaluate(val_pred2)
val_mae2 = mae.evaluate(val_pred2)
val_r22 = r2.evaluate(val_pred2)
print("Mean Squared Error (MSE) on val data = %g" % val_mse2)
print("Mean Absolute Error (MAE) on val data = %g" % val_mae2)
print("Root Mean Squared Error (RMSE) on val data = %g" % val_rmse2)
print("R squared on val data = %g" % val_r22)
GBT_vals['Model'].append('GBT 2')
GBT_vals['MSE'].append(val_mse2)
GBT_vals['MAE'].append(val_mae2)
GBT_vals['RMSE'].append(val_rmse2)
GBT_vals['R^2'].append(val_r22)Mean Squared Error (MSE) on val data = 1.44056
Mean Absolute Error (MAE) on val data = 0.927095
Root Mean Squared Error (RMSE) on val data = 1.20023
R squared on val data = 0.536604
val_pred_plot2 = val_pred2.limit(1000)
x_v2 = range(0, val_pred_plot2.count())
yv_pred2=val_pred_plot2.select("prediction").collect()
yv2=val_pred_plot2.select("log_score").collect()plt.rcParams['figure.dpi'] = 360
plt.figure(figsize=(12, 6))
plt.plot(x_v2, yv2, label="original", color="#42a1b9")
plt.plot(x_v2, yv_pred2, label="predicted", color = "#f7c200")
plt.title("Submission scores vs. Predicted validation data")
plt.xlabel('X-axis')
plt.ylabel('Submission Scores')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.savefig("../../img/GBT_val_2.png")
plt.show()GBT_vals
GBT_vals_compare=pd.DataFrame.from_dict(GBT_vals)
GBT_vals_compare| Model | MSE | MAE | RMSE | R^2 | |
|---|---|---|---|---|---|
| 0 | GBT 1 | 1.448826 | 0.923860 | 1.203672 | 0.533944 |
| 1 | GBT 2 | 1.440557 | 0.927095 | 1.200232 | 0.536604 |
The gbtr2 model tuned some parameter based on gbtr1 model, with maxIter changed from 10 to 30, and maxDepth decreased from 10 to 5. Also set the maxBins`` to 64 withstepSize`` of 0.01.
From the evaluation metrics, we can observe that both models (gbtr1 and gbtr2) have very similar performance metrics, with slight differences in \(R^2\) and MAE. The gbtr1 model has slight lower MAE, while gbtr2 has higher \(R^2\).
Overall, these two GBTR models seem to capture around 53-54% of the variance in the log_score, and the errors (MSE, MAE, RMSE) are at a moderate level.
test_data.write.mode("overwrite").parquet(f"{workspace_wasbs_base_url}/ml_q9_test_data.parquet")# Make predictions.
predictions = gbtr2.transform(test_data)
# Select example rows to display.
predictions.select("prediction", "score", "features").show(5)[Stage 538:> (0 + 1) / 1]
+------------------+-----+-------------------+
| prediction|score| features|
+------------------+-----+-------------------+
|0.7318397453989006| 0| (6,[4],[4.0])|
|0.7318397453989006| 0| (6,[4],[4.0])|
|0.7318397453989006| 0| (6,[4],[4.0])|
|0.7318397453989006| 0| (6,[4],[4.0])|
|0.7397837786561348| 0|(6,[4,5],[4.0,1.0])|
+------------------+-----+-------------------+
only showing top 5 rows
test_mse = mse.evaluate(predictions)
test_rmse = rmse.evaluate(predictions)
test_mae = mae.evaluate(predictions)
test_r2 = r2.evaluate(predictions)
print("Mean Squared Error (MSE) on test data = %g" % test_mse)
print("Mean Absolute Error (MAE) on test data = %g" % test_rmse)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_mae)
print("R squared on test data = %g" % test_r2)
Mean Squared Error (MSE) on test data = 1.41439
Mean Absolute Error (MAE) on test data = 1.18928
Root Mean Squared Error (RMSE) on test data = 0.927723
R squared on test data = 0.527066
importances = gbtr2.featureImportances.values
indices = gbtr2.featureImportances.indices
feature_names = ['num_comments', 'num_crossposts', 'over_18', 'stickied', 'title_wordCount', 'bucked_selftext_wordCount']
feature_importances = {}
for idx, index in enumerate(indices):
feature_importances[feature_names[index]] = importances[idx]
df_feature_importances = pd.DataFrame.from_dict(feature_importances, orient='index', columns=['importance'])
print(df_feature_importances) importance
num_comments 0.601587
num_crossposts 0.056616
over_18 0.001983
title_wordCount 0.126110
bucked_selftext_wordCount 0.213705
The identified feature importance reflects their correlation with our target variable score and their respective roles in optimizing the model’s ability to predict Reddit submission scores.
num_comments has the highest importance, approximately 60.16%, indicating that the number of comments on a submission strongly influences its score, reflecting higher engagement—a significant indicator of popularity and community interest. Additionally, selftext_wordCount with 21.37% importance to score has a substantial impact on the score prediction. This suggests that the depth or length of the content within the submission plays a crucial role. In-depth discussions or detailed content may contribute positively to the overall score.
The identified features align with the project goal of predicting Reddit submission scores to enhance community dynamics and engagement. Content creators and stakeholders can focus on encouraging discussions (num_comments) and crafting comprehensive selftext content (bucked_selftext_wordCount) to potentially improve submission scores. Moreover, whether the content is over 18 or not has limited impact on the overall score prediction.
pred_plot = predictions.limit(1000)
xt = range(0, pred_plot.count())
ypred=pred_plot.select("prediction").collect()
yt=pred_plot.select("log_score").collect()plt.rcParams['figure.dpi'] = 360
plt.figure(figsize=(12, 6))
plt.plot(xt, yt, label="original", color="#42a1b9")
plt.plot(xt, ypred, label="predicted", color = "#f7c200")
plt.title("Submission scores vs. Predicted teesting data")
plt.xlabel('X-axis')
plt.ylabel('Submission Scores')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.savefig("../../img/ML_GBT_test.png")
plt.show()The gradient boosted regression tree models yield moderately accurate predictions of Reddit post scores based on content features. Both gbtr1 and gbtr2 capture around 53-54% of the variance and demonstrate reasonable error rates. These models provide a valuable starting point for predicting community engagement, surpassing baseline regression. Feature importance analysis reveals the key content areas that drive user discussions.
Given the unique dynamics of social platforms, further refinements tailored to Reddit data could enhance score predictions:
Incorporating temporal features, such as time-of-day or day-of-week posts, as social media engagement varies with cycles.
Employing embeddings to represent text semantics beyond simple counts. Deep encodings can improve generalization.
Implementing NLP model results as features to expand exploration.
While the current models offer a solid foundation, Reddit-specific customization leveraging domain knowledge could enhance score predictions. The implementation of these advanced extensions has the potential to unlock additional performance gains.
# Save the model
gbtr2.save("project_ml_topic9_gbtr_model")