Setup¶

In [ ]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.3.0

# restart kernel
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Retrieving notices: ...working... done
Collecting package metadata (current_repodata.json): done
Solving environment: done


==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 23.10.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.10.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2023.08.22 |       h06a4308_0         123 KB
    certifi-2023.7.22          |  py310h06a4308_0         153 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       341.3 MB

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjdk-11.0.13-h87a67e3_0 

The following packages will be UPDATED:

  ca-certificates    conda-forge::ca-certificates-2023.7.2~ --> pkgs/main::ca-certificates-2023.08.22-h06a4308_0 

The following packages will be SUPERSEDED by a higher-priority channel:

  certifi            conda-forge/noarch::certifi-2023.7.22~ --> pkgs/main/linux-64::certifi-2023.7.22-py310h06a4308_0 



Downloading and Extracting Packages
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.
Collecting pyspark==3.3.0
  Using cached pyspark-3.3.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.5 (from pyspark==3.3.0)
  Using cached py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Out[ ]:
In [ ]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-69ee557d-28be-4359-a171-21bb4dfa0142;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 405ms :: artifacts dl 35ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-69ee557d-28be-4359-a171-21bb4dfa0142
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/30ms)
23/11/10 03:14:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
3.3.0
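Note the warning above: "Ignoring non-Spark config property: fs.s3a.aws.credentials.provider". Spark only forwards configuration keys that start with "spark.", so this credentials-provider setting never reaches the Hadoop S3A layer; the S3 reads below still succeed because S3A falls back to its default credentials chain. A minimal sketch of the likely-intended configuration, using the "spark.hadoop." prefix (this is an assumption about the original intent and was not re-run here):

# Sketch only: prefix Hadoop/S3A keys with "spark.hadoop." so Spark passes them through
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)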
In [ ]:
import sagemaker

session = sagemaker.Session()
bucket = session.default_bucket()
print(bucket)

# Create or retrieve a Spark session
spark = SparkSession.builder.appName("ReadS3Parquet").getOrCreate()
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker-us-east-1-433974840707
23/11/10 03:14:19 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
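A note on the warning above: SparkSession.builder.getOrCreate() returns the session already built in the previous cell, so the "ReadS3Parquet" app name and any static configs set here have no effect. If a freshly configured session were actually needed, the existing one would have to be stopped first (illustrative only, not run here):

# spark.stop()
# spark = SparkSession.builder.appName("ReadS3Parquet").getOrCreate()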

Data¶

In [ ]:
# S3 directory path
s3_directory = f"s3a://{bucket}/project/cleaned/comments/"

# Read all the Parquet files in the directory into a DataFrame
df_comments_centrist_libertarian = spark.read.parquet(s3_directory)
In [ ]:
# S3 directory path
s3_directory = f"s3a://{bucket}/project/cleaned/submissions/"

# Read all the Parquet files in the directory into a DataFrame
df_submissions_centrist_libertarian = spark.read.parquet(s3_directory)

Importing Data for other Subreddits¶

In [ ]:
# Tegveer subreddits (askpolitics, changemyview)
anthony_bucket = 'sagemaker-us-east-1-711387073580'
s3_directory_anthony_submissions = f"s3a://{anthony_bucket}/project/cleaned/submissions/"
s3_directory_anthony_comments = f"s3a://{anthony_bucket}/project/cleaned/comments/"

# Read all the Parquet files in the directory into a DataFrame
df_submissions_askpolitics_changemyview = spark.read.parquet(s3_directory_anthony_submissions)
df_comments_askpolitics_changemyview = spark.read.parquet(s3_directory_anthony_comments)
In [ ]:
# Eric subreddits (socialism, Economics, Liberal)
eric_bucket = 'sagemaker-us-east-1-395393721134'
s3_directory_eric_submissions = f"s3a://{eric_bucket}/project/cleaned/submissions/"
s3_directory_eric_comments = f"s3a://{eric_bucket}/project/cleaned/comments/"

# Read all the Parquet files in the directory into a DataFrame
df_submissions_socialism_economics_liberal = spark.read.parquet(s3_directory_eric_submissions)
df_comments_socialism_economics_liberal = spark.read.parquet(s3_directory_eric_comments)
In [ ]:
# Raunak subreddits (Conservative, finance)

raunak_bucket = 'sagemaker-us-east-1-224518912016'
s3_directory_raunak_submissions = f"s3a://{raunak_bucket}/project/cleaned/submissions/"
s3_directory_raunak_comments = f"s3a://{raunak_bucket}/project/cleaned/comments/"

# Read all the Parquet files in the directory into a DataFrame
df_submissions_conservative_finance = spark.read.parquet(s3_directory_raunak_submissions)
df_comments_conservative_finance = spark.read.parquet(s3_directory_raunak_comments)

EDA - Submissions Cosine Similarity and Jaccard Index for Grouped Subreddits (Politics & Economics)¶

In [ ]:
import pyspark.sql.functions as f
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import plotly.graph_objs as go
import plotly.express as px
import plotly.io as pio
import pandas as pd
import numpy as np
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

Economics Word List¶

Title¶

In [ ]:
word_list_econ = ["recession", "inflation", "unemployment", "crypto", "blockchain", "nft"]

# Regular expression pattern that matches any word in word_list_econ

word_list_econ_pattern = "|".join(word_list_econ)

df_submissions_finance = df_submissions_conservative_finance.filter(col("subreddit")=="finance")
df_submissions_economics = df_submissions_socialism_economics_liberal.filter(col("subreddit")=="Economics")
df_submissions_conservative = df_submissions_conservative_finance.filter(col("subreddit")=="Conservative")
df_submissions_socialism_liberal = df_submissions_socialism_economics_liberal.filter(col("subreddit")!="Economics")

df_submissions_pol = df_submissions_centrist_libertarian.union(df_submissions_conservative)
df_submissions_pol = df_submissions_pol.union(df_submissions_socialism_liberal)

# rename all political subreddits to politics for later groupby

df_submissions_pol = df_submissions_pol.withColumn("subreddit", f.lit("politics"))

# rename all econ subreddits to economics for later groupby

df_submissions_econ = df_submissions_finance.union(df_submissions_economics)

df_submissions_econ = df_submissions_econ.withColumn("subreddit", f.lit("economics"))

# Lowercase the title column for both df_submissions_pol and df_submissions_econ
df_submissions_pol = df_submissions_pol.withColumn("title_lowered", f.lower(col("title")))
df_submissions_econ = df_submissions_econ.withColumn("title_lowered", f.lower(col("title")))

# Filter title_lowered for df_submissions_pol using regex
df_submissions_pol_wle = df_submissions_pol.filter(col("title_lowered").rlike(word_list_econ_pattern))

# Filter title_lowered for df_submissions_econ using regex
df_submissions_econ_wle = df_submissions_econ.filter(col("title_lowered").rlike(word_list_econ_pattern))

# User-defined function to filter words
def filter_words(text):
    words = text.split()
    filtered_words = [word for word in words if word in word_list_econ]
    return ' '.join(filtered_words)

# Register UDF
filter_words_udf = f.udf(filter_words, StringType())

# Apply UDF to each df
df_submissions_pol_wle = df_submissions_pol_wle.withColumn("title_filtered", filter_words_udf(col("title_lowered")))
df_submissions_econ_wle = df_submissions_econ_wle.withColumn("title_filtered", filter_words_udf(col("title_lowered")))

# Sanity check to remove any non-matching rows
df_submissions_pol_wle = df_submissions_pol_wle.filter(col("title_filtered")!='')
df_submissions_econ_wle = df_submissions_econ_wle.filter(col("title_filtered")!='')

# Combine both dataframes to create pivoted table
df_combined = df_submissions_pol_wle.union(df_submissions_econ_wle)

# Split the filtered titles and count occurrences of each word in word_list_econ per subreddit
df_combined = df_combined.withColumn("list_word", f.split(col('title_filtered'), ' '))
df_combined = df_combined.withColumn("word", f.explode(col('list_word')))
df_combined_word_counts = df_combined.groupBy("subreddit", "word").agg(f.count("*").alias("wordCount"))

# Pivot the DataFrame to get total counts for each word by subreddit
df_pivoted = df_combined_word_counts.groupBy("subreddit").pivot("word").agg(f.sum("wordCount"))

df_pivoted.show()

# convert to pandas df
df_pivoted = df_pivoted.toPandas()
df_pivoted.to_csv('../../data/csv/econ_word_counts_grouped_subreddits_title.csv', index=False)

# convert back to pyspark df
df_pivoted = spark.createDataFrame(df_pivoted)
                                                                                
+---------+----------+------+---------+---+---------+------------+
|subreddit|blockchain|crypto|inflation|nft|recession|unemployment|
+---------+----------+------+---------+---+---------+------------+
| politics|        59|   462|     3195|256|      437|         448|
|economics|       224|  1365|     2350|156|      637|         302|
+---------+----------+------+---------+---+---------+------------+
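One caveat about the filtering step above: the rlike() pattern is a plain alternation, so it also matches substrings (for example "inflationary" contains "inflation"); such rows are only discarded later, when the exact-match UDF returns an empty string, and punctuated forms like "inflation," are missed by the split-based count. A hedged alternative, assuming whole-word matching is what is wanted (the variable name below is hypothetical and this is not used above):

# Sketch: restrict rlike() to whole words so e.g. "inflationary" is excluded up front
word_list_econ_boundary_pattern = r"\b(?:" + "|".join(word_list_econ) + r")\b"
# df_submissions_pol_wle = df_submissions_pol.filter(
#     col("title_lowered").rlike(word_list_econ_boundary_pattern)
# )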

                                                                                
In [ ]:
def cosine_similarity_calc(vec_1, vec_2):
    """Cosine similarity between two word-count vectors."""
    sim = np.dot(vec_1, vec_2) / (np.linalg.norm(vec_1) * np.linalg.norm(vec_2))
    return sim

pol_vec = df_pivoted.filter(col("subreddit") == "politics").select("recession", "inflation", "unemployment", "crypto", "blockchain", "nft").rdd.flatMap(lambda x: x).collect()
econ_vec = df_pivoted.filter(col("subreddit") == "economics").select("recession", "inflation", "unemployment", "crypto", "blockchain", "nft").rdd.flatMap(lambda x: x).collect()

cosine_sim_econ_grouped_subreddits_title = cosine_similarity_calc(pol_vec,econ_vec)

print('Cosine similarity:', cosine_sim_econ_grouped_subreddits_title)
Cosine similarity: 0.9248006097149873
In [ ]:
# Calculate Jaccard Similarity

def jaccard_index(counts1, counts2):
    """Weighted (min/max) Jaccard index between two vectors of counts."""

    intersection = np.sum(np.minimum(counts1, counts2))
    union = np.sum(np.maximum(counts1, counts2))

    return intersection / union

# Calculate the Jaccard index between the two documents
jaccard_index_econ_grouped_subreddits_title = jaccard_index(pol_vec, econ_vec)

# Print the Jaccard index
print('Jaccard similarity:', jaccard_index_econ_grouped_subreddits_title)
Jaccard similarity: 0.6148571428571429

Body¶

In [ ]:
word_list_econ = ["recession", "inflation", "unemployment", "crypto", "blockchain", "nft"]

# Regular expression pattern that matches any word in word_list_econ

word_list_econ_pattern = "|".join(word_list_econ)

df_submissions_finance = df_submissions_conservative_finance.filter(col("subreddit")=="finance")
df_submissions_economics = df_submissions_socialism_economics_liberal.filter(col("subreddit")=="Economics")
df_submissions_conservative = df_submissions_conservative_finance.filter(col("subreddit")=="Conservative")
df_submissions_socialism_liberal = df_submissions_socialism_economics_liberal.filter(col("subreddit")!="Economics")

df_submissions_pol = df_submissions_centrist_libertarian.union(df_submissions_conservative)
df_submissions_pol = df_submissions_pol.union(df_submissions_socialism_liberal)

# rename all political subreddits to politics for later groupby

df_submissions_pol = df_submissions_pol.withColumn("subreddit", f.lit("politics"))

# rename all econ subreddits to economics for later groupby

df_submissions_econ = df_submissions_finance.union(df_submissions_economics)

df_submissions_econ = df_submissions_econ.withColumn("subreddit", f.lit("economics"))

# Lowercase the selftext column for both df_submissions_pol and df_submissions_econ
df_submissions_pol = df_submissions_pol.withColumn("selftext_lowered", f.lower(col("selftext")))
df_submissions_econ = df_submissions_econ.withColumn("selftext_lowered", f.lower(col("selftext")))

# Filter selftext_lowered for df_submissions_pol using regex
df_submissions_pol_wle = df_submissions_pol.filter(col("selftext_lowered").rlike(word_list_econ_pattern))

# Filter selftext_lowered for df_submissions_econ using regex
df_submissions_econ_wle = df_submissions_econ.filter(col("selftext_lowered").rlike(word_list_econ_pattern))

# User-defined function to filter words
def filter_words(text):
    words = text.split()
    filtered_words = [word for word in words if word in word_list_econ]
    return ' '.join(filtered_words)

# Register UDF
filter_words_udf = f.udf(filter_words, StringType())

# Apply UDF to each df
df_submissions_pol_wle = df_submissions_pol_wle.withColumn("selftext_filtered", filter_words_udf(col("selftext_lowered")))
df_submissions_econ_wle = df_submissions_econ_wle.withColumn("selftext_filtered", filter_words_udf(col("selftext_lowered")))

# Sanity check to remove any non-matching rows
df_submissions_pol_wle = df_submissions_pol_wle.filter(col("selftext_filtered")!='')
df_submissions_econ_wle = df_submissions_econ_wle.filter(col("selftext_filtered")!='')

# Combine both dataframes to create pivoted table
df_combined = df_submissions_pol_wle.union(df_submissions_econ_wle)

# Split the filtered selftext and count occurrences of each word in word_list_econ per subreddit
df_combined = df_combined.withColumn("list_word", f.split(col('selftext_filtered'), ' '))
df_combined = df_combined.withColumn("word", f.explode(col('list_word')))
df_combined_word_counts = df_combined.groupBy("subreddit", "word").agg(f.count("*").alias("wordCount"))

# Pivot the DataFrame to get total counts for each word by subreddit
df_pivoted = df_combined_word_counts.groupBy("subreddit").pivot("word").agg(f.sum("wordCount"))

df_pivoted.show()

# convert to pandas df
df_pivoted = df_pivoted.toPandas()
df_pivoted.to_csv('../../data/csv/econ_word_counts_grouped_subreddits_body.csv', index=False)

# convert back to pyspark df
df_pivoted = spark.createDataFrame(df_pivoted)
                                                                                
+---------+----------+------+---------+----+---------+------------+
|subreddit|blockchain|crypto|inflation| nft|recession|unemployment|
+---------+----------+------+---------+----+---------+------------+
| politics|        20|    98|      349|   5|       39|         146|
|economics|      null|  null|        9|null|        2|          10|
+---------+----------+------+---------+----+---------+------------+

                                                                                
In [ ]:
# Calculate Cosine Similarity

pol_vec = df_pivoted.filter(col("subreddit") == "politics").select("recession", "inflation", "unemployment", "crypto", "blockchain", "nft").rdd.flatMap(lambda x: x).collect()
econ_vec = df_pivoted.filter(col("subreddit") == "economics").select("recession", "inflation", "unemployment", "crypto", "blockchain", "nft").fillna(0).rdd.flatMap(lambda x: x).collect()

cosine_sim_econ_grouped_subreddits_body = cosine_similarity_calc(pol_vec,econ_vec)

print('Cosine similarity:', cosine_sim_econ_grouped_subreddits_body)
Cosine similarity: 0.8747185771005317
In [ ]:
# Calculate Jaccard Similarity

# Calculate the Jaccard index between the two documents
jaccard_index_econ_grouped_subreddits_body = jaccard_index(pol_vec, econ_vec)

# Print the Jaccard index
print('Jaccard similarity:', jaccard_index_econ_grouped_subreddits_body)
Jaccard similarity: 0.0319634703196347

The cosine similarity is high because the two count vectors point in roughly the same direction: the politics and economics groups use these words in similar proportions, even though the individual counts differ substantially. Cosine similarity is scale-invariant, so it ignores those differences in magnitude; to compare the counts themselves, we also report the (weighted) Jaccard index, which is sensitive to how large the counts are.
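To make the contrast concrete, here is a small illustrative example (not taken from the data above): two count vectors with identical proportions but magnitudes that differ by a factor of 100 have cosine similarity 1.0, while their min/max (weighted) Jaccard index is only 0.01.

import numpy as np

a = np.array([100, 200, 300])
b = np.array([1, 2, 3])  # same direction, 100x smaller counts

cos = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
jac = np.sum(np.minimum(a, b)) / np.sum(np.maximum(a, b))

print(cos)  # ~1.0 -> cosine ignores the difference in magnitude
print(jac)  # 0.01 -> the count-based Jaccard index reflects it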

Politics Word List¶

Title¶

In [ ]:
word_list_pol = ["trump", "biden", "election", "fed", "powell"]

# Regular expression pattern that matches any word in word_list_pol

word_list_pol_pattern = "|".join(word_list_pol)

df_submissions_finance = df_submissions_conservative_finance.filter(col("subreddit")=="finance")
df_submissions_economics = df_submissions_socialism_economics_liberal.filter(col("subreddit")=="Economics")
df_submissions_conservative = df_submissions_conservative_finance.filter(col("subreddit")=="Conservative")
df_submissions_socialism_liberal = df_submissions_socialism_economics_liberal.filter(col("subreddit")!="Economics")

df_submissions_pol = df_submissions_centrist_libertarian.union(df_submissions_conservative)
df_submissions_pol = df_submissions_pol.union(df_submissions_socialism_liberal)

# rename all political subreddits to politics for later groupby

df_submissions_pol = df_submissions_pol.withColumn("subreddit", f.lit("politics"))

# rename all econ subreddits to economics for later groupby

df_submissions_econ = df_submissions_finance.union(df_submissions_economics)

df_submissions_econ = df_submissions_econ.withColumn("subreddit", f.lit("economics"))

# Lowercase the title column for both df_submissions_pol and df_submissions_econ
df_submissions_pol = df_submissions_pol.withColumn("title_lowered", f.lower(col("title")))
df_submissions_econ = df_submissions_econ.withColumn("title_lowered", f.lower(col("title")))

# Filter title_lowered for df_submissions_pol using regex
df_submissions_pol_wlp = df_submissions_pol.filter(col("title_lowered").rlike(word_list_pol_pattern))

# Filter title_lowered for df_submissions_econ using regex
df_submissions_econ_wlp = df_submissions_econ.filter(col("title_lowered").rlike(word_list_pol_pattern))

# User-defined function to filter words
def filter_words(text):
    words = text.split()
    filtered_words = [word for word in words if word in word_list_pol]
    return ' '.join(filtered_words)

# Register UDF
filter_words_udf = f.udf(filter_words, StringType())

# Apply UDF to each df
df_submissions_pol_wlp = df_submissions_pol_wlp.withColumn("title_filtered", filter_words_udf(col("title_lowered")))
df_submissions_econ_wlp = df_submissions_econ_wlp.withColumn("title_filtered", filter_words_udf(col("title_lowered")))

# Sanity check to remove any non-matching rows
df_submissions_pol_wlp = df_submissions_pol_wlp.filter(col("title_filtered")!='')
df_submissions_econ_wlp = df_submissions_econ_wlp.filter(col("title_filtered")!='')

# Combine both dataframes to create pivoted table
df_combined = df_submissions_pol_wlp.union(df_submissions_econ_wlp)

# Split the filtered titles and count occurrences of each word in word_list_pol per subreddit
df_combined = df_combined.withColumn("list_word", f.split(col('title_filtered'), ' '))
df_combined = df_combined.withColumn("word", f.explode(col('list_word')))
df_combined_word_counts = df_combined.groupBy("subreddit", "word").agg(f.count("*").alias("wordCount"))

# Pivot the DataFrame to get total counts for each word by subreddit
df_pivoted = df_combined_word_counts.groupBy("subreddit").pivot("word").agg(f.sum("wordCount"))

df_pivoted.show()

# convert to pandas df
df_pivoted = df_pivoted.toPandas()
df_pivoted.to_csv('../../data/csv/pol_word_counts_grouped_subreddits_title.csv', index=False)

# convert back to pyspark df
df_pivoted = spark.createDataFrame(df_pivoted)
                                                                                
+---------+-----+--------+----+------+-----+
|subreddit|biden|election| fed|powell|trump|
+---------+-----+--------+----+------+-----+
| politics|47162|    7447| 779|   315|26895|
|economics|  828|     121|1253|   215|  342|
+---------+-----+--------+----+------+-----+

                                                                                
In [ ]:
pol_vec = df_pivoted.filter(col("subreddit") == "politics").select("trump", "biden", "election", "fed", "powell").rdd.flatMap(lambda x: x).collect()
econ_vec = df_pivoted.filter(col("subreddit") == "economics").select("trump", "biden", "election", "fed", "powell").fillna(0).rdd.flatMap(lambda x: x).collect()

cosine_sim_pol_grouped_subreddits_title = cosine_similarity_calc(pol_vec,econ_vec)

print('Cosine similarity:', cosine_sim_pol_grouped_subreddits_title)
Cosine similarity: 0.5870875602800988
In [ ]:
# Calculate the Jaccard index between the two documents
jaccard_index_pol_grouped_subreddits_title = jaccard_index(pol_vec, econ_vec)

# Print the Jaccard index
print('Jaccard similarity:', jaccard_index_pol_grouped_subreddits_title)
Jaccard similarity: 0.027506259630200307

Body¶

In [ ]:
word_list_pol = ["trump", "biden", "election", "fed", "powell"]

# Regular expression pattern that matches any word in word_list_pol

word_list_pol_pattern = "|".join(word_list_pol)

df_submissions_finance = df_submissions_conservative_finance.filter(col("subreddit")=="finance")
df_submissions_economics = df_submissions_socialism_economics_liberal.filter(col("subreddit")=="Economics")
df_submissions_conservative = df_submissions_conservative_finance.filter(col("subreddit")=="Conservative")
df_submissions_socialism_liberal = df_submissions_socialism_economics_liberal.filter(col("subreddit")!="Economics")

df_submissions_pol = df_submissions_centrist_libertarian.union(df_submissions_conservative)
df_submissions_pol = df_submissions_pol.union(df_submissions_socialism_liberal)

# rename all political subreddits to politics for later groupby

df_submissions_pol = df_submissions_pol.withColumn("subreddit", f.lit("politics"))

# rename all econ subreddits to economics for later groupby

df_submissions_econ = df_submissions_finance.union(df_submissions_economics)

df_submissions_econ = df_submissions_econ.withColumn("subreddit", f.lit("economics"))

# Lowercase the selftext column for both df_submissions_pol and df_submissions_econ
df_submissions_pol = df_submissions_pol.withColumn("selftext_lowered", f.lower(col("selftext")))
df_submissions_econ = df_submissions_econ.withColumn("selftext_lowered", f.lower(col("selftext")))

# Filter selftext_lowered for df_submissions_pol using regex
df_submissions_pol_wlp = df_submissions_pol.filter(col("selftext_lowered").rlike(word_list_pol_pattern))

# Filter selftext_lowered for df_submissions_econ using regex
df_submissions_econ_wlp = df_submissions_econ.filter(col("selftext_lowered").rlike(word_list_pol_pattern))

# User-defined function to filter words
def filter_words(text):
    words = text.split()
    filtered_words = [word for word in words if word in word_list_pol]
    return ' '.join(filtered_words)

# Register UDF
filter_words_udf = f.udf(filter_words, StringType())

# Apply UDF to each df
df_submissions_pol_wlp = df_submissions_pol_wlp.withColumn("selftext_filtered", filter_words_udf(col("selftext_lowered")))
df_submissions_econ_wlp = df_submissions_econ_wlp.withColumn("selftext_filtered", filter_words_udf(col("selftext_lowered")))

# Sanity check to remove any non-matching rows
df_submissions_pol_wlp = df_submissions_pol_wlp.filter(col("selftext_filtered")!='')
df_submissions_econ_wlp = df_submissions_econ_wlp.filter(col("selftext_filtered")!='')

# Combine both dataframes to create pivoted table
df_combined = df_submissions_pol_wlp.union(df_submissions_econ_wlp)

# Split the filtered selftext and count occurrences of each word in word_list_pol per subreddit
df_combined = df_combined.withColumn("list_word", f.split(col('selftext_filtered'), ' '))
df_combined = df_combined.withColumn("word", f.explode(col('list_word')))
df_combined_word_counts = df_combined.groupBy("subreddit", "word").agg(f.count("*").alias("wordCount"))

# Pivot the DataFrame to get total counts for each word by subreddit
df_pivoted = df_combined_word_counts.groupBy("subreddit").pivot("word").agg(f.sum("wordCount"))

df_pivoted.show()

# convert to pandas df
df_pivoted = df_pivoted.toPandas()
df_pivoted.to_csv('../../data/csv/pol_word_counts_grouped_subreddits_body.csv', index=False)

# convert back to pyspark df
df_pivoted = spark.createDataFrame(df_pivoted)
                                                                                
+---------+-----+--------+---+------+-----+
|subreddit|biden|election|fed|powell|trump|
+---------+-----+--------+---+------+-----+
| politics| 1254|    1025|189|     9| 2242|
|economics|    1|    null|  6|  null|    1|
+---------+-----+--------+---+------+-----+

                                                                                
In [ ]:
pol_vec = df_pivoted.filter(col("subreddit") == "politics").select("trump", "biden", "election", "fed", "powell").rdd.flatMap(lambda x: x).collect()
econ_vec = df_pivoted.filter(col("subreddit") == "economics").select("trump", "biden", "election", "fed", "powell").fillna(0).rdd.flatMap(lambda x: x).collect()

cosine_sim_pol_grouped_subreddits_body = cosine_similarity_calc(pol_vec,econ_vec)

print('Cosine similarity:', cosine_sim_pol_grouped_subreddits_body)
Cosine similarity: 0.2709274013987519
In [ ]:
# Calculate the Jaccard index between the two documents
jaccard_index_pol_grouped_subreddits_body = jaccard_index(pol_vec, econ_vec)

# Print the Jaccard index
print('Jaccard similarity:', jaccard_index_pol_grouped_subreddits_body)
Jaccard similarity: 0.00169527442254715

Final Table - Submission Title Cosine Sim & Jaccard Index¶

In [ ]:
schema = StructType([
    StructField("Word List", StringType(), True),
    StructField("Cosine Similarity", DoubleType(), True),
    StructField("Jaccard Similarity", DoubleType(), True)
])

# Create the DataFrame with the defined schema
df_title = spark.createDataFrame([(", ".join(word_list_econ), float(cosine_sim_econ_grouped_subreddits_title), float(jaccard_index_econ_grouped_subreddits_title)), 
                                  (", ".join(word_list_pol), float(cosine_sim_pol_grouped_subreddits_title), float(jaccard_index_pol_grouped_subreddits_title))
                                  ],
                                  schema)

# round numeric cols
df_title = df_title.withColumn("Cosine Similarity", f.round(df_title["Cosine Similarity"], 4))
df_title = df_title.withColumn("Jaccard Similarity", f.round(df_title["Jaccard Similarity"], 4))

# convert to pandas for plotly
df_title = df_title.toPandas()

# create plotly datatable
fig = go.Figure(data=go.Table(
    header=dict(
        values=list(df_title.columns),
        fill_color='gold',
        align='center',
        font=dict(size=14, color='black')
    ),
    cells=dict(values=[df_title[col] for col in df_title.columns], align='center')
))

fig.show();

# save as html
pio.write_html(fig, file='../../data/plots/title_cosine_jaccard.html')

# save df as pandas and export as csv
df_title.to_csv('../../data/csv/title_cosine_jaccard.csv', index=False)

Final Table - Submission Body Cosine Sim & Jaccard Index¶

In [ ]:
schema = StructType([
    StructField("Word List", StringType(), True),
    StructField("Cosine Similarity", DoubleType(), True),
    StructField("Jaccard Similarity", DoubleType(), True)
])

# Create the DataFrame with the defined schema
df_body = spark.createDataFrame([(", ".join(word_list_econ), float(cosine_sim_econ_grouped_subreddits_body), float(jaccard_index_econ_grouped_subreddits_body)), 
                                  (", ".join(word_list_pol), float(cosine_sim_pol_grouped_subreddits_body), float(jaccard_index_pol_grouped_subreddits_body))],
                                  schema)

# round numeric cols
df_body = df_body.withColumn("Cosine Similarity", f.round(df_body["Cosine Similarity"], 4))
df_body = df_body.withColumn("Jaccard Similarity", f.round(df_body["Jaccard Similarity"], 4))

# convert to pandas for plotly
df_body = df_body.toPandas()

# create plotly datatable
fig = go.Figure(data=go.Table(
    header=dict(
        values=list(df_body.columns),
        fill_color='#F4A460',
        align='center',
        font=dict(size=14, color='black')
    ),
    cells=dict(values=[df_body[col] for col in df_body.columns], align='center')
))

fig.show();

# save as html
pio.write_html(fig, file='../../data/plots/body_cosine_jaccard.html')

# save df as pandas and export as csv
df_body.to_csv('../../data/csv/body_cosine_jaccard.csv', index=False)

Reformat Tables¶

In [ ]:
df_title = pd.read_csv("../../data/csv/title_cosine_jaccard.csv")

# create plotly datatable
fig = go.Figure(data=go.Table(
    header=dict(
        values=list(df_title.columns),
        fill_color='#F4A460',
        align='center',
        font=dict(size=14, color='black')
    ),
    cells=dict(
        values=[df_title[col] for col in df_title.columns], 
        align='center',
        font=dict(size=14, color='black'),
    )
))
fig.update_layout(width=350, height=265, margin={"l":0,"r":0,"t":0,"b":0})

fig.show();

# save as html
pio.write_html(fig, file='../../data/plots/title_cosine_jaccard.html')
In [ ]:
df_body = pd.read_csv("../../data/csv/body_cosine_jaccard.csv")

# create plotly datatable
fig = go.Figure(data=go.Table(
    header=dict(
        values=list(df_body.columns),
        fill_color='#F4A460',
        align='center',
        font=dict(size=14, color='black')
    ),
    cells=dict(
        values=[df_body[col] for col in df_body.columns], 
        align='center',
        font=dict(size=14, color='black')
    )
))
fig.update_layout(width=350, height=265, margin={"l":0,"r":0,"t":0,"b":0})

fig.show();

# save as html
pio.write_html(fig, file='../../data/plots/body_cosine_jaccard.html')