Setup¶

In [ ]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.3.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done
Solving environment: done


==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 23.9.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.9.0



# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: pyspark==3.3.0 in /opt/conda/lib/python3.10/site-packages (3.3.0)
Requirement already satisfied: py4j==0.10.9.5 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.3.0) (0.10.9.5)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Out[ ]:

Create Spark Session¶

In [ ]:
# Import pyspark and build the Spark session used by every later cell.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    # hadoop-aws provides the s3a:// filesystem used to read the project buckets.
    # NOTE(review): hadoop-aws normally has to match the Hadoop version bundled
    # with the installed PySpark build - confirm 3.2.2 is the intended version.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    # Hadoop options must be passed with the "spark.hadoop." prefix; without it
    # Spark reports "Ignoring non-Spark config property" and drops the setting,
    # so the credentials provider would never reach the S3A connector.
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

# Confirm which Spark release the session is running.
print(spark.version)
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a796d366-066e-4b3d-ae96-4501093dd79b;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 255ms :: artifacts dl 17ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-a796d366-066e-4b3d-ae96-4501093dd79b
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/12ms)
23/11/03 23:16:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/03 23:16:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/11/03 23:16:45 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
3.3.0

Check Cross Account S3 Access¶

In [ ]:
# Cross-account access check: list the project/ prefix of this bucket via the AWS CLI.
! aws s3 ls s3://sagemaker-us-east-1-224518912016/project/
                           PRE cleaned/
                           PRE comments/
                           PRE submissions/
In [ ]:
# Cross-account access check: list the project/ prefix of this bucket via the AWS CLI.
! aws s3 ls s3://sagemaker-us-east-1-711387073580/project/ 
                           PRE cleaned/
                           PRE comments/
                           PRE submissions/
In [ ]:
# Cross-account access check: list the project/ prefix of this bucket via the AWS CLI.
! aws s3 ls s3://sagemaker-us-east-1-433974840707/project/ 
                           PRE cleaned/
                           PRE comments/
                           PRE submissions/

Read in data¶

Comments¶

In [ ]:
import sagemaker

# NOTE(review): `session` is not used by any cell visible in this section;
# kept in case later cells rely on it - confirm before removing.
session = sagemaker.Session()

# Create or retrieve a Spark session. When a session is already running,
# getOrCreate() returns it and the appName given here has no effect.
spark = SparkSession.builder.appName("ReadS3Parquet").getOrCreate()

# S3 directory paths: the cleaned comments prefix in each of the four project buckets.
# (Plain string literals - there is nothing to interpolate, so no f-prefix.)
s3_directory1 = "s3a://sagemaker-us-east-1-395393721134/project/cleaned/comments/"
s3_directory2 = "s3a://sagemaker-us-east-1-433974840707/project/cleaned/comments/"
s3_directory3 = "s3a://sagemaker-us-east-1-711387073580/project/cleaned/comments/"
s3_directory4 = "s3a://sagemaker-us-east-1-224518912016/project/cleaned/comments/"


# Read all the Parquet files in each directory into a DataFrame.
df_comments1 = spark.read.parquet(s3_directory1)
df_comments2 = spark.read.parquet(s3_directory2)
df_comments3 = spark.read.parquet(s3_directory3)
df_comments4 = spark.read.parquet(s3_directory4)

# Stack the four parts into one DataFrame. unionByName matches columns by name,
# so a part written with a different column order cannot be silently misaligned
# (plain union() pairs columns purely by position). A part whose column names
# differ raises an error instead of producing mixed-up data.
df_comments = (
    df_comments1
    .unionByName(df_comments2)
    .unionByName(df_comments3)
    .unionByName(df_comments4)
)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
23/11/03 23:16:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/11/03 23:16:51 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                
In [ ]:
# Row count per subreddit - a quick check that every expected subreddit is present.
(
    df_comments
    .groupBy("subreddit")
    .count()
    .show()
)

