Setup
In [ ]:
# Setup - run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.4.0
# install Spark NLP
%pip install spark-nlp==5.1.3
# restart the kernel so the freshly installed packages are picked up
# (the JavaScript call below works in the classic Notebook UI; in JupyterLab, use the Kernel menu)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
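Before building the Spark session, it can be worth confirming that the expected versions landed after the restart; a quick check, assuming the installs above succeeded:

import pyspark
import sparknlp
print(pyspark.__version__)  # expect 3.4.0
print(sparknlp.version())   # expect 5.1.3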
Set up Spark Session
In [ ]:
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("PySparkApp")
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
.config(
# the "spark.hadoop." prefix is required for the property to reach the Hadoop (S3A) configuration
"spark.hadoop.fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider",
)
.getOrCreate()
)
print(spark.version)
3.4.0
In [ ]:
# S3 directory paths
s3_directory1 = "s3a://sagemaker-us-east-1-395393721134/project/sentiment/comments/"
s3_directory2 = "s3a://sagemaker-us-east-1-433974840707/project/sentiment/comments/"
s3_directory3 = "s3a://sagemaker-us-east-1-711387073580/project/sentiment/comments/"
s3_directory4 = "s3a://sagemaker-us-east-1-224518912016/project/sentiment/comments/"
# Read all the Parquet files in the directory into a DataFrame
df_comments1 = spark.read.parquet(s3_directory1)
df_comments2 = spark.read.parquet(s3_directory2)
df_comments3 = spark.read.parquet(s3_directory3)
df_comments4 = spark.read.parquet(s3_directory4)
df_comments = df_comments1.union(df_comments2).union(df_comments3).union(df_comments4)
# check counts (ensuring all needed subreddits exist)
df_comments.groupBy('subreddit').count().show()
# inspect the available columns, then keep only those needed downstream
print(df_comments.columns)
df_comments = df_comments.select('subreddit', 'created_utc', 'score', 'controversiality', 'distinguished', 'gilded', 'sentiment_concat_comments')
df_comments.show(5, truncate=False)
+------------+-------+
|   subreddit|  count|
+------------+-------+
|   socialism| 370695|
|     Liberal|  95706|
|   Economics|1425088|
| Libertarian|2691208|
|    centrist| 917568|
|changemyview|3898931|
|Ask_Politics|  60098|
|     finance| 136308|
|Conservative|5204573|
+------------+-------+

['author', 'subreddit', 'created_utc', 'score', 'controversiality', 'distinguished', 'gilded', 'concat_comments', 'document_concat_comments', 'sentence_embeddings_concat_comments', 'sentiment_concat_comments']

+---------+-----------+-----+----------------+-------------+------+----------------------------------------------------------------------------------------------------+
|subreddit|created_utc|score|controversiality|distinguished|gilded|sentiment_concat_comments                                                                           |
+---------+-----------+-----+----------------+-------------+------+----------------------------------------------------------------------------------------------------+
|Economics|2023-01-20 |1    |0               |null         |0     |[{category, 0, 5, negative, {sentence -> 0, positive -> 0.0, negative -> 1.0}, []}]                 |
|Economics|2023-03-06 |9    |0               |null         |0     |[{category, 0, 56, negative, {sentence -> 0, positive -> 0.21862681, negative -> 0.78137314}, []}]  |
|Economics|2021-05-02 |6    |0               |null         |0     |[{category, 0, 1081, positive, {sentence -> 0, positive -> 0.9763095, negative -> 0.023690531}, []}]|
|Economics|2022-08-24 |1    |0               |null         |0     |[{category, 0, 5, negative, {sentence -> 0, positive -> 0.0, negative -> 1.0}, []}]                 |
|socialism|2022-05-17 |1    |0               |null         |0     |[{category, 0, 5, negative, {sentence -> 0, positive -> 0.0, negative -> 1.0}, []}]                 |
+---------+-----------+-----+----------------+-------------+------+----------------------------------------------------------------------------------------------------+
only showing top 5 rows
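As an aside, the four-way read-and-union pattern used here generalizes to any number of buckets with functools.reduce; a minimal sketch, reusing the s3_directory* paths defined above:

from functools import reduce

paths = [s3_directory1, s3_directory2, s3_directory3, s3_directory4]
# read each directory, then fold the resulting DataFrames together pairwise
df_comments = reduce(lambda a, b: a.union(b), (spark.read.parquet(p) for p in paths))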
Submissions
In [ ]:
# S3 directory paths
s3_directory1 = "s3a://sagemaker-us-east-1-395393721134/project/sentiment/submissions/"
s3_directory2 = "s3a://sagemaker-us-east-1-433974840707/project/sentiment/submissions/"
s3_directory3 = "s3a://sagemaker-us-east-1-711387073580/project/sentiment/submissions/"
s3_directory4 = "s3a://sagemaker-us-east-1-224518912016/project/sentiment/submissions/"
# Read all the Parquet files in the directory into a DataFrame
df_submissions1 = spark.read.parquet(s3_directory1)
df_submissions2 = spark.read.parquet(s3_directory2)
df_submissions3 = spark.read.parquet(s3_directory3)
df_submissions4 = spark.read.parquet(s3_directory4)
df_submissions = df_submissions1.union(df_submissions2).union(df_submissions3).union(df_submissions4)
# check counts (ensuring all needed subreddits exist)
df_submissions.groupBy('subreddit').count().show()
# keep only the columns needed downstream
df_submissions = df_submissions.select('subreddit', 'created_utc', 'score', 'num_comments', 'distinguished', 'gilded', 'sentiment_concat_submissions')
df_submissions.show(5, truncate=False)
+------------+------+
|   subreddit| count|
+------------+------+
|   socialism| 40053|
|     Liberal| 11083|
|   Economics| 40475|
| Libertarian| 51136|
|    centrist| 13585|
|changemyview| 64632|
|Ask_Politics|  5902|
|     finance| 28817|
|Conservative|343660|
+------------+------+

+---------+-----------+-----+------------+-------------+------+-------------------------------------------------------------------------------------------------+
|subreddit|created_utc|score|num_comments|distinguished|gilded|sentiment_concat_submissions                                                                     |
+---------+-----------+-----+------------+-------------+------+-------------------------------------------------------------------------------------------------+
|socialism|2021-07-29 |6    |3           |null         |0     |[{category, 0, 874, negative, {sentence -> 0, positive -> 0.0, negative -> 1.0}, []}]            |
|socialism|2022-04-15 |11   |0           |null         |0     |[{category, 0, 61, positive, {sentence -> 0, positive -> 0.9320256, negative -> 0.06797439}, []}]|
|Economics|2021-02-03 |1    |1           |null         |0     |[{category, 0, 6, neutral, {sentence -> 0, positive -> 0.54308814, negative -> 0.45691183}, []}] |
|Liberal  |2021-06-22 |17   |2           |null         |0     |[{category, 0, 68, positive, {sentence -> 0, positive -> 1.0, negative -> 5.9324034E-12}, []}]   |
|socialism|2021-03-25 |1761 |43          |null         |0     |[{category, 0, 135, negative, {sentence -> 0, positive -> 0.0, negative -> 1.0}, []}]            |
+---------+-----------+-----+------------+-------------+------+-------------------------------------------------------------------------------------------------+
only showing top 5 rows
DF Prep
In [ ]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def extract_sentiment(sentiment_col):
    """Pull the sentiment label out of a Spark NLP annotation array."""
    sentiment = ""
    # inside the UDF the annotation array arrives as a list of Rows;
    # the label sits in the first annotation's `result` field
    if isinstance(sentiment_col, list) and len(sentiment_col) > 0:
        sentiment_data = sentiment_col[0]
        if 'result' in sentiment_data:
            sentiment = sentiment_data['result']
    return sentiment

extract_sentiment_udf = udf(extract_sentiment, StringType())
# comments
comms_df = df_comments.withColumn('sentiment', extract_sentiment_udf('sentiment_concat_comments'))
comms_df = comms_df.select('subreddit', 'created_utc', 'score', 'controversiality', 'distinguished', 'gilded', 'sentiment')
comms_df.show(5, truncate=False)
# submissions
subs_df = df_submissions.withColumn('sentiment', extract_sentiment_udf('sentiment_concat_submissions'))
subs_df = subs_df.select('subreddit', 'created_utc', 'score', 'num_comments', 'distinguished', 'gilded', 'sentiment')
subs_df.show(5, truncate=False)
+---------+-----------+-----+----------------+-------------+------+---------+
|subreddit|created_utc|score|controversiality|distinguished|gilded|sentiment|
+---------+-----------+-----+----------------+-------------+------+---------+
|Economics|2023-01-20 |1    |0               |null         |0     |negative |
|Economics|2023-03-06 |9    |0               |null         |0     |negative |
|Economics|2021-05-02 |6    |0               |null         |0     |positive |
|Economics|2022-08-24 |1    |0               |null         |0     |negative |
|socialism|2022-05-17 |1    |0               |null         |0     |negative |
+---------+-----------+-----+----------------+-------------+------+---------+
only showing top 5 rows

+---------+-----------+-----+------------+-------------+------+---------+
|subreddit|created_utc|score|num_comments|distinguished|gilded|sentiment|
+---------+-----------+-----+------------+-------------+------+---------+
|socialism|2021-07-29 |6    |3           |null         |0     |negative |
|socialism|2022-04-15 |11   |0           |null         |0     |positive |
|Economics|2021-02-03 |1    |1           |null         |0     |neutral  |
|Liberal  |2021-06-22 |17   |2           |null         |0     |positive |
|socialism|2021-03-25 |1761 |43          |null         |0     |negative |
+---------+-----------+-----+------------+-------------+------+---------+
only showing top 5 rows
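The same label could also be pulled out without a Python UDF, since the annotations are an array of structs whose first element carries the label in its `result` field. A sketch against the comments frame (one behavioral difference: an empty annotation array yields null here rather than the UDF's empty string):

from pyspark.sql import functions as F

# index into the annotation array and select the `result` field directly
comms_native = df_comments.withColumn('sentiment', F.col('sentiment_concat_comments')[0]['result'])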
Model Summary Statistics
In [ ]:
from pyspark.sql.functions import col, round
comms_stats = comms_df.groupBy('sentiment').count().withColumnRenamed('count', 'total_comments')
subs_stats = subs_df.groupBy('sentiment').count().withColumnRenamed('count', 'total_submissions')
comms_total = comms_stats.agg({'total_comments': 'sum'}).collect()[0]['sum(total_comments)']
comms_stats = comms_stats.withColumn(
'percentage',
round((col('total_comments') * 100 / comms_total), 2).cast('string')
)
subs_total = subs_stats.agg({'total_submissions': 'sum'}).collect()[0]['sum(total_submissions)']
subs_stats = subs_stats.withColumn(
'percentage',
round((col('total_submissions') * 100 / subs_total), 2).cast('string')
)
comms_stats.show(truncate=False)
subs_stats.show(truncate=False)
+---------+--------------+----------+
|sentiment|total_comments|percentage|
+---------+--------------+----------+
|positive |8561324       |57.85     |
|neutral  |594511        |4.02      |
|negative |5644340       |38.14     |
+---------+--------------+----------+
+---------+-----------------+----------+
|sentiment|total_submissions|percentage|
+---------+-----------------+----------+
|positive |325300           |54.28     |
|neutral  |22209            |3.71      |
|negative |251834           |42.02     |
+---------+-----------------+----------+
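The two collect() calls above bring the grand totals back to the driver just to compute percentages. A window over the whole frame keeps the computation in Spark; a sketch (the empty partition spec funnels all rows through one partition, which is acceptable at this scale):

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy()  # one window spanning every row
comms_stats_alt = (
    comms_df.groupBy('sentiment').count()
    .withColumn('percentage', F.round(F.col('count') * 100 / F.sum('count').over(w), 2))
)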
In [ ]:
import plotly.graph_objects as go
import plotly.io as pio
comms_stats_df = comms_stats.toPandas()
comms_stats_df = comms_stats_df.rename(columns={'sentiment': 'Sentiment', 'total_comments': 'Total Comments', 'percentage': 'Percentage'})
comms_stats_df['Sentiment'] = comms_stats_df['Sentiment'].apply(lambda x: x.title()) # convert to title case
comms_stats_df['Total Comments'] = comms_stats_df['Total Comments'].apply(lambda x: "{:,}".format(x)) # format with thousands separators
fig = go.Figure(data=go.Table(
header=dict(
values=list(comms_stats_df.columns),
fill_color='lightgreen',
align='center',
font=dict(size=14, color='black')
),
cells=dict(
values=[comms_stats_df[col] for col in comms_stats_df.columns],
align='center')
))
fig.show()
# save
pio.write_html(fig, file='../../data/plots/comms_sentiment_table.html')
In [ ]:
subs_stats_df = subs_stats.toPandas()
subs_stats_df = subs_stats_df.rename(columns={'sentiment': 'Sentiment', 'total_submissions': 'Total Submissions', 'percentage': 'Percentage'})
subs_stats_df['Sentiment'] = subs_stats_df['Sentiment'].apply(lambda x: x.title()) # convert to title case
subs_stats_df['Total Submissions'] = subs_stats_df['Total Submissions'].apply(lambda x: "{:,}".format(x)) # format with thousands separators
fig = go.Figure(data=go.Table(
header=dict(
values=list(subs_stats_df.columns),
fill_color='lightgreen',
align='center',
font=dict(size=14, color='black')
),
cells=dict(
values=[subs_stats_df[col] for col in subs_stats_df.columns],
align='center')
))
fig.show()
# save
pio.write_html(fig, file='../../data/plots/subs_sentiment_table.html')
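The comments and submissions table cells are identical apart from the input DataFrame and the output path, so a small helper (hypothetical name, using the go/pio imports above) would remove the duplication:

def sentiment_table(pdf, out_path):
    """Render a pandas DataFrame as a Plotly table and save it to HTML."""
    fig = go.Figure(data=go.Table(
        header=dict(values=list(pdf.columns), fill_color='lightgreen',
                    align='center', font=dict(size=14, color='black')),
        cells=dict(values=[pdf[c] for c in pdf.columns], align='center')
    ))
    pio.write_html(fig, file=out_path)
    return fig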
Subreddit Summary Statistics
In [ ]:
from pyspark.sql.functions import col, round, when
# Group by 'subreddit' and 'sentiment', perform count operations
comms_stats_subreddit = comms_df.groupBy('subreddit', 'sentiment').count().withColumnRenamed('count', 'total_comments')
subs_stats_subreddit = subs_df.groupBy('subreddit', 'sentiment').count().withColumnRenamed('count', 'total_submissions')
# Calculate total comments and submissions per subreddit
comms_totals_per_subreddit = comms_stats_subreddit.groupBy('subreddit').agg({'total_comments': 'sum'}).withColumnRenamed('sum(total_comments)', 'subreddit_total_comments')
subs_totals_per_subreddit = subs_stats_subreddit.groupBy('subreddit').agg({'total_submissions': 'sum'}).withColumnRenamed('sum(total_submissions)', 'subreddit_total_submissions')
# Calculate percentage for comments and submissions per subreddit
comms_stats_subreddit = comms_stats_subreddit.join(comms_totals_per_subreddit, 'subreddit')
comms_stats_subreddit = comms_stats_subreddit.withColumn('percentage', round((col('total_comments') * 100 / comms_totals_per_subreddit['subreddit_total_comments']), 2).cast('string'))
subs_stats_subreddit = subs_stats_subreddit.join(subs_totals_per_subreddit, 'subreddit')
subs_stats_subreddit = subs_stats_subreddit.withColumn('percentage', round((col('total_submissions') * 100 / subs_totals_per_subreddit['subreddit_total_submissions']), 2).cast('string'))
# Define a sentiment ordering
sentiment_order = {'positive': 1, 'neutral': 2, 'negative': 3}
# Order the DataFrame
comms_stats_subreddit = comms_stats_subreddit.orderBy('subreddit',
when(col('sentiment') == 'positive', sentiment_order['positive'])
.when(col('sentiment') == 'neutral', sentiment_order['neutral'])
.when(col('sentiment') == 'negative', sentiment_order['negative'])
.otherwise(4))
subs_stats_subreddit = subs_stats_subreddit.orderBy('subreddit',
when(col('sentiment') == 'positive', sentiment_order['positive'])
.when(col('sentiment') == 'neutral', sentiment_order['neutral'])
.when(col('sentiment') == 'negative', sentiment_order['negative'])
.otherwise(4))
comms_stats_subreddit.show(truncate=False)
subs_stats_subreddit.show(truncate=False)
+------------+---------+--------------+------------------------+----------+
|subreddit   |sentiment|total_comments|subreddit_total_comments|percentage|
+------------+---------+--------------+------------------------+----------+
|Ask_Politics|positive |31109         |60098                   |51.76     |
|Ask_Politics|neutral  |1540          |60098                   |2.56      |
|Ask_Politics|negative |27449         |60098                   |45.67     |
|Conservative|positive |2989597       |5204573                 |57.44     |
|Conservative|neutral  |256119        |5204573                 |4.92      |
|Conservative|negative |1958857       |5204573                 |37.64     |
|Economics   |positive |575456        |1425088                 |40.38     |
|Economics   |neutral  |34262         |1425088                 |2.4       |
|Economics   |negative |815370        |1425088                 |57.22     |
|Liberal     |positive |41746         |95706                   |43.62     |
|Liberal     |neutral  |3632          |95706                   |3.79      |
|Liberal     |negative |50328         |95706                   |52.59     |
|Libertarian |positive |1572821       |2691208                 |58.44     |
|Libertarian |neutral  |113612        |2691208                 |4.22      |
|Libertarian |negative |1004775       |2691208                 |37.34     |
|centrist    |positive |552017        |917568                  |60.16     |
|centrist    |neutral  |37080         |917568                  |4.04      |
|centrist    |negative |328471        |917568                  |35.8      |
|changemyview|positive |2518140       |3898931                 |64.59     |
|changemyview|neutral  |129175        |3898931                 |3.31      |
+------------+---------+--------------+------------------------+----------+
only showing top 20 rows
+------------+---------+-----------------+---------------------------+----------+
|subreddit   |sentiment|total_submissions|subreddit_total_submissions|percentage|
+------------+---------+-----------------+---------------------------+----------+
|Ask_Politics|positive |2764             |5902                       |46.83     |
|Ask_Politics|neutral  |211              |5902                       |3.58      |
|Ask_Politics|negative |2927             |5902                       |49.59     |
|Conservative|positive |176071           |343660                     |51.23     |
|Conservative|neutral  |13395            |343660                     |3.9       |
|Conservative|negative |154194           |343660                     |44.87     |
|Economics   |positive |23057            |40475                      |56.97     |
|Economics   |neutral  |1206             |40475                      |2.98      |
|Economics   |negative |16212            |40475                      |40.05     |
|Liberal     |positive |6026             |11083                      |54.37     |
|Liberal     |neutral  |455              |11083                      |4.11      |
|Liberal     |negative |4602             |11083                      |41.52     |
|Libertarian |positive |28533            |51136                      |55.8      |
|Libertarian |neutral  |1691             |51136                      |3.31      |
|Libertarian |negative |20912            |51136                      |40.89     |
|centrist    |positive |7311             |13585                      |53.82     |
|centrist    |neutral  |479              |13585                      |3.53      |
|centrist    |negative |5795             |13585                      |42.66     |
|changemyview|positive |38711            |64632                      |59.89     |
|changemyview|neutral  |2194             |64632                      |3.39      |
+------------+---------+-----------------+---------------------------+----------+
only showing top 20 rows
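The three-branch when chain above exists only to impose the positive/neutral/negative order. The same ranking can be expressed in one array_position call (Spark 2.4+); the caveat is that unmatched values get position 0 and sort first, unlike the .otherwise(4) fallback. A sketch:

from pyspark.sql import functions as F

order_expr = F.expr("array_position(array('positive', 'neutral', 'negative'), sentiment)")
comms_stats_subreddit = comms_stats_subreddit.orderBy('subreddit', order_expr)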
In [ ]:
comms_stats_df = comms_stats_subreddit.toPandas()
label_d = {
'Ask_Politics': 'AskPolitics',
'centrist': 'Centrist',
'changemyview': 'ChangeMyView',
'finance': 'Finance',
'socialism': 'Socialism'
}
comms_stats_df = comms_stats_df.rename(columns={'sentiment': 'Sentiment', 'total_comments': 'Comments (Per Subreddit and Sentiment)',
'subreddit_total_comments': 'Total Comments (Per Subreddit)', 'percentage': 'Percentage', 'subreddit': 'Subreddit'})
comms_stats_df['Sentiment'] = comms_stats_df['Sentiment'].apply(lambda x: x.title()) # convert to title case
comms_stats_df['Comments (Per Subreddit and Sentiment)'] = comms_stats_df['Comments (Per Subreddit and Sentiment)'].apply(lambda x: "{:,}".format(x)) # format with thousands separators
comms_stats_df['Total Comments (Per Subreddit)'] = comms_stats_df['Total Comments (Per Subreddit)'].apply(lambda x: "{:,}".format(x)) # format with thousands separators
comms_stats_df['Subreddit'] = comms_stats_df['Subreddit'].replace(label_d) # relabel subreddits
fig = go.Figure(data=go.Table(
header=dict(
values=list(comms_stats_df.columns),
fill_color='lightgreen',
align='center',
font=dict(size=14, color='black')
),
cells=dict(
values=[comms_stats_df[col] for col in comms_stats_df.columns],
align='center')
))
fig.show()
# save
pio.write_html(fig, file='../../data/plots/comms_sentiment_subreddit_table.html')
In [ ]:
subs_stats_df = subs_stats_subreddit.toPandas()
subs_stats_df = subs_stats_df.rename(columns={'sentiment': 'Sentiment', 'total_submissions': 'Submissions (Per Subreddit and Sentiment)',
'subreddit_total_submissions': 'Total Submissions (Per Subreddit)', 'percentage': 'Percentage', 'subreddit': 'Subreddit'})
subs_stats_df['Sentiment'] = subs_stats_df['Sentiment'].apply(lambda x: x.title()) # convert to title case
subs_stats_df['Submissions (Per Subreddit and Sentiment)'] = subs_stats_df['Submissions (Per Subreddit and Sentiment)'].apply(lambda x: "{:,}".format(x)) # format with thousands separators
subs_stats_df['Total Submissions (Per Subreddit)'] = subs_stats_df['Total Submissions (Per Subreddit)'].apply(lambda x: "{:,}".format(x)) # format with thousands separators
subs_stats_df['Subreddit'] = subs_stats_df['Subreddit'].replace(label_d) # relabel subreddits
fig = go.Figure(data=go.Table(
header=dict(
values=list(subs_stats_df.columns),
fill_color='lightgreen',
align='center',
font=dict(size=14, color='black')
),
cells=dict(
values=[subs_stats_df[col] for col in subs_stats_df.columns],
align='center')
))
fig.show()
# save
pio.write_html(fig, file='../../data/plots/subs_sentiment_subreddit_table.html')
Change in Sentiment Based on the Economy
In [ ]:
from pyspark.sql.functions import concat, year, month, col, lit, when
# Calculate totals for comments per subreddit per month
comms_totals_per_subreddit_month = comms_df.groupBy('subreddit', year('created_utc'), month('created_utc')).count() \
.withColumnRenamed('count', 'total_comments') \
.withColumn('year_month', concat(col('year(created_utc)'), lit('-'), col('month(created_utc)')))
# Calculate totals for submissions per subreddit per month
subs_totals_per_subreddit_month = subs_df.groupBy('subreddit', year('created_utc'), month('created_utc')).count() \
.withColumnRenamed('count', 'total_submissions') \
.withColumn('year_month', concat(col('year(created_utc)'), lit('-'), col('month(created_utc)')))
# Group by 'subreddit', 'sentiment', and month & year extracted from 'created_utc'
comms_stats_monthly = comms_df.withColumn('year_month', concat(year('created_utc'), lit('-'), month('created_utc'))) \
.groupBy('subreddit', 'sentiment', 'year_month').count() \
.withColumnRenamed('count', 'total_comments_sentiment')
subs_stats_monthly = subs_df.withColumn('year_month', concat(year('created_utc'), lit('-'), month('created_utc'))) \
.groupBy('subreddit', 'sentiment', 'year_month').count() \
.withColumnRenamed('count', 'total_submissions_sentiment')
# Calculate percentage for comments and submissions per subreddit per month
comms_stats_monthly = comms_stats_monthly.join(comms_totals_per_subreddit_month, ['subreddit', 'year_month'])
subs_stats_monthly = subs_stats_monthly.join(subs_totals_per_subreddit_month, ['subreddit', 'year_month'])
# Define a sentiment ordering
sentiment_order = {'positive': 1, 'neutral': 2, 'negative': 3}
# Order the DataFrames
comms_stats_monthly = comms_stats_monthly.orderBy('subreddit', 'year_month',
when(col('sentiment') == 'positive', sentiment_order['positive'])
.when(col('sentiment') == 'neutral', sentiment_order['neutral'])
.when(col('sentiment') == 'negative', sentiment_order['negative'])
.otherwise(4))
subs_stats_monthly = subs_stats_monthly.orderBy('subreddit', 'year_month',
when(col('sentiment') == 'positive', sentiment_order['positive'])
.when(col('sentiment') == 'neutral', sentiment_order['neutral'])
.when(col('sentiment') == 'negative', sentiment_order['negative'])
.otherwise(4))
comms_stats_monthly.show(truncate=False)
subs_stats_monthly.show(truncate=False)
# Save as CSV (relative local paths work here because Spark runs in local mode;
# on a cluster these would need to point at a shared filesystem or S3)
comms_stats_monthly.write.csv('../../data/csv/comms_stats_subreddit_month.csv', header=True, mode='overwrite')
subs_stats_monthly.write.csv('../../data/csv/subs_stats_subreddit_month.csv', header=True, mode='overwrite')
+------------+----------+---------+------------------------+-----------------+------------------+--------------+
|subreddit   |year_month|sentiment|total_comments_sentiment|year(created_utc)|month(created_utc)|total_comments|
+------------+----------+---------+------------------------+-----------------+------------------+--------------+
|Ask_Politics|2021-1    |positive |4697                    |2021             |1                 |8266          |
|Ask_Politics|2021-1    |neutral  |252                     |2021             |1                 |8266          |
|Ask_Politics|2021-1    |negative |3317                    |2021             |1                 |8266          |
|Ask_Politics|2021-10   |positive |644                     |2021             |10                |1146          |
|Ask_Politics|2021-10   |neutral  |19                      |2021             |10                |1146          |
|Ask_Politics|2021-10   |negative |483                     |2021             |10                |1146          |
|Ask_Politics|2021-11   |positive |407                     |2021             |11                |677           |
|Ask_Politics|2021-11   |neutral  |11                      |2021             |11                |677           |
|Ask_Politics|2021-11   |negative |259                     |2021             |11                |677           |
|Ask_Politics|2021-12   |positive |692                     |2021             |12                |1408          |
|Ask_Politics|2021-12   |neutral  |31                      |2021             |12                |1408          |
|Ask_Politics|2021-12   |negative |685                     |2021             |12                |1408          |
|Ask_Politics|2021-2    |positive |3307                    |2021             |2                 |6402          |
|Ask_Politics|2021-2    |neutral  |188                     |2021             |2                 |6402          |
|Ask_Politics|2021-2    |negative |2907                    |2021             |2                 |6402          |
|Ask_Politics|2021-3    |positive |3156                    |2021             |3                 |6200          |
|Ask_Politics|2021-3    |neutral  |153                     |2021             |3                 |6200          |
|Ask_Politics|2021-3    |negative |2891                    |2021             |3                 |6200          |
|Ask_Politics|2021-4    |positive |3171                    |2021             |4                 |5868          |
|Ask_Politics|2021-4    |neutral  |181                     |2021             |4                 |5868          |
+------------+----------+---------+------------------------+-----------------+------------------+--------------+
only showing top 20 rows
+------------+----------+---------+---------------------------+-----------------+------------------+-----------------+
|subreddit   |year_month|sentiment|total_submissions_sentiment|year(created_utc)|month(created_utc)|total_submissions|
+------------+----------+---------+---------------------------+-----------------+------------------+-----------------+
|Ask_Politics|2021-1    |positive |504                        |2021             |1                 |1027             |
|Ask_Politics|2021-1    |neutral  |29                         |2021             |1                 |1027             |
|Ask_Politics|2021-1    |negative |494                        |2021             |1                 |1027             |
|Ask_Politics|2021-10   |positive |83                         |2021             |10                |132              |
|Ask_Politics|2021-10   |neutral  |3                          |2021             |10                |132              |
|Ask_Politics|2021-10   |negative |46                         |2021             |10                |132              |
|Ask_Politics|2021-11   |positive |58                         |2021             |11                |108              |
|Ask_Politics|2021-11   |neutral  |7                          |2021             |11                |108              |
|Ask_Politics|2021-11   |negative |43                         |2021             |11                |108              |
|Ask_Politics|2021-12   |positive |60                         |2021             |12                |119              |
|Ask_Politics|2021-12   |neutral  |7                          |2021             |12                |119              |
|Ask_Politics|2021-12   |negative |52                         |2021             |12                |119              |
|Ask_Politics|2021-2    |positive |207                        |2021             |2                 |464              |
|Ask_Politics|2021-2    |neutral  |15                         |2021             |2                 |464              |
|Ask_Politics|2021-2    |negative |242                        |2021             |2                 |464              |
|Ask_Politics|2021-3    |positive |187                        |2021             |3                 |398              |
|Ask_Politics|2021-3    |neutral  |14                         |2021             |3                 |398              |
|Ask_Politics|2021-3    |negative |197                        |2021             |3                 |398              |
|Ask_Politics|2021-4    |positive |184                        |2021             |4                 |389              |
|Ask_Politics|2021-4    |neutral  |18                         |2021             |4                 |389              |
+------------+----------+---------+---------------------------+-----------------+------------------+-----------------+
only showing top 20 rows
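One quirk is visible in the tables above: because the month is not zero-padded, year_month sorts lexicographically, so 2021-10 lands before 2021-2. If chronological string ordering were wanted, date_format builds a padded key in one call; a sketch:

from pyspark.sql import functions as F

# 'yyyy-MM' zero-pads the month, so string order matches chronological order
comms_padded = comms_df.withColumn('year_month', F.date_format('created_utc', 'yyyy-MM'))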
In [ ]:
import pandas as pd
comms_stats_subreddit_date = spark.read.csv("../../data/csv/comms_stats_subreddit_month.csv", header=True)
subs_stats_subreddit_date = spark.read.csv("../../data/csv/subs_stats_subreddit_month.csv", header=True)
comms_stats_subreddit_date = comms_stats_subreddit_date.select('subreddit', 'year_month', 'sentiment', 'total_comments_sentiment')
subs_stats_subreddit_date = subs_stats_subreddit_date.select('subreddit', 'year_month', 'sentiment', 'total_submissions_sentiment')
comms_df = comms_stats_subreddit_date.toPandas()
subs_df = subs_stats_subreddit_date.toPandas()
comms_df['year_month'] = pd.to_datetime(comms_df['year_month'], format="%Y-%m")  # values look like "2021-1"
subs_df['year_month'] = pd.to_datetime(subs_df['year_month'], format="%Y-%m")
real_gdp = pd.read_csv("../../data/csv/real_gdp.csv", index_col = 0)
real_gdp['date'] = pd.to_datetime(real_gdp['date'], format = "%Y/%m")
In [ ]:
subs_gdp_df = pd.merge(real_gdp, subs_df, left_on='date', right_on='year_month', how='outer')
subs_gdp_df
Out[ ]:
|   | value | date | value_shifted | subreddit | year_month | sentiment | total_submissions_sentiment |
|---|---|---|---|---|---|---|---|
| 0 | 19055.655 | 2021-01-01 | NaN | Ask_Politics | 2021-01-01 | positive | 504 |
| 1 | 19055.655 | 2021-01-01 | NaN | Ask_Politics | 2021-01-01 | neutral | 29 |
| 2 | 19055.655 | 2021-01-01 | NaN | Ask_Politics | 2021-01-01 | negative | 494 |
| 3 | 19055.655 | 2021-01-01 | NaN | Conservative | 2021-01-01 | positive | 11810 |
| 4 | 19055.655 | 2021-01-01 | NaN | Conservative | 2021-01-01 | neutral | 1063 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 726 | NaN | NaT | NaN | finance | 2023-03-01 | neutral | 65 |
| 727 | NaN | NaT | NaN | finance | 2023-03-01 | negative | 379 |
| 728 | NaN | NaT | NaN | socialism | 2023-03-01 | positive | 678 |
| 729 | NaN | NaT | NaN | socialism | 2023-03-01 | neutral | 49 |
| 730 | NaN | NaT | NaN | socialism | 2023-03-01 | negative | 495 |

731 rows × 7 columns
In [ ]:
# sanity check
unique_sorted_values = subs_gdp_df['year_month'].unique()
unique_sorted_values.sort()
unique_sorted_values
Out[ ]:
array(['2021-01-01T00:00:00.000000000', '2021-02-01T00:00:00.000000000', '2021-03-01T00:00:00.000000000', '2021-04-01T00:00:00.000000000', '2021-05-01T00:00:00.000000000', '2021-06-01T00:00:00.000000000', '2021-07-01T00:00:00.000000000', '2021-08-01T00:00:00.000000000', '2021-09-01T00:00:00.000000000', '2021-10-01T00:00:00.000000000', '2021-11-01T00:00:00.000000000', '2021-12-01T00:00:00.000000000', '2022-01-01T00:00:00.000000000', '2022-02-01T00:00:00.000000000', '2022-03-01T00:00:00.000000000', '2022-04-01T00:00:00.000000000', '2022-05-01T00:00:00.000000000', '2022-06-01T00:00:00.000000000', '2022-07-01T00:00:00.000000000', '2022-08-01T00:00:00.000000000', '2022-09-01T00:00:00.000000000', '2022-10-01T00:00:00.000000000', '2022-11-01T00:00:00.000000000', '2022-12-01T00:00:00.000000000', '2023-01-01T00:00:00.000000000', '2023-02-01T00:00:00.000000000', '2023-03-01T00:00:00.000000000', 'NaT'], dtype='datetime64[ns]')
In [ ]:
subs_gdp_df['value'].unique()
Out[ ]:
array([19055.655, nan, 19358.176, 19465.195, 19805.962, 19727.918, 19681.682, 20021.721, 20182.491, 20235.878, 20386.467, 22491.567])
In [ ]:
comms_gdp_df = pd.merge(real_gdp, comms_df, left_on='date', right_on='year_month', how='outer')
comms_gdp_df
Out[ ]:
|   | value | date | value_shifted | subreddit | year_month | sentiment | total_comments_sentiment |
|---|---|---|---|---|---|---|---|
| 0 | 19055.655 | 2021-01-01 | NaN | Ask_Politics | 2021-01-01 | positive | 4697 |
| 1 | 19055.655 | 2021-01-01 | NaN | Ask_Politics | 2021-01-01 | neutral | 252 |
| 2 | 19055.655 | 2021-01-01 | NaN | Ask_Politics | 2021-01-01 | negative | 3317 |
| 3 | 19055.655 | 2021-01-01 | NaN | Conservative | 2021-01-01 | positive | 206993 |
| 4 | 19055.655 | 2021-01-01 | NaN | Conservative | 2021-01-01 | neutral | 17587 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 726 | NaN | NaT | NaN | finance | 2023-03-01 | neutral | 332 |
| 727 | NaN | NaT | NaN | finance | 2023-03-01 | negative | 2769 |
| 728 | NaN | NaT | NaN | socialism | 2023-03-01 | positive | 6077 |
| 729 | NaN | NaT | NaN | socialism | 2023-03-01 | neutral | 306 |
| 730 | NaN | NaT | NaN | socialism | 2023-03-01 | negative | 4204 |

731 rows × 7 columns
In [ ]:
# sanity check
unique_sorted_values = comms_gdp_df['year_month'].unique()
unique_sorted_values.sort()
unique_sorted_values
Out[ ]:
array(['2021-01-01T00:00:00.000000000', '2021-02-01T00:00:00.000000000', '2021-03-01T00:00:00.000000000', '2021-04-01T00:00:00.000000000', '2021-05-01T00:00:00.000000000', '2021-06-01T00:00:00.000000000', '2021-07-01T00:00:00.000000000', '2021-08-01T00:00:00.000000000', '2021-09-01T00:00:00.000000000', '2021-10-01T00:00:00.000000000', '2021-11-01T00:00:00.000000000', '2021-12-01T00:00:00.000000000', '2022-01-01T00:00:00.000000000', '2022-02-01T00:00:00.000000000', '2022-03-01T00:00:00.000000000', '2022-04-01T00:00:00.000000000', '2022-05-01T00:00:00.000000000', '2022-06-01T00:00:00.000000000', '2022-07-01T00:00:00.000000000', '2022-08-01T00:00:00.000000000', '2022-09-01T00:00:00.000000000', '2022-10-01T00:00:00.000000000', '2022-11-01T00:00:00.000000000', '2022-12-01T00:00:00.000000000', '2023-01-01T00:00:00.000000000', '2023-02-01T00:00:00.000000000', '2023-03-01T00:00:00.000000000', 'NaT'], dtype='datetime64[ns]')
In [ ]:
unique_sorted_values = comms_gdp_df['date'].unique()
unique_sorted_values.sort()
unique_sorted_values
Out[ ]:
array(['2021-01-01T00:00:00.000000000', '2021-04-01T00:00:00.000000000', '2021-07-01T00:00:00.000000000', '2021-10-01T00:00:00.000000000', '2022-01-01T00:00:00.000000000', '2022-04-01T00:00:00.000000000', '2022-07-01T00:00:00.000000000', '2022-10-01T00:00:00.000000000', '2023-01-01T00:00:00.000000000', '2023-04-01T00:00:00.000000000', '2023-07-01T00:00:00.000000000', 'NaT'], dtype='datetime64[ns]')
In [ ]:
comms_gdp_df['value'].unique()
Out[ ]:
array([19055.655, 19358.176, 19465.195, 19805.962, 19727.918, 19681.682, 20021.721, 20182.491, 20235.878, 20386.467, 22491.567, nan])
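The sanity checks above confirm that real_gdp is quarterly while the Reddit series is monthly, which is why the outer merges leave NaN/NaT gaps. If a step-wise monthly GDP series were preferred, pandas' merge_asof can carry each quarter's value forward; a sketch against the frames already in memory:

subs_gdp_step = pd.merge_asof(
    subs_df.sort_values('year_month'),
    real_gdp.sort_values('date'),
    left_on='year_month', right_on='date',
    direction='backward',  # match the most recent quarter at or before each month
)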
Get Data for Highcharts
In [ ]:
subreddits = subs_gdp_df['subreddit'].dropna().unique() # drop the NaN rows introduced by the outer merge
for subreddit in subreddits:
    print(subreddit)
    # monthly positive-submission counts for this subreddit, as a comma-separated Highcharts series
    result_list = subs_gdp_df[(subs_gdp_df['sentiment'] == 'positive') & (subs_gdp_df['subreddit'] == subreddit)].groupby('year_month')['total_submissions_sentiment'].sum().astype(int).to_numpy()
    result_string = ', '.join(map(str, result_list))
    print(result_string)
Ask_Politics
504, 207, 187, 184, 175, 124, 99, 74, 95, 83, 58, 60, 70, 75, 72, 60, 62, 80, 77, 75, 52, 71, 70, 36, 43, 43, 28
Economics
745, 752, 841, 916, 651, 619, 547, 446, 588, 528, 464, 655, 740, 535, 520, 512, 597, 619, 669, 2864, 4067, 763, 671, 603, 664, 558, 923
Conservative
11810, 8246, 8827, 8169, 7674, 7053, 7079, 6974, 6655, 6960, 8153, 7855, 7818, 7594, 5837, 6142, 5872, 6322, 6252, 5558, 4756, 4856, 5075, 3466, 3456, 3413, 4199
Libertarian
2576, 1671, 1839, 1696, 1436, 1398, 1630, 1508, 1740, 1345, 1471, 1413, 1336, 1228, 917, 806, 711, 669, 493, 426, 325, 347, 356, 298, 259, 282, 357
Liberal
561, 311, 294, 320, 362, 243, 238, 212, 234, 239, 234, 233, 186, 187, 143, 158, 168, 180, 202, 177, 156, 167, 193, 180, 122, 156, 170
changemyview
1973, 1714, 1913, 1956, 1694, 1681, 1672, 1575, 1623, 1665, 1667, 1510, 1401, 1125, 1139, 1209, 1265, 1373, 1289, 1222, 1009, 1162, 1098, 1220, 1201, 1165, 1190
centrist
669, 355, 270, 331, 362, 415, 258, 256, 230, 267, 282, 197, 265, 234, 187, 185, 272, 268, 271, 268, 188, 220, 170, 214, 193, 223, 261
socialism
1429, 1162, 1104, 1049, 994, 914, 1064, 922, 978, 1039, 901, 1133, 1074, 895, 715, 794, 785, 754, 685, 649, 747, 620, 552, 623, 733, 626, 678
finance
622, 680, 556, 452, 544, 609, 644, 518, 553, 584, 536, 582, 600, 509, 746, 530, 592, 553, 520, 2412, 1586, 782, 839, 809, 601, 453, 796
In [ ]:
subreddits = subs_gdp_df['subreddit'].dropna().unique() # drop the NaN rows introduced by the outer merge
for subreddit in subreddits:
    print(subreddit)
    # monthly negative-submission counts for this subreddit
    result_list = subs_gdp_df[(subs_gdp_df['sentiment'] == 'negative') & (subs_gdp_df['subreddit'] == subreddit)].groupby('year_month')['total_submissions_sentiment'].sum().astype(int).to_numpy()
    result_string = ', '.join(map(str, result_list))
    print(result_string)
Ask_Politics
494, 242, 197, 187, 219, 136, 99, 105, 72, 46, 43, 52, 78, 74, 108, 62, 61, 87, 89, 72, 69, 68, 70, 53, 54, 38, 52
Economics
557, 537, 533, 489, 431, 419, 369, 363, 439, 302, 280, 364, 407, 372, 679, 436, 472, 556, 542, 1498, 2412, 765, 656, 582, 600, 501, 651
Conservative
8594, 6749, 8286, 7747, 7127, 6220, 6207, 7782, 5337, 5114, 5868, 5505, 6229, 6022, 5740, 5074, 5619, 5739, 5663, 5219, 4725, 4873, 4360, 3295, 3279, 3451, 4370
Libertarian
1696, 1230, 1553, 1431, 1235, 1172, 1146, 1203, 1045, 776, 845, 910, 826, 918, 808, 607, 547, 475, 360, 302, 250, 301, 249, 231, 235, 241, 320
Liberal
447, 211, 275, 331, 351, 182, 171, 219, 169, 99, 107, 153, 99, 96, 125, 107, 136, 155, 119, 143, 128, 146, 150, 123, 125, 103, 132
changemyview
1387, 1201, 1308, 1325, 1206, 977, 1054, 1124, 698, 511, 627, 665, 576, 604, 876, 766, 984, 897, 931, 804, 681, 790, 789, 725, 744, 734, 743
centrist
479, 288, 236, 303, 292, 286, 188, 241, 159, 126, 149, 127, 183, 198, 170, 159, 258, 238, 219, 259, 143, 200, 150, 163, 194, 156, 231
socialism
797, 706, 712, 691, 787, 578, 603, 594, 568, 499, 471, 507, 578, 570, 575, 532, 529, 493, 432, 445, 484, 413, 445, 430, 477, 435, 495
finance
284, 264, 194, 197, 190, 344, 314, 231, 266, 258, 202, 227, 257, 255, 293, 246, 252, 281, 246, 1104, 798, 329, 341, 345, 257, 265, 379
In [ ]:
subreddits = subs_gdp_df['subreddit'].dropna().unique() # drop the NaN rows introduced by the outer merge
for subreddit in subreddits:
    print(subreddit)
    # monthly neutral-submission counts for this subreddit
    result_list = subs_gdp_df[(subs_gdp_df['sentiment'] == 'neutral') & (subs_gdp_df['subreddit'] == subreddit)].groupby('year_month')['total_submissions_sentiment'].sum().astype(int).to_numpy()
    result_string = ', '.join(map(str, result_list))
    print(result_string)
Ask_Politics
29, 15, 14, 18, 13, 5, 7, 6, 9, 3, 7, 7, 7, 4, 6, 2, 3, 7, 6, 4, 9, 4, 9, 7, 3, 4, 3
Economics
50, 53, 54, 47, 45, 31, 19, 17, 30, 20, 19, 26, 27, 27, 33, 29, 38, 46, 42, 109, 179, 56, 39, 35, 32, 41, 62
Conservative
1063, 630, 709, 692, 628, 572, 570, 559, 370, 383, 493, 453, 470, 491, 439, 479, 486, 548, 510, 454, 414, 431, 388, 261, 241, 289, 372
Libertarian
176, 96, 126, 104, 96, 101, 101, 81, 73, 69, 67, 80, 63, 61, 58, 43, 51, 39, 19, 29, 24, 17, 25, 23, 20, 24, 25
Liberal
50, 24, 30, 20, 31, 21, 17, 16, 16, 5, 9, 14, 11, 12, 14, 11, 12, 18, 18, 12, 15, 12, 18, 11, 14, 8, 16
changemyview
147, 116, 102, 133, 113, 86, 96, 102, 65, 46, 58, 54, 53, 50, 57, 75, 75, 51, 91, 58, 82, 69, 90, 67, 100, 66, 92
centrist
45, 34, 19, 22, 22, 22, 19, 18, 12, 13, 22, 12, 10, 13, 15, 14, 18, 17, 17, 20, 14, 11, 12, 16, 12, 19, 11
socialism
115, 86, 79, 92, 78, 59, 79, 68, 55, 37, 59, 61, 63, 58, 41, 52, 62, 48, 45, 48, 55, 47, 52, 24, 37, 39, 49
finance
39, 39, 26, 26, 15, 30, 24, 25, 30, 24, 25, 19, 26, 19, 41, 22, 23, 33, 17, 93, 79, 36, 69, 52, 39, 54, 65
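The three loops above differ only in the sentiment filter, so a hypothetical helper would collapse them; passing count_col='total_comments_sentiment' makes it work for the comments frame as well:

def highchart_series(df, sentiment, count_col='total_submissions_sentiment'):
    """Print one comma-separated monthly series per subreddit for Highcharts."""
    for sub in df['subreddit'].dropna().unique():
        series = (df[(df['sentiment'] == sentiment) & (df['subreddit'] == sub)]
                  .groupby('year_month')[count_col].sum().astype(int))
        print(sub)
        print(', '.join(map(str, series)))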