In this notebook, I pick up where the Azure notebook left off. We begin by reading in the data that was prepared in Azure and migrated to S3, and then continue the analysis by using TF-IDF to determine the most important words in the bodies of economics subreddit comments.¶
Setup - AWS¶
In [ ]:
# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.4.0
# install spark-nlp
%pip install spark-nlp==5.1.3
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done
Solving environment: done
The following NEW packages will be INSTALLED:
  openjdk   pkgs/main/linux-64::openjdk-11.0.13-h87a67e3_0
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
Note: you may need to restart the kernel to use updated packages.
Successfully installed py4j-0.10.9.7 pyspark-3.4.0
Successfully installed spark-nlp-5.1.3
Out[ ]:
In [ ]:
import json
import sparknlp
import numpy as np
import pandas as pd
from sparknlp.base import *
from pyspark.ml import Pipeline
from sparknlp.annotator import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from sparknlp.pretrained import PretrainedPipeline
In [ ]:
# Import pyspark and build Spark session
spark = SparkSession.builder \
    .appName("Spark NLP") \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config(
        "spark.jars.packages",
        # Both coordinates must go in ONE comma-separated list; calling
        # .config("spark.jars.packages", ...) twice lets the second call
        # overwrite the first, so the Spark NLP jar would never be loaded.
        "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2",
    ) \
    .config(
        # Hadoop (fs.s3a.*) settings need the spark.hadoop. prefix, otherwise
        # Spark ignores them as "non-Spark config properties".
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    ) \
    .getOrCreate()
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
found org.apache.hadoop#hadoop-aws;3.2.2 in central
found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
23/11/22 18:08:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
In [ ]:
print(f"Spark version: {spark.version}")
print(f"sparknlp version: {sparknlp.version()}")
Spark version: 3.4.0
sparknlp version: 5.1.3
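Because both Maven coordinates now travel through a single spark.jars.packages entry, a quick check can confirm what the session actually registered. This is a minimal sketch against the session created above; the "not set" default string is just a placeholder.
In [ ]:
# Sanity check (sketch): list the Maven packages the session was configured with.
print(spark.sparkContext.getConf().get("spark.jars.packages", "not set"))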
Read in NLP Cleaned Data¶
In [ ]:
import sagemaker
sess = sagemaker.Session()
bucket = sess.default_bucket()
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
Comments¶
In [ ]:
# S3 directory path
s3_directory_comms = f"s3a://{bucket}/project/nlp_cleaned_comments/"
# Read all the Parquet files in the directory into a DataFrame
comments_cleaned = spark.read.parquet(s3_directory_comms)
23/11/22 18:08:22 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
Submissions¶
In [ ]:
# S3 directory path
s3_directory_subs = f"s3a://{bucket}/project/nlp_cleaned_submissions/"
# Read all the Parquet files in the directory into a DataFrame
submissions_cleaned = spark.read.parquet(s3_directory_subs)
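Before continuing, it is worth confirming that the Azure pipeline wrote the columns this notebook relies on (cleaned_body as an array of tokens for comments, plus cleaned_title, cleaned_body, and selftext for submissions). A minimal sanity-check sketch, assuming those column names:
In [ ]:
# Sanity check (sketch): inspect schemas and row counts of the cleaned data.
comments_cleaned.printSchema()
submissions_cleaned.printSchema()
print(f"comments rows: {comments_cleaned.count():,}")
print(f"submissions rows: {submissions_cleaned.count():,}")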
TF-IDF (continued from Azure), Common Words, and Text Length¶
Determine the most important words in economics subreddit comment bodies using TF-IDF¶
Processing Job¶
In [ ]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF, HashingTF, StopWordsRemover
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, MapType, StringType
import string
from sparknlp.pretrained import PretrainedPipeline
import matplotlib.pyplot as plt
from pyspark.ml.feature import SQLTransformer
from sparknlp.annotator import Tokenizer, Normalizer, StopWordsCleaner, LemmatizerModel
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Keep only the economics subreddits and select the tokenized comment bodies
econ_comms_body_words = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits)) \
    .select(F.col("cleaned_body"))
# Define HashingTF (NOTE: 40,000,000 hashed features is very memory-hungry;
# the idf.fit below failed with a Py4J error -- see the traceback output)
hashingTF = HashingTF(inputCol="cleaned_body", outputCol="rawFeatures", numFeatures=40000000)
# Define IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
words = econ_comms_body_words.select('cleaned_body')
# Term frequencies via the hashing trick
tf = hashingTF.transform(words)
# Fit IDF
idf_model = idf.fit(tf)
# TF-IDF features
tfidf = idf_model.transform(tf)
# One row per word, wrapped back into a single-element array so it can be re-hashed
ndf = tfidf.select(F.explode('cleaned_body').alias('expwords')).withColumn('words', F.array('expwords'))
# Hash each individual word to recover its feature index
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=40000000)
hashudf = F.udf(lambda vector: str(vector.indices.tolist()[0]), StringType())
wordtf = hashingTF.transform(ndf).withColumn('wordhash', hashudf(F.col('rawFeatures')))
wordtf = wordtf.dropDuplicates(["expwords"])
# Map each row's feature indices to their TF-IDF values (stringified to match the declared MapType)
udf1 = F.udf(lambda vec: {str(i): str(v) for i, v in zip(vec.indices.tolist(), vec.values.tolist())},
             MapType(StringType(), StringType()))
valuedf = tfidf.select(F.explode(udf1(F.col('features'))).alias('wordhash', 'value'))
valuedf = valuedf.dropDuplicates(["wordhash"])
# Join hashed words back to their TF-IDF values and rank by value (cast for a numeric sort)
join_importance = wordtf.join(valuedf, wordtf.wordhash == valuedf.wordhash, 'inner').select(wordtf.expwords, valuedf.value)
top100_econ_comms = join_importance.sort(F.col("value").cast("double").desc()).limit(100).toPandas()
top100_econ_comms = top100_econ_comms.reset_index(drop=True)
top100_econ_comms = top100_econ_comms.rename(columns={"expwords": "word", "value": "tfidf_value"})
INFO:py4j.clientserver:Error while receiving.
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
INFO:py4j.clientserver:Closing down clientserver connection
ERROR:root:Exception while sending command.
py4j.protocol.Py4JNetworkError: Error while sending or receiving

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
Cell In[13], line 19
     16 tf = hashingTF.transform(words)
     18 # IDF
---> 19 idf_model = idf.fit(tf)
     21 # TFIDF
     22 tfidf = idf_model.transform(tf)
...
Py4JError: An error occurred while calling o59.fit
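The fit above died when the Py4J gateway lost its Java side, which is consistent with the 40,000,000-bucket hashed feature space exhausting memory on a single local[*] driver. A memory-friendlier sketch (not the pipeline used above) is to cap the vocabulary with CountVectorizer, which also keeps an explicit index-to-word mapping so no hash ever needs to be reversed; the vocabSize and minDF values below are illustrative.
In [ ]:
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.stat import Summarizer

# Cap the vocabulary instead of hashing into 40M buckets (values are illustrative).
cv = CountVectorizer(inputCol="cleaned_body", outputCol="rawFeatures",
                     vocabSize=50000, minDF=5.0)
cv_model = cv.fit(econ_comms_body_words)
tf_cv = cv_model.transform(econ_comms_body_words)

idf_model_cv = IDF(inputCol="rawFeatures", outputCol="features").fit(tf_cv)
tfidf_cv = idf_model_cv.transform(tf_cv)

# Average TF-IDF weight per vocabulary index, then map indices back to words.
mean_tfidf = tfidf_cv.select(Summarizer.mean(F.col("features"))).first()[0]
top_words_cv = sorted(zip(cv_model.vocabulary, mean_tfidf.toArray()),
                      key=lambda kv: kv[1], reverse=True)[:100]
Because cv_model.vocabulary already maps feature indices to words, the per-word hashing join used above is not needed.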
In [ ]:
top100_econ_comms.to_csv("../data/csv/tfidf_econ_comms.csv",index=False)
INFO:py4j.clientserver:Error while receiving.
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
ERROR:root:Exception while sending command.
py4j.protocol.Py4JNetworkError: Error while sending or receiving
INFO:py4j.clientserver:Closing down clientserver connection
Determine the most common words among politics submissions titles and bodies¶
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Concat body, title and filter for political subreddits
pol_subs_body_title_words = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
.select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
.alias("nlpbodytext"))
# Split the nlpbodytext into an array of words
pol_subs_body_title_words = pol_subs_body_title_words.withColumn("array_words", F.split(pol_subs_body_title_words["nlpbodytext"], " "))
# Flatten words array
words_flat = pol_subs_body_title_words.select(F.explode("array_words").alias("word"))
# Count word occurrences
word_counts = words_flat.groupBy("word").count()
# Get top 100 most common words
freq100_pol_subs = word_counts.orderBy(F.desc("count")).limit(100).toPandas()
# Rename columns
freq100_pol_subs = freq100_pol_subs.rename(columns={"word": "word", "count": "word_count"})
freq100_pol_subs.to_csv('../../data/csv/wordcount_pol_subs.csv',index=False)
Determine the most common words among economics submissions titles and bodies¶
In [ ]:
# Concat title and body and filter for economics subreddits
econ_subs_body_title_words = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
.select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
.alias("nlpbodytext"))
substr_to_remove = ["http", "https", "tinyurl", "com", "www", "jpg", "uploads"]
regex = "|".join(substr_to_remove)
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("nlpbodytext", F.regexp_replace("nlpbodytext", regex, " "))
# Split the nlpbodytext into an array of words
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("array_words", F.split(econ_subs_body_title_words["nlpbodytext"], " "))
# Flatten words array
words_flat = econ_subs_body_title_words.select(F.explode("array_words").alias("word"))
# Count word occurrences
word_counts = words_flat.groupBy("word").count()
# Remove rows with blank strings and the letters 's' and 'e'
word_counts = word_counts.filter((F.length("word") > 0) & (F.col("word") != "s") & (F.col("word") != "e"))
# Get top 100 most common words
freq100_econ_subs = word_counts.orderBy(F.desc("count")).limit(100).toPandas()
# Rename columns
freq100_econ_subs = freq100_econ_subs.rename(columns={"word": "word", "count": "word_count"})
freq100_econ_subs.to_csv('../../data/csv/wordcount_econ_subs.csv',index=False)
Determine the most common words among politics comment bodies¶
In [ ]:
pol_comms_body_words = comments_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))
# Flatten words array
words_flat = pol_comms_body_words.select(F.explode("cleaned_body").alias("word"))
# Count word occurrences
word_counts = words_flat.groupBy("word").count()
# Get top 100 most common words
freq100_pol_comms = word_counts.orderBy(F.desc("count")).limit(100).toPandas()
# Rename columns
freq100_pol_comms = freq100_pol_comms.rename(columns={"word": "word", "count": "word_count"})
freq100_pol_comms.to_csv('../../data/csv/wordcount_pol_comms.csv',index=False)
Determine the most common words among economics comment bodies¶
In [ ]:
econ_comms_body_words = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))
# Flatten words array
words_flat = econ_comms_body_words.select(F.explode("cleaned_body").alias("word"))
# Count word occurrences
word_counts = words_flat.groupBy("word").count()
# Get top 100 most common words
freq100_econ_comms = word_counts.orderBy(F.desc("count")).limit(100).toPandas()
# Rename columns
freq100_econ_comms = freq100_econ_comms.rename(columns={"word": "word", "count": "word_count"})
freq100_econ_comms.to_csv('../../data/csv/wordcount_econ_comms.csv',index=False)
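The four frequency cells above repeat the same explode/groupBy/limit pattern. A small helper, sketched here with the column and path names used in this notebook (the submissions cells would first need their concatenated text split into an array), could express each as a single call:
In [ ]:
def top_words(df, word_col, out_path, n=100):
    """Explode a tokenized column, count word frequencies, and save the top n."""
    counts = (df.select(F.explode(word_col).alias("word"))
                .groupBy("word").count()
                .orderBy(F.desc("count"))
                .limit(n)
                .toPandas()
                .rename(columns={"count": "word_count"}))
    counts.to_csv(out_path, index=False)
    return counts

# Example: the economics comments table above becomes
# top_words(econ_comms_body_words, "cleaned_body", "../../data/csv/wordcount_econ_comms.csv")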
Distribution of text lengths of politics subreddit submission bodies¶
In [ ]:
# Calculate text lengths
text_lengths = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
.select(F.length("selftext").alias("text_length"))
# Convert Spark dataframe to Pandas dataframe
pandas_df_pol_subs = text_lengths.toPandas()
In [ ]:
# Plot distribution of text lengths
plt.hist(pandas_df_pol_subs["text_length"], bins=25)
plt.xlabel("Text Length (Words)")
plt.ylabel("Count")
plt.title("Politics Subreddits Submissions - Distribution of Text Lengths")
plt.savefig("../../data/plots/text_length_pol_subs.png")
plt.show()
Distribution of text lengths of economics subreddit submission bodies¶
In [ ]:
# Calculate text lengths
text_lengths = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
.select(F.length("selftext").alias("text_length"))
# Convert Spark dataframe to Pandas dataframe
pandas_df_econ_subs = text_lengths.toPandas()
In [ ]:
# Plot distribution of text lengths
plt.hist(pandas_df_econ_subs["text_length"], bins=50)
plt.xlabel("Text Length (Words)")
plt.ylabel("Count")
plt.title("Economics Subreddits Submissions - Distribution of Text Lengths")
plt.savefig("../../data/plots/text_length_econ_subs.png")
plt.show()
Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out at java.base/java.net.PlainSocketImpl.socketAccept(Native Method) at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:458) at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565) at java.base/java.net.ServerSocket.accept(ServerSocket.java:533) at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
Distribution of text lengths of politics subreddit comment bodies¶
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Calculate text lengths (in characters) for politics subreddit comments
text_lengths = comments_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
    .select(F.length("body").alias("text_length"))
# Convert Spark dataframe to Pandas dataframe
pandas_df_pol_comms = text_lengths.toPandas()
In [ ]:
import matplotlib.pyplot as plt
# Plot distribution of text lengths
plt.hist(pandas_df_pol_comms["text_length"], bins=50)
plt.xlabel("Text Length (Words)")
plt.ylabel("Count")
plt.title("Politics Subreddits Comments - Distribution of Text Lengths")
plt.savefig("../../data/plots/text_length_pol_comms.png")
plt.show()
Distribution of text lengths of economics subreddit comment bodies¶
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Calculate text lengths
text_lengths = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
.select(F.length("body").alias("text_length"))
# Convert Spark dataframe to Pandas dataframe
pandas_df_econ_comms = text_lengths.toPandas()
In [ ]:
# Plot distribution of text lengths
plt.hist(pandas_df_econ_comms["text_length"], bins=50)
plt.xlabel("Text Length (Words)")
plt.ylabel("Count")
plt.title("Economics Subreddits Comments - Distribution of Text Lengths", y=1.05)
plt.savefig("../../data/plots/text_length_econ_comms.png")
plt.show()
Variation in text length over time¶
In [ ]:
# filter for subreddits + reformat date
econ_subs_body_title_words = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
.withColumn('created_utc', F.date_format('created_utc', 'yyyy-MM-dd'))
pol_subs_body_title_words = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
.withColumn('created_utc', F.date_format('created_utc', 'yyyy-MM-dd'))
pol_comms_body_words = comments_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
.withColumn('created_utc', F.date_format('created_utc', 'yyyy-MM-dd'))
econ_comms_body_words = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
.withColumn('created_utc', F.date_format('created_utc', 'yyyy-MM-dd'))
# create year + month cols
econ_subs_body_title_words = econ_subs_body_title_words.withColumn('year', F.year('created_utc')).withColumn('month', F.month('created_utc'))
pol_subs_body_title_words = pol_subs_body_title_words.withColumn('year', F.year('created_utc')).withColumn('month', F.month('created_utc'))
pol_comms_body_words = pol_comms_body_words.withColumn('year', F.year('created_utc')).withColumn('month', F.month('created_utc'))
econ_comms_body_words = econ_comms_body_words.withColumn('year', F.year('created_utc')).withColumn('month', F.month('created_utc'))
In [ ]:
# Add a column for wordcount
econ_subs_body_title_words = econ_subs_body_title_words.withColumn('word_count', F.size(F.split('selftext', ' ')))
pol_subs_body_title_words = pol_subs_body_title_words.withColumn('word_count', F.size(F.split('selftext', ' ')))
pol_comms_body_words = pol_comms_body_words.withColumn('word_count', F.size(F.split('body', ' ')))
econ_comms_body_words = econ_comms_body_words.withColumn('word_count', F.size(F.split('body', ' ')))
In [ ]:
# Grouping by month and year of the date_column
econ_subs_wordcount_grouped = econ_subs_body_title_words.groupBy("year", "month").agg(F.avg("word_count").alias("avg_count"))
# Sorting the DataFrame by year and month
econ_subs_wordcount_grouped = econ_subs_wordcount_grouped.sort(F.asc("year"), F.asc("month"))
# Grouping by month and year of the date_column
pol_subs_wordcount_grouped = pol_subs_body_title_words.groupBy("year", "month").agg(F.avg("word_count").alias("avg_count"))
# Sorting the DataFrame by year and month
pol_subs_wordcount_grouped = pol_subs_wordcount_grouped.sort(F.asc("year"), F.asc("month"))
# Grouping by month and year of the date_column
pol_comms_wordcount_grouped = pol_comms_body_words.groupBy("year", "month").agg(F.avg("word_count").alias("avg_count"))
# Sorting the DataFrame by year and month
pol_comms_wordcount_grouped = pol_comms_wordcount_grouped.sort(F.asc("year"), F.asc("month"))
# Grouping by month and year of the date_column
econ_comms_wordcount_grouped = econ_comms_body_words.groupBy("year", "month").agg(F.avg("word_count").alias("avg_count"))
# Sorting the DataFrame by year and month
econ_comms_wordcount_grouped = econ_comms_wordcount_grouped.sort(F.asc("year"), F.asc("month"))
# Converting Spark DataFrames to Pandas DataFrames
econ_subs_wordcount_grouped = econ_subs_wordcount_grouped.toPandas()
pol_subs_wordcount_grouped = pol_subs_wordcount_grouped.toPandas()
pol_comms_wordcount_grouped = pol_comms_wordcount_grouped.toPandas()
econ_comms_wordcount_grouped = econ_comms_wordcount_grouped.toPandas()
# Add subreddit+post column
econ_subs_wordcount_grouped['type'] = 'Economics Subreddits Submissions'
pol_subs_wordcount_grouped['type'] = 'Politics Subreddits Submissions'
pol_comms_wordcount_grouped['type'] = 'Politics Subreddits Comments'
econ_comms_wordcount_grouped['type'] = 'Economics Subreddits Comments'
# Outputting Pandas DataFrames to CSV files
# econ_subs_wordcount_grouped.to_csv("../data/csv/wordcount_econ_subs.csv", index=False)
# pol_subs_wordcount_grouped.to_csv("../data/csv/wordcount_pol_subs.csv", index=False)
# pol_comms_wordcount_grouped.to_csv("../data/csv/wordcount_pol_comms.csv", index=False)
# econ_comms_wordcount_grouped.to_csv("../data/csv/wordcount_econ_comms.csv", index=False)
In [ ]:
df_textlength = pd.concat([econ_subs_wordcount_grouped, pol_subs_wordcount_grouped, pol_comms_wordcount_grouped, econ_comms_wordcount_grouped], ignore_index=True)
df_textlength[['month', 'year']] = df_textlength[['month', 'year']].astype(str)
df_textlength['month_year'] = df_textlength.apply(lambda x: pd.to_datetime(x['month'] + ' ' + x['year'], format='%m %Y').strftime('%b %Y'), axis=1)
df_textlength = df_textlength[['month_year','avg_count','type']]
df_textlength['avg_count'] = df_textlength['avg_count'].round(2)
# Outputting Appended Pandas DataFrames to CSV files
df_textlength.to_csv("../../data/csv/wordcount_all.csv", index=False)
In [ ]:
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (5,2)
plt.rcParams["figure.dpi"] = 200
# Set font size
plt.rcParams.update({'font.size': 8})
# Create plot
fig, ax = plt.subplots(figsize=(10, 4))
for name, group in df_textlength.groupby('type'):
    ax.plot(group['month_year'], group['avg_count'], label=name)
# Set labels and title
ax.set_xlabel(None)
ax.set_ylabel('Average Word Count')
ax.set_title('Average Length of Politics and Economics Subreddits - Submissions and Comments')
ax.set_xticklabels(df_textlength['month_year'], rotation=45)
# Set legend
legend = ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.2), ncol=len(df_textlength['type'].unique()), fontsize=8, title=None)
# Remove the box around the legend
legend.get_frame().set_linewidth(0.0)
# Save plot
plt.savefig("../../data/plots/wordcount_over_time.png", bbox_inches='tight')
# Show plot
plt.show()
/tmp/ipykernel_5969/1166014668.py:18: UserWarning: FixedFormatter should only be used together with FixedLocator
  ax.set_xticklabels(df_textlength['month_year'], rotation=45)
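The FixedFormatter warning comes from assigning tick labels without pinning tick positions (and from passing one label per row of df_textlength rather than one per unique month). One way to avoid it, sketched against the same fig/ax as above:
In [ ]:
import numpy as np

# Pin tick positions to the unique month labels, then thin them out for readability.
month_labels = df_textlength['month_year'].unique()
positions = np.arange(len(month_labels))
ax.set_xticks(positions[::2])
ax.set_xticklabels(month_labels[::2], rotation=45)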
LDA with gensim and pyLDAvis¶
Politics Subreddits Submissions¶
In [ ]:
!pip install gensim
Requirement already satisfied: gensim in /opt/conda/lib/python3.10/site-packages (4.1.2)
Requirement already satisfied: numpy>=1.17.0 in /opt/conda/lib/python3.10/site-packages (from gensim) (1.26.0)
Requirement already satisfied: scipy>=0.18.1 in /opt/conda/lib/python3.10/site-packages (from gensim) (1.11.3)
Requirement already satisfied: smart-open>=1.8.1 in /opt/conda/lib/python3.10/site-packages (from gensim) (5.2.1)
In [ ]:
!pip install pyLDAvis
Collecting pyLDAvis
  Downloading pyLDAvis-3.4.1-py3-none-any.whl (2.6 MB)
Collecting pandas>=2.0.0 (from pyLDAvis)
  Downloading pandas-2.1.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
Collecting funcy (from pyLDAvis)
Collecting tzdata>=2022.1 (from pandas>=2.0.0->pyLDAvis)
Installing collected packages: funcy, tzdata, pandas, pyLDAvis
  Attempting uninstall: pandas
    Found existing installation: pandas 1.4.4
    Successfully uninstalled pandas-1.4.4
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
autovizwidget 0.21.0 requires pandas<2.0.0,>=0.20.1, but you have pandas 2.1.3 which is incompatible.
hdijupyterutils 0.21.0 requires pandas<2.0.0,>=0.17.1, but you have pandas 2.1.3 which is incompatible.
panel 0.13.1 requires bokeh<2.5.0,>=2.4.0, but you have bokeh 3.3.0 which is incompatible.
sagemaker-datawrangler 0.4.3 requires sagemaker-data-insights==0.4.0, but you have sagemaker-data-insights 0.3.3 which is incompatible.
Successfully installed funcy-2.0 pandas-2.1.3 pyLDAvis-3.4.1 tzdata-2023.3
In [ ]:
#Gensim
import gensim
import gensim.corpora as corpora
#vis
import pyLDAvis
import pyLDAvis.gensim
/opt/conda/lib/python3.10/site-packages/sklearn/utils/multiclass.py:14: DeprecationWarning: Please use `spmatrix` from the `scipy.sparse` namespace, the `scipy.sparse.base` namespace is deprecated.
  from scipy.sparse.base import spmatrix
/opt/conda/lib/python3.10/site-packages/sklearn/utils/optimize.py:18: DeprecationWarning: Please use `line_search_wolfe2` from the `scipy.optimize` namespace, the `scipy.optimize.linesearch` namespace is deprecated.
  from scipy.optimize.linesearch import line_search_wolfe2, line_search_wolfe1
/opt/conda/lib/python3.10/site-packages/sklearn/utils/optimize.py:18: DeprecationWarning: Please use `line_search_wolfe1` from the `scipy.optimize` namespace, the `scipy.optimize.linesearch` namespace is deprecated.
  from scipy.optimize.linesearch import line_search_wolfe2, line_search_wolfe1
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Concat body, title and filter for political subreddits
pol_subs_body_title_words = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits))\
.select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
.alias("nlpbodytext"))
# Function to tokenize a text into a list of words
def tokenize_text(text):
return text.split()
# Define a UDF to tokenize the text
tokenize_udf = F.udf(tokenize_text, ArrayType(StringType()))
# Apply the UDF to tokenize the text and create a new column
pol_subs_body_title_words = pol_subs_body_title_words.withColumn('tokenized_text', tokenize_udf(pol_subs_body_title_words['nlpbodytext']))
# Collect the tokenized text as a list of lists
texts = [row['tokenized_text'] for row in pol_subs_body_title_words.select('tokenized_text').collect()]
# Create the Gensim dictionary
id2word = corpora.Dictionary(texts)
# Create the corpus
corpus = [id2word.doc2bow(tokens) for tokens in texts]
print(corpus[0][0:20])
print(id2word[0])
[(0, 2), (1, 1), (2, 1), (3, 1), (4, 2), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1), (10, 1), (11, 1), (12, 1), (13, 1), (14, 1), (15, 1), (16, 1), (17, 1), (18, 1), (19, 1)] actually
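Optionally, very rare and very common tokens can be pruned before building the bag-of-words corpus, which often sharpens the topics; the same applies to the economics corpus below. A sketch with illustrative thresholds (it has to run before doc2bow so the dictionary ids stay aligned with the corpus):
In [ ]:
# Drop tokens seen in fewer than 5 documents or in more than half of all documents.
id2word.filter_extremes(no_below=5, no_above=0.5)
corpus = [id2word.doc2bow(tokens) for tokens in texts]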
In [ ]:
lda_model = gensim.models.ldamodel.LdaModel(corpus=corpus,
id2word=id2word,
num_topics=5,
random_state=100,
update_every=1,
chunksize=100,
passes=10,
alpha="auto")
In [ ]:
pyLDAvis.enable_notebook()
vis = pyLDAvis.gensim.prepare(lda_model, corpus, id2word, mds="mmds", R=30, n_jobs=1)
vis
/opt/conda/lib/python3.10/site-packages/pandas/core/dtypes/cast.py:1841: DeprecationWarning: np.find_common_type is deprecated. Please use `np.result_type` or `np.promote_types`. See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information. (Deprecated NumPy 1.25) with warnings.catch_warnings():
Out[ ]:
In [ ]:
pyLDAvis.save_html(vis, '../../data/plots/pol_subs_lda.html')
Economics Subreddits Submissions¶
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Concat title and body and filter for economics subreddits
econ_subs_body_title_words = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
.select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
.alias("nlpbodytext"))
substr_to_remove = ["http", "https", "tinyurl", "com", "www", "jpg", "uploads", "delete", "remove"]
regex = "|".join(substr_to_remove)
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("nlpbodytext", F.regexp_replace("nlpbodytext", regex, " "))
# Function to tokenize a text into a list of words
def tokenize_text(text):
return text.split()
# Define a UDF to tokenize the text
tokenize_udf = F.udf(tokenize_text, ArrayType(StringType()))
# Apply the UDF to tokenize the text and create a new column
econ_subs_body_title_words = econ_subs_body_title_words.withColumn('tokenized_text', tokenize_udf(econ_subs_body_title_words['nlpbodytext']))
# Collect the tokenized text as a list of lists
texts = [row['tokenized_text'] for row in econ_subs_body_title_words.select('tokenized_text').collect()]
# Create the Gensim dictionary
id2word = corpora.Dictionary(texts)
# Create the corpus
corpus = [id2word.doc2bow(tokens) for tokens in texts]
print(corpus[0][0:20])
print(id2word[0])
[(0, 1)] shubham
In [ ]:
lda_model = gensim.models.ldamodel.LdaModel(corpus=corpus,
id2word=id2word,
num_topics=5,
random_state=100,
update_every=1,
chunksize=100,
passes=10,
alpha="auto")
In [ ]:
pyLDAvis.enable_notebook()
vis = pyLDAvis.gensim.prepare(lda_model, corpus, id2word, mds="mmds", R=30, n_jobs=1)
vis
/opt/conda/lib/python3.10/site-packages/pandas/core/dtypes/cast.py:1841: DeprecationWarning: np.find_common_type is deprecated. Please use `np.result_type` or `np.promote_types`. See https://numpy.org/devdocs/release/1.25.0-notes.html and the docs for more information. (Deprecated NumPy 1.25) with warnings.catch_warnings():
Out[ ]:
In [ ]:
pyLDAvis.save_html(vis, '../../data/plots/econ_subs_lda.html')
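To compare the politics and economics fits beyond the visualizations, gensim's CoherenceModel can score each model against its own tokenized texts. This is a sketch using the economics-corpus objects currently in scope (lda_model, texts, id2word); the politics model would be scored the same way with its own corpus.
In [ ]:
from gensim.models import CoherenceModel

# c_v coherence over the tokenized documents used to train this model.
coherence = CoherenceModel(model=lda_model, texts=texts,
                           dictionary=id2word, coherence='c_v')
print(f"Economics submissions LDA c_v coherence: {coherence.get_coherence():.3f}")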