In this notebook, I pick up where the Azure notebook left off. We begin by reading in the data that was prepared in Azure and migrated to S3, and then continue the analysis with TF-IDF to determine the most important words in the comment bodies of the economics subreddits.¶

Setup - AWS¶

In [ ]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.4.0

# install spark-nlp
%pip install spark-nlp==5.1.3

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Retrieving notices: ...working... done
Collecting package metadata (current_repodata.json): done
Solving environment: done


==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 23.10.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.10.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2023.08.22 |       h06a4308_0         123 KB
    certifi-2023.11.17         |  py310h06a4308_0         158 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       341.3 MB

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjdk-11.0.13-h87a67e3_0 

The following packages will be UPDATED:

  ca-certificates    conda-forge::ca-certificates-2023.7.2~ --> pkgs/main::ca-certificates-2023.08.22-h06a4308_0 
  certifi            conda-forge/noarch::certifi-2023.7.22~ --> pkgs/main/linux-64::certifi-2023.11.17-py310h06a4308_0 



Downloading and Extracting Packages
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.
Collecting pyspark==3.4.0
  Using cached pyspark-3.4.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark==3.4.0)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.0
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Collecting spark-nlp==5.1.3
  Obtaining dependency information for spark-nlp==5.1.3 from https://files.pythonhosted.org/packages/cd/7d/bc0eca4c9ec4c9c1d9b28c42c2f07942af70980a7d912d0aceebf8db32dd/spark_nlp-5.1.3-py2.py3-none-any.whl.metadata
  Using cached spark_nlp-5.1.3-py2.py3-none-any.whl.metadata (53 kB)
Using cached spark_nlp-5.1.3-py2.py3-none-any.whl (537 kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-5.1.3
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Out[ ]:
In [ ]:
import json
import sparknlp
import numpy as np
import pandas as pd
from sparknlp.base import *
from pyspark.ml import Pipeline
from sparknlp.annotator import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sparknlp.pretrained import PretrainedPipeline
In [ ]:
# Import pyspark and build Spark session
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3")\
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")\
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )\
    .getOrCreate()
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-481cb060-bbfb-48f7-90f6-61dfa9776bfe;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 314ms :: artifacts dl 20ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-481cb060-bbfb-48f7-90f6-61dfa9776bfe
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/17ms)
23/11/22 18:08:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
In [ ]:
print(f"Spark version: {spark.version}")
print(f"sparknlp version: {sparknlp.version()}")
Spark version: 3.4.0
sparknlp version: 5.1.3

Read in NLP Cleaned Data¶

In [ ]:
import sagemaker
sess = sagemaker.Session()
bucket = sess.default_bucket()
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml

Comments¶

In [ ]:
# S3 directory path
s3_directory_comms = f"s3a://{bucket}/project/nlp_cleaned_comments/"

# Read all the Parquet files in the directory into a DataFrame
comments_cleaned = spark.read.parquet(s3_directory_comms)
23/11/22 18:08:22 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

Submissions¶

In [ ]:
# S3 directory path
s3_directory_subs = f"s3a://{bucket}/project/nlp_cleaned_submissions/"

# Read all the Parquet files in the directory into a DataFrame
submissions_cleaned= spark.read.parquet(s3_directory_subs)
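A quick sanity check that both DataFrames came back from S3 with the expected columns can be worthwhile before moving on; a minimal sketch (the counts trigger a full scan of the Parquet files, so they are optional):

In [ ]:
# Optional sanity check on the cleaned data read back from S3
comments_cleaned.printSchema()
submissions_cleaned.printSchema()
print(f"comments rows: {comments_cleaned.count():,}")
print(f"submissions rows: {submissions_cleaned.count():,}")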

TF-IDF (continued from Azure), Common Words, and Text Length¶

Determine the most important words in economics subreddits' comment bodies using TF-IDF¶

Processing Job¶

In [ ]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF, HashingTF, StopWordsRemover
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, MapType, StringType
import string
from sparknlp.pretrained import PretrainedPipeline
import matplotlib.pyplot as plt

from pyspark.ml.feature import SQLTransformer
from sparknlp.annotator import Tokenizer, Normalizer, StopWordsCleaner, LemmatizerModel
In [ ]:
excluded_subreddits = ["Economics", "finance"]

# Filter for economics subreddits and keep the tokenized comment body
econ_comms_body_words = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits)) \
                                        .select(F.col("cleaned_body"))

# Define HashingTF
hashingTF = HashingTF(inputCol="cleaned_body", outputCol="rawFeatures",numFeatures=40000000)

# Define IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")

words = econ_comms_body_words.select('cleaned_body')

# Hashing frequency
tf = hashingTF.transform(words)

# IDF
idf_model = idf.fit(tf)

# TFIDF
tfidf = idf_model.transform(tf)

ndf = tfidf.select(F.explode('cleaned_body').name('expwords')).withColumn('words',F.array('expwords'))

# Define HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",numFeatures=40000000)

hashudf = F.udf(lambda vector: vector.indices.tolist()[0], StringType())
wordtf = hashingTF.transform(ndf).withColumn('wordhash',hashudf(F.col('rawFeatures')))
wordtf = wordtf.dropDuplicates(["expwords"])

udf1 = F.udf(lambda vec : dict(zip(vec.indices.tolist(),vec.values.tolist())),MapType(StringType(),StringType()))
valuedf = tfidf.select(F.explode(udf1(F.col('features'))).name('wordhash','value'))
valuedf = valuedf.dropDuplicates(["wordhash"])
join_importance = wordtf.join(valuedf, wordtf.wordhash == valuedf.wordhash, 'inner').select(wordtf.expwords, valuedf.value)
top100_econ_comms = join_importance.sort(F.col("value").desc()).limit(100).toPandas()

top100_econ_comms = top100_econ_comms.reset_index(drop=True)
top100_econ_comms = top100_econ_comms.rename(columns={"expwords":"word", "value":"tfidf_value"})
INFO:py4j.clientserver:Error while receiving.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
INFO:py4j.clientserver:Closing down clientserver connection
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
INFO:py4j.clientserver:Closing down clientserver connection
---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
Cell In[13], line 19
     16 tf = hashingTF.transform(words)
     18 # IDF
---> 19 idf_model = idf.fit(tf)
     21 # TFIDF
     22 tfidf = idf_model.transform(tf)

File /opt/conda/lib/python3.10/site-packages/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )

File /opt/conda/lib/python3.10/site-packages/pyspark/ml/wrapper.py:381, in JavaEstimator._fit(self, dataset)
    380 def _fit(self, dataset: DataFrame) -> JM:
--> 381     java_model = self._fit_java(dataset)
    382     model = self._create_model(java_model)
    383     return self._copyValues(model)

File /opt/conda/lib/python3.10/site-packages/pyspark/ml/wrapper.py:378, in JavaEstimator._fit_java(self, dataset)
    375 assert self._java_obj is not None
    377 self._transfer_params_to_java()
--> 378 return self._java_obj.fit(dataset._jdf)

File /opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/conda/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
    167 def deco(*a: Any, **kw: Any) -> Any:
    168     try:
--> 169         return f(*a, **kw)
    170     except Py4JJavaError as e:
    171         converted = convert_exception(e.java_exception)

File /opt/conda/lib/python3.10/site-packages/py4j/protocol.py:334, in get_return_value(answer, gateway_client, target_id, name)
    330             raise Py4JError(
    331                 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332                 format(target_id, ".", name, value))
    333     else:
--> 334         raise Py4JError(
    335             "An error occurred while calling {0}{1}{2}".
    336             format(target_id, ".", name))
    337 else:
    338     type = answer[1]

Py4JError: An error occurred while calling o59.fit
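The crash above is most likely a driver/JVM memory problem: hashing every token into a 40,000,000-dimension feature space and then exploding and joining on individual words is very heavy for a single local[*] driver. A lighter sketch of the same idea (assuming, as above, that cleaned_body is an array of tokens) would cap the vocabulary with CountVectorizer; the fitted model exposes its vocabulary directly, so the hash-to-word join becomes unnecessary. Note that this yields one corpus-level IDF weight per term rather than the per-document TF-IDF values the failed cell was building, so it is a diagnostic alternative, not a drop-in replacement.

In [ ]:
# Memory-friendlier sketch: cap the vocabulary instead of hashing into 40M buckets,
# then read per-term IDF weights straight off the fitted models.
from pyspark.ml.feature import CountVectorizer, IDF

cv = CountVectorizer(inputCol="cleaned_body", outputCol="rawFeatures",
                     vocabSize=20000, minDF=5.0)
cv_model = cv.fit(words)
tf_small = cv_model.transform(words)

idf_small = IDF(inputCol="rawFeatures", outputCol="features")
idf_model_small = idf_small.fit(tf_small)

# cv_model.vocabulary[i] is the term behind feature index i, and
# idf_model_small.idf holds the matching IDF weight.
idf_weights = pd.DataFrame({
    "word": cv_model.vocabulary,
    "idf_value": idf_model_small.idf.toArray(),
}).sort_values("idf_value", ascending=False)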
In [ ]:
top100_econ_comms.to_csv("../data/csv/tfidf_econ_comms.csv",index=False)
INFO:py4j.clientserver:Error while receiving.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
INFO:py4j.clientserver:Closing down clientserver connection
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
INFO:py4j.clientserver:Closing down clientserver connection

Determine the most common words in politics subreddits' submission titles and bodies¶

In [ ]:
excluded_subreddits = ["Economics", "finance"]

# Concat body, title and filter for political subreddits
pol_subs_body_title_words = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
                                               .select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
                                               .alias("nlpbodytext"))

# Split the nlpbodytext into an array of words
pol_subs_body_title_words = pol_subs_body_title_words.withColumn("array_words", F.split(pol_subs_body_title_words["nlpbodytext"], " "))

# Flatten words array
words_flat = pol_subs_body_title_words.select(F.explode("array_words").alias("word"))

# Count word occurrences
word_counts = words_flat.groupBy("word").count()

# Get top 100 most common words
freq100_pol_subs = word_counts.orderBy(F.desc("count")).limit(100).toPandas()

# Rename columns
freq100_pol_subs = freq100_pol_subs.rename(columns={"word": "word", "count": "word_count"})

freq100_pol_subs.to_csv('../../data/csv/wordcount_pol_subs.csv',index=False)
                                                                                

Determine the most common words in economics subreddits' submission titles and bodies¶

In [ ]:
# Concat title and body, and filter for economics subreddits
econ_subs_body_title_words = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
                                               .select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
                                               .alias("nlpbodytext"))

substr_to_remove = ["http", "https", "tinyurl", "com", "www", "jpg", "uploads"]
regex = "|".join(substr_to_remove)
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("nlpbodytext", F.regexp_replace("nlpbodytext", regex, " "))

# Split the nlpbodytext into an array of words
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("array_words", F.split(econ_subs_body_title_words["nlpbodytext"], " "))

# Flatten words array
words_flat = econ_subs_body_title_words.select(F.explode("array_words").alias("word"))

# Count word occurrences
word_counts = words_flat.groupBy("word").count()

# Remove rows with blank strings and the letters 's' and 'e'
word_counts = word_counts.filter((F.length("word") > 0) & (F.col("word") != "s") & (F.col("word") != "e"))

# Get top 100 most common words
freq100_econ_subs = word_counts.orderBy(F.desc("count")).limit(100).toPandas()

# Rename columns
freq100_econ_subs = freq100_econ_subs.rename(columns={"word": "word", "count": "word_count"})

freq100_econ_subs.to_csv('../../data/csv/wordcount_econ_subs.csv',index=False)
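The ad-hoc filter above only drops empty strings and the stray tokens 's' and 'e'. A more general (optional) pass could also drop any residual English stop words using Spark ML's built-in list, for example:

In [ ]:
# Hypothetical extra cleanup: remove remaining English stop words from the counts
from pyspark.ml.feature import StopWordsRemover

english_stopwords = StopWordsRemover.loadDefaultStopWords("english")
word_counts_no_stop = word_counts.filter(~F.col("word").isin(english_stopwords))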
                                                                                

Determine the most common words in politics subreddits' comment bodies¶

In [ ]:
pol_comms_body_words = comments_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))

# Flatten words array
words_flat = pol_comms_body_words.select(F.explode("cleaned_body").alias("word"))

# Count word occurrences
word_counts = words_flat.groupBy("word").count()

# Get top 100 most common words
freq100_pol_comms = word_counts.orderBy(F.desc("count")).limit(100).toPandas()

# Rename columns
freq100_pol_comms = freq100_pol_comms.rename(columns={"word": "word", "count": "word_count"})

freq100_pol_comms.to_csv('../../data/csv/wordcount_pol_comms.csv',index=False)
                                                                                

Determine the most common words in economics subreddits' comment bodies¶

In [ ]:
econ_comms_body_words = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))

# Flatten words array
words_flat = econ_comms_body_words.select(F.explode("cleaned_body").alias("word"))

# Count word occurrences
word_counts = words_flat.groupBy("word").count()

# Get top 100 most common words
freq100_econ_comms = word_counts.orderBy(F.desc("count")).limit(100).toPandas()

# Rename columns
freq100_econ_comms = freq100_econ_comms.rename(columns={"word": "word", "count": "word_count"})

freq100_econ_comms.to_csv('../../data/csv/wordcount_econ_comms.csv',index=False)
                                                                                

Distribution of text lengths of politics subreddits' submission bodies¶

In [ ]:
# Calculate text lengths
text_lengths = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
                                            .select(F.length("selftext").alias("text_length"))

# Convert Spark dataframe to Pandas dataframe
pandas_df_pol_subs = text_lengths.toPandas()
                                                                                
In [ ]:
# Plot distribution of text lengths
plt.hist(pandas_df_pol_subs["text_length"], bins=25)
plt.xlabel("Text Length (Words)")
plt.ylabel("Count")
plt.title("Politics Subreddits Submissions - Distribution of Text Lengths")
plt.savefig("../../data/plots/text_length_pol_subs.png")
plt.show()
[Figure: histogram of text lengths for politics subreddits' submissions]

Distribution of text lengths of economics subreddits' submission bodies¶

In [ ]:
# Calculate text lengths
text_lengths = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
                                           .select(F.length("selftext").alias("text_length"))

# Convert Spark dataframe to Pandas dataframe
pandas_df_econ_subs = text_lengths.toPandas()
                                                                                
In [ ]:
# Plot distribution of text lengths
plt.hist(pandas_df_econ_subs["text_length"], bins=50)
plt.xlabel("Text Length (Words)")
plt.ylabel("Count")
plt.title("Economics Subreddits Submissions - Distribution of Text Lengths")
plt.savefig("../../data/plots/text_length_econ_subs.png")
plt.show()
[Figure: histogram of text lengths for economics subreddits' submissions]
Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:458)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)

Distribution of text lengths of politics subreddits' comment bodies¶

In [ ]:
excluded_subreddits = ["Economics", "finance"]

# Calculate text lengths
text_lengths = comments_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
                                         .select(F.length("body").alias("text_length"))

# Convert Spark dataframe to Pandas dataframe
pandas_df_pol_comms = text_lengths.toPandas()
                                                                                
In [ ]:
import matplotlib.pyplot as plt
# Plot distribution of text lengths
plt.hist(pandas_df_pol_comms["text_length"], bins=50)
plt.xlabel("Text Length (Words)")
plt.ylabel("Count")
plt.title("Politics Subreddits Comments - Distribution of Text Lengths")
plt.savefig("../../data/plots/text_length_pol_comms.png")
plt.show()
[Figure: histogram of text lengths for politics subreddits' comments]

Distribution of text lengths of economics subreddits' comment bodies¶

In [ ]:
excluded_subreddits = ["Economics", "finance"]

# Calculate text lengths
text_lengths = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
                                         .select(F.length("body").alias("text_length"))

# Convert Spark dataframe to Pandas dataframe
pandas_df_econ_comms = text_lengths.toPandas()
                                                                                
In [ ]:
# Plot distribution of text lengths
plt.hist(pandas_df_econ_comms["text_length"], bins=50)
plt.xlabel("Text Length (Words)")
plt.ylabel("Count")
plt.title("Economics Subreddits Comments - Distribution of Text Lengths", y=1.05)
plt.savefig("../../data/plots/text_length_econ_comms.png")
plt.show()
[Figure: histogram of text lengths for economics subreddits' comments]

Variation in text length over time¶

In [ ]:
# filter for subreddits + reformat date
econ_subs_body_title_words = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
                                                .withColumn('created_utc', F.date_format('created_utc', 'yyyy-MM-dd'))
pol_subs_body_title_words = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
                                                .withColumn('created_utc', F.date_format('created_utc', 'yyyy-MM-dd'))
pol_comms_body_words = comments_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
                                                .withColumn('created_utc', F.date_format('created_utc', 'yyyy-MM-dd'))
econ_comms_body_words = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
                                                .withColumn('created_utc', F.date_format('created_utc', 'yyyy-MM-dd'))

# create year + month cols
econ_subs_body_title_words = econ_subs_body_title_words.withColumn('year', F.year('created_utc')).withColumn('month', F.month('created_utc'))
pol_subs_body_title_words = pol_subs_body_title_words.withColumn('year', F.year('created_utc')).withColumn('month', F.month('created_utc'))
pol_comms_body_words = pol_comms_body_words.withColumn('year', F.year('created_utc')).withColumn('month', F.month('created_utc'))
econ_comms_body_words = econ_comms_body_words.withColumn('year', F.year('created_utc')).withColumn('month', F.month('created_utc'))
In [ ]:
# Add a column for wordcount
econ_subs_body_title_words = econ_subs_body_title_words.withColumn('word_count', F.size(F.split('selftext', ' ')))
pol_subs_body_title_words = pol_subs_body_title_words.withColumn('word_count', F.size(F.split('selftext', ' ')))
pol_comms_body_words = pol_comms_body_words.withColumn('word_count', F.size(F.split('body', ' ')))
econ_comms_body_words = econ_comms_body_words.withColumn('word_count', F.size(F.split('body', ' ')))
In [ ]:
# Grouping by month and year of the date_column
econ_subs_wordcount_grouped = econ_subs_body_title_words.groupBy("year", "month").agg(F.avg("word_count").alias("avg_count"))

# Sorting the DataFrame by year and month
econ_subs_wordcount_grouped = econ_subs_wordcount_grouped.sort(F.asc("year"), F.asc("month"))

# Grouping by month and year of the date_column
pol_subs_wordcount_grouped = pol_subs_body_title_words.groupBy("year", "month").agg(F.avg("word_count").alias("avg_count"))

# Sorting the DataFrame by year and month
pol_subs_wordcount_grouped = pol_subs_wordcount_grouped.sort(F.asc("year"), F.asc("month"))

# Grouping by month and year of the date_column
pol_comms_wordcount_grouped = pol_comms_body_words.groupBy("year", "month").agg(F.avg("word_count").alias("avg_count"))

# Sorting the DataFrame by year and month
pol_comms_wordcount_grouped = pol_comms_wordcount_grouped.sort(F.asc("year"), F.asc("month"))

# Grouping by month and year of the date_column
econ_comms_wordcount_grouped = econ_comms_body_words.groupBy("year", "month").agg(F.avg("word_count").alias("avg_count"))

# Sorting the DataFrame by year and month
econ_comms_wordcount_grouped = econ_comms_wordcount_grouped.sort(F.asc("year"), F.asc("month"))

# Converting Spark DataFrames to Pandas DataFrames
econ_subs_wordcount_grouped = econ_subs_wordcount_grouped.toPandas()
pol_subs_wordcount_grouped = pol_subs_wordcount_grouped.toPandas()
pol_comms_wordcount_grouped = pol_comms_wordcount_grouped.toPandas()
econ_comms_wordcount_grouped = econ_comms_wordcount_grouped.toPandas()

# Add subreddit+post column

econ_subs_wordcount_grouped['type'] = 'Economics Subreddits Submissions'
pol_subs_wordcount_grouped['type'] = 'Politics Subreddits Submissions'
pol_comms_wordcount_grouped['type'] = 'Politics Subreddits Comments'
econ_comms_wordcount_grouped['type'] = 'Economics Subreddits Comments'

# Outputting Pandas DataFrames to CSV files
# econ_subs_wordcount_grouped.to_csv("../data/csv/wordcount_econ_subs.csv", index=False)
# pol_subs_wordcount_grouped.to_csv("../data/csv/wordcount_pol_subs.csv", index=False)
# pol_comms_wordcount_grouped.to_csv("../data/csv/wordcount_pol_comms.csv", index=False)
# econ_comms_wordcount_grouped.to_csv("../data/csv/wordcount_econ_comms.csv", index=False)
                                                                                
In [ ]:
df_textlength = pd.concat([econ_subs_wordcount_grouped, pol_subs_wordcount_grouped, pol_comms_wordcount_grouped, econ_comms_wordcount_grouped], ignore_index=True)

df_textlength[['month', 'year']] = df_textlength[['month', 'year']].astype(str)
df_textlength['month_year'] = df_textlength.apply(lambda x: pd.to_datetime(x['month'] + ' ' + x['year'], format='%m %Y').strftime('%b %Y'), axis=1)
df_textlength = df_textlength[['month_year','avg_count','type']]
df_textlength['avg_count'] = df_textlength['avg_count'].round(2)

# Outputting Appended Pandas DataFrames to CSV files
df_textlength.to_csv("../../data/csv/wordcount_all.csv", index=False)
In [ ]:
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (5,2)
plt.rcParams["figure.dpi"] = 200

# Set font size
plt.rcParams.update({'font.size': 8})

# Create plot
fig, ax = plt.subplots(figsize=(10, 4))
for name, group in df_textlength.groupby('type'):
    ax.plot(group['month_year'], group['avg_count'], label=name)

# Set labels and title
ax.set_xlabel(None)
ax.set_ylabel('Average Word Count')
ax.set_title('Average Length of Politics and Economics Subreddits - Submissions and Comments')

ax.set_xticklabels(df_textlength['month_year'], rotation=45)

# Set legend
legend = ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.2), ncol=len(df_textlength['type'].unique()), fontsize=8, title=None)
# Remove the box around the legend
legend.get_frame().set_linewidth(0.0)
# Save plot
plt.savefig("../../data/plots/wordcount_over_time.png", bbox_inches='tight')
# Show plot
plt.show()
/tmp/ipykernel_5969/1166014668.py:18: UserWarning: FixedFormatter should only be used together with FixedLocator
  ax.set_xticklabels(df_textlength['month_year'], rotation=45)
[Figure: average word count over time for politics and economics subreddits' submissions and comments]
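The FixedFormatter warning above comes from calling ax.set_xticklabels without pinning the tick positions first. One way to avoid it (a sketch, assuming all four series cover the same months in the same order) is to replace that call with:

ticks = df_textlength['month_year'].unique()  # one label per month, in plotting order
ax.set_xticks(range(len(ticks)))
ax.set_xticklabels(ticks, rotation=45)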

LDA with gensim and pyLDAvis¶

Politics Subreddits Submissions¶

In [ ]:
!pip install gensim
Requirement already satisfied: gensim in /opt/conda/lib/python3.10/site-packages (4.1.2)
Requirement already satisfied: numpy>=1.17.0 in /opt/conda/lib/python3.10/site-packages (from gensim) (1.26.0)
Requirement already satisfied: scipy>=0.18.1 in /opt/conda/lib/python3.10/site-packages (from gensim) (1.11.3)
Requirement already satisfied: smart-open>=1.8.1 in /opt/conda/lib/python3.10/site-packages (from gensim) (5.2.1)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
In [ ]:
!pip install pyLDAvis
Collecting pyLDAvis
  Downloading pyLDAvis-3.4.1-py3-none-any.whl (2.6 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.6/2.6 MB 24.6 MB/s eta 0:00:0000:01
Requirement already satisfied: numpy>=1.24.2 in /opt/conda/lib/python3.10/site-packages (from pyLDAvis) (1.26.0)
Requirement already satisfied: scipy in /opt/conda/lib/python3.10/site-packages (from pyLDAvis) (1.11.3)
Collecting pandas>=2.0.0 (from pyLDAvis)
  Obtaining dependency information for pandas>=2.0.0 from https://files.pythonhosted.org/packages/1b/fa/4e5d054549faf1524230ffcd57ca98bb7350a4ed62ef722daabde4cb7632/pandas-2.1.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading pandas-2.1.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Requirement already satisfied: joblib>=1.2.0 in /opt/conda/lib/python3.10/site-packages (from pyLDAvis) (1.3.2)
Requirement already satisfied: jinja2 in /opt/conda/lib/python3.10/site-packages (from pyLDAvis) (3.1.2)
Requirement already satisfied: numexpr in /opt/conda/lib/python3.10/site-packages (from pyLDAvis) (2.8.3)
Collecting funcy (from pyLDAvis)
  Downloading funcy-2.0-py2.py3-none-any.whl (30 kB)
Requirement already satisfied: scikit-learn>=1.0.0 in /opt/conda/lib/python3.10/site-packages (from pyLDAvis) (1.0.1)
Requirement already satisfied: gensim in /opt/conda/lib/python3.10/site-packages (from pyLDAvis) (4.1.2)
Requirement already satisfied: setuptools in /opt/conda/lib/python3.10/site-packages (from pyLDAvis) (68.2.2)
Requirement already satisfied: python-dateutil>=2.8.2 in /opt/conda/lib/python3.10/site-packages (from pandas>=2.0.0->pyLDAvis) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /opt/conda/lib/python3.10/site-packages (from pandas>=2.0.0->pyLDAvis) (2022.1)
Collecting tzdata>=2022.1 (from pandas>=2.0.0->pyLDAvis)
  Downloading tzdata-2023.3-py2.py3-none-any.whl (341 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 341.8/341.8 kB 6.2 MB/s eta 0:00:0000:01
Requirement already satisfied: threadpoolctl>=2.0.0 in /opt/conda/lib/python3.10/site-packages (from scikit-learn>=1.0.0->pyLDAvis) (2.2.0)
Requirement already satisfied: smart-open>=1.8.1 in /opt/conda/lib/python3.10/site-packages (from gensim->pyLDAvis) (5.2.1)
Requirement already satisfied: MarkupSafe>=2.0 in /opt/conda/lib/python3.10/site-packages (from jinja2->pyLDAvis) (2.1.3)
Requirement already satisfied: packaging in /opt/conda/lib/python3.10/site-packages (from numexpr->pyLDAvis) (21.3)
Requirement already satisfied: six>=1.5 in /opt/conda/lib/python3.10/site-packages (from python-dateutil>=2.8.2->pandas>=2.0.0->pyLDAvis) (1.16.0)
Requirement already satisfied: pyparsing!=3.0.5,>=2.0.2 in /opt/conda/lib/python3.10/site-packages (from packaging->numexpr->pyLDAvis) (3.0.9)
Downloading pandas-2.1.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.3/12.3 MB 42.0 MB/s eta 0:00:00:00:010:01
Installing collected packages: funcy, tzdata, pandas, pyLDAvis
  Attempting uninstall: pandas
    Found existing installation: pandas 1.4.4
    Uninstalling pandas-1.4.4:
      Successfully uninstalled pandas-1.4.4
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
autovizwidget 0.21.0 requires pandas<2.0.0,>=0.20.1, but you have pandas 2.1.3 which is incompatible.
hdijupyterutils 0.21.0 requires pandas<2.0.0,>=0.17.1, but you have pandas 2.1.3 which is incompatible.
panel 0.13.1 requires bokeh<2.5.0,>=2.4.0, but you have bokeh 3.3.0 which is incompatible.
sagemaker-datawrangler 0.4.3 requires sagemaker-data-insights==0.4.0, but you have sagemaker-data-insights 0.3.3 which is incompatible.
Successfully installed funcy-2.0 pandas-2.1.3 pyLDAvis-3.4.1 tzdata-2023.3
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
In [ ]:
#Gensim
import gensim
import gensim.corpora as corpora

#vis
import pyLDAvis
import pyLDAvis.gensim
/opt/conda/lib/python3.10/site-packages/sklearn/utils/multiclass.py:14: DeprecationWarning: Please use `spmatrix` from the `scipy.sparse` namespace, the `scipy.sparse.base` namespace is deprecated.
  from scipy.sparse.base import spmatrix
/opt/conda/lib/python3.10/site-packages/sklearn/utils/optimize.py:18: DeprecationWarning: Please use `line_search_wolfe2` from the `scipy.optimize` namespace, the `scipy.optimize.linesearch` namespace is deprecated.
  from scipy.optimize.linesearch import line_search_wolfe2, line_search_wolfe1
/opt/conda/lib/python3.10/site-packages/sklearn/utils/optimize.py:18: DeprecationWarning: Please use `line_search_wolfe1` from the `scipy.optimize` namespace, the `scipy.optimize.linesearch` namespace is deprecated.
  from scipy.optimize.linesearch import line_search_wolfe2, line_search_wolfe1
In [ ]:
excluded_subreddits = ["Economics", "finance"]

# Concat body, title and filter for political subreddits
pol_subs_body_title_words = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
                                               .select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
                                               .alias("nlpbodytext"))

# Function to tokenize a text into a list of words
def tokenize_text(text):
    return text.split()

# Define a UDF to tokenize the text
tokenize_udf = F.udf(tokenize_text, ArrayType(StringType()))

# Apply the UDF to tokenize the text and create a new column
pol_subs_body_title_words = pol_subs_body_title_words.withColumn('tokenized_text', tokenize_udf(pol_subs_body_title_words['nlpbodytext']))

# Collect the tokenized text as a list of lists
texts = [row['tokenized_text'] for row in pol_subs_body_title_words.select('tokenized_text').collect()]

# Create the Gensim dictionary
id2word = corpora.Dictionary(texts)

# Create the corpus
corpus = [id2word.doc2bow(tokens) for tokens in texts]

print(corpus[0][0:20])

print(id2word[0])
                                                                                
[(0, 2), (1, 1), (2, 1), (3, 1), (4, 2), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1), (10, 1), (11, 1), (12, 1), (13, 1), (14, 1), (15, 1), (16, 1), (17, 1), (18, 1), (19, 1)]
actually
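An optional refinement that is not applied here: gensim dictionaries are usually pruned before building the bag-of-words corpus, which keeps the LDA vocabulary (and memory use) manageable. A sketch over the same texts list, with illustrative thresholds:

In [ ]:
# Hypothetical pruning step: drop very rare and very common tokens
# (must run before doc2bow, since it re-assigns token ids)
id2word_pruned = corpora.Dictionary(texts)
id2word_pruned.filter_extremes(no_below=5, no_above=0.5, keep_n=100000)
corpus_pruned = [id2word_pruned.doc2bow(tokens) for tokens in texts]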
In [ ]:
lda_model = gensim.models.ldamodel.LdaModel(corpus=corpus,
                                           id2word=id2word,
                                           num_topics=5,
                                           random_state=100,
                                           update_every=1,
                                           chunksize=100,
                                           passes=10,
                                           alpha="auto")
In [ ]:
pyLDAvis.enable_notebook()
vis = pyLDAvis.gensim.prepare(lda_model, corpus, id2word, mds="mmds", R=30, n_jobs=1)
vis
/opt/conda/lib/python3.10/site-packages/pandas/core/dtypes/cast.py:1841: DeprecationWarning: np.find_common_type is deprecated.  Please use `np.result_type` or `np.promote_types`.
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  with warnings.catch_warnings():
Out[ ]:
In [ ]:
pyLDAvis.save_html(vis, '../../data/plots/pol_subs_lda.html')

Economics Subreddits Submissions¶

In [ ]:
excluded_subreddits = ["Economics", "finance"]

# Concat title and body, and filter for economics subreddits
econ_subs_body_title_words = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
                                               .select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
                                               .alias("nlpbodytext"))

substr_to_remove = ["http", "https", "tinyurl", "com", "www", "jpg", "uploads", "delete", "remove", ]
regex = "|".join(substr_to_remove)
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("nlpbodytext", F.regexp_replace("nlpbodytext", regex, " "))

# Function to tokenize a text into a list of words
def tokenize_text(text):
    return text.split()

# Define a UDF to tokenize the text
tokenize_udf = F.udf(tokenize_text, ArrayType(StringType()))

# Apply the UDF to tokenize the text and create a new column
econ_subs_body_title_words = econ_subs_body_title_words.withColumn('tokenized_text', tokenize_udf(econ_subs_body_title_words['nlpbodytext']))

# Collect the tokenized text as a list of lists
texts = [row['tokenized_text'] for row in econ_subs_body_title_words.select('tokenized_text').collect()]

# Create the Gensim dictionary
id2word = corpora.Dictionary(texts)

# Create the corpus
corpus = [id2word.doc2bow(tokens) for tokens in texts]

print(corpus[0][0:20])

print(id2word[0])
                                                                                
[(0, 1)]
shubham
In [ ]:
lda_model = gensim.models.ldamodel.LdaModel(corpus=corpus,
                                           id2word=id2word,
                                           num_topics=5,
                                           random_state=100,
                                           update_every=1,
                                           chunksize=100,
                                           passes=10,
                                           alpha="auto")
In [ ]:
pyLDAvis.enable_notebook()
vis = pyLDAvis.gensim.prepare(lda_model, corpus, id2word, mds="mmds", R=30, n_jobs=1)
vis
/opt/conda/lib/python3.10/site-packages/pandas/core/dtypes/cast.py:1841: DeprecationWarning: np.find_common_type is deprecated.  Please use `np.result_type` or `np.promote_types`.
See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information.  (Deprecated NumPy 1.25)
  with warnings.catch_warnings():
Out[ ]:
In [ ]:
pyLDAvis.save_html(vis, '../../data/plots/econ_subs_lda.html')