# Column names of the combined comments DataFrame.
print(df_comments.columns)
[Stage 4:========================================================>(58 + 1) / 59]
+------------+-------+
|   subreddit|  count|
+------------+-------+
|   socialism| 371369|
|     Liberal|  96396|
|   Economics|1428423|
| Libertarian|2706903|
|    centrist| 921871|
|changemyview|3909587|
|Ask_Politics|  60149|
|     finance| 137118|
|Conservative|5231661|
+------------+-------+

['author', 'author_cakeday', 'author_flair_css_class', 'body', 'can_gild', 'controversiality', 'created_utc', 'distinguished', 'edited', 'gilded', 'id', 'is_submitter', 'link_id', 'parent_id', 'permalink', 'retrieved_on', 'score', 'stickied', 'subreddit', 'subreddit_id']
                                                                                
In [ ]:
# Print each column's name, type and nullability for the combined comments DataFrame.
df_comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)

Submissions¶

In [ ]:
# S3 directory paths: the cleaned submissions prefix in each of the four project buckets.
# (Plain string literals - there is nothing to interpolate, so no f-prefix.)
s3_directory1 = "s3a://sagemaker-us-east-1-395393721134/project/cleaned/submissions/"
s3_directory2 = "s3a://sagemaker-us-east-1-433974840707/project/cleaned/submissions/"
s3_directory3 = "s3a://sagemaker-us-east-1-711387073580/project/cleaned/submissions/"
s3_directory4 = "s3a://sagemaker-us-east-1-224518912016/project/cleaned/submissions/"

# Read all the Parquet files in each directory into a DataFrame.
df_submissions1 = spark.read.parquet(s3_directory1)
df_submissions2 = spark.read.parquet(s3_directory2)
df_submissions3 = spark.read.parquet(s3_directory3)
df_submissions4 = spark.read.parquet(s3_directory4)

# Stack the four parts into one DataFrame. unionByName matches columns by name,
# so a part written with a different column order cannot be silently misaligned
# (plain union() pairs columns purely by position). A part whose column names
# differ raises an error instead of producing mixed-up data.
df_submissions = (
    df_submissions1
    .unionByName(df_submissions2)
    .unionByName(df_submissions3)
    .unionByName(df_submissions4)
)
23/11/03 23:18:28 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
In [ ]:
# Row count per subreddit - a quick check that every expected subreddit is present.
(
    df_submissions
    .groupBy("subreddit")
    .count()
    .show()
)

# Column names of the combined submissions DataFrame.
print(df_submissions.columns)
[Stage 11:====================================================>   (15 + 1) / 16]
+------------+------+
|   subreddit| count|
+------------+------+
|   socialism| 40094|
|     Liberal| 11086|
|   Economics| 40604|
| Libertarian| 51153|
|    centrist| 13594|
|changemyview| 64632|
|Ask_Politics|  5903|
|     finance| 28904|
|Conservative|343938|
+------------+------+

['author', 'title', 'selftext', 'subreddit', 'score', 'num_comments', 'permalink', 'created_utc', 'url', 'domain', 'is_video', 'is_self', 'is_reddit_media_domain', 'spoiler', 'over_18', 'stickied', 'thumbnail', 'media', 'secure_media', 'gilded', 'archived', 'distinguished', 'crosspost_parent', 'crosspost_parent_list']
                                                                                
In [ ]:
# Print each column's name, type and nullability (including nested structs) for the combined submissions DataFrame.
df_submissions.printSchema()
root
 |-- author: string (nullable = true)
 |-- title: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- score: long (nullable = true)
 |-- num_comments: long (nullable = true)
 |-- permalink: string (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- url: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- is_video: boolean (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- is_reddit_media_domain: boolean (nullable = true)
 |-- spoiler: boolean (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- reddit_video: struct (nullable = true)
 |    |    |-- dash_url: string (nullable = true)
 |    |    |-- duration: long (nullable = true)
 |    |    |-- fallback_url: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- hls_url: string (nullable = true)
 |    |    |-- is_gif: boolean (nullable = true)
 |    |    |-- scrubber_media_url: string (nullable = true)
 |    |    |-- transcoding_status: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- secure_media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- approved_at_utc: string (nullable = true)
 |    |    |-- approved_by: string (nullable = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    |-- banned_at_utc: string (nullable = true)
 |    |    |-- banned_by: string (nullable = true)
 |    |    |-- brand_safe: boolean (nullable = true)
 |    |    |-- can_gild: boolean (nullable = true)
 |    |    |-- can_mod_post: boolean (nullable = true)
 |    |    |-- clicked: boolean (nullable = true)
 |    |    |-- contest_mode: boolean (nullable = true)
 |    |    |-- created: double (nullable = true)
 |    |    |-- created_utc: double (nullable = true)
 |    |    |-- distinguished: string (nullable = true)
 |    |    |-- domain: string (nullable = true)
 |    |    |-- downs: long (nullable = true)
 |    |    |-- edited: boolean (nullable = true)
 |    |    |-- gilded: long (nullable = true)
 |    |    |-- hidden: boolean (nullable = true)
 |    |    |-- hide_score: boolean (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- is_crosspostable: boolean (nullable = true)
 |    |    |-- is_reddit_media_domain: boolean (nullable = true)
 |    |    |-- is_self: boolean (nullable = true)
 |    |    |-- is_video: boolean (nullable = true)
 |    |    |-- likes: string (nullable = true)
 |    |    |-- link_flair_css_class: string (nullable = true)
 |    |    |-- link_flair_text: string (nullable = true)
 |    |    |-- locked: boolean (nullable = true)
 |    |    |-- media: string (nullable = true)
 |    |    |-- mod_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_comments: long (nullable = true)
 |    |    |-- num_crossposts: long (nullable = true)
 |    |    |-- num_reports: string (nullable = true)
 |    |    |-- over_18: boolean (nullable = true)
 |    |    |-- parent_whitelist_status: string (nullable = true)
 |    |    |-- permalink: string (nullable = true)
 |    |    |-- pinned: boolean (nullable = true)
 |    |    |-- quarantine: boolean (nullable = true)
 |    |    |-- removal_reason: string (nullable = true)
 |    |    |-- report_reasons: string (nullable = true)
 |    |    |-- saved: boolean (nullable = true)
 |    |    |-- score: long (nullable = true)
 |    |    |-- secure_media: string (nullable = true)
 |    |    |-- selftext: string (nullable = true)
 |    |    |-- selftext_html: string (nullable = true)
 |    |    |-- spoiler: boolean (nullable = true)
 |    |    |-- stickied: boolean (nullable = true)
 |    |    |-- subreddit: string (nullable = true)
 |    |    |-- subreddit_id: string (nullable = true)
 |    |    |-- subreddit_name_prefixed: string (nullable = true)
 |    |    |-- subreddit_type: string (nullable = true)
 |    |    |-- suggested_sort: string (nullable = true)
 |    |    |-- thumbnail: string (nullable = true)
 |    |    |-- thumbnail_height: string (nullable = true)
 |    |    |-- thumbnail_width: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- ups: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- user_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- view_count: string (nullable = true)
 |    |    |-- visited: boolean (nullable = true)
 |    |    |-- whitelist_status: string (nullable = true)

Data Cleanliness Check¶

In [ ]:
# Column functions used by the data-quality checks and later analysis.
# NOTE: `min` and `max` imported here shadow Python's built-in min()/max()
# for every cell that runs after this one.
from pyspark.sql.functions import col, year, month, concat, date_format, avg, min, max, countDistinct, isnan, when, count
In [ ]:
# Comments with no usable text: body is NULL, an empty string, or NaN.
comms_missing = df_comments.filter(
    col("body").isNull()
    | (col("body") == "")
    | isnan(col("body"))
).count()
print(f"Empty, Missing, or NaN values in comms body col: {comms_missing}")

# Submissions with no usable text: title AND selftext are each NULL or an empty string.
subs_missing = df_submissions.filter(
    (col("title").isNull() | (col("title") == ""))
    & (col("selftext").isNull() | (col("selftext") == ""))
).count()
print(f"\nEmpty, Missing, or NaN values in subs [title, selftext] cols: {subs_missing}")
                                                                                
Empty, Missing, or NaN values in comms body col: 0
[Stage 17:====================================================>   (15 + 1) / 16]
Empty, Missing, or NaN values in subs [title, selftext] cols: 0
                                                                                
In [ ]: