Setup
In [ ]:
# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.3.0
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done
Solving environment: done

==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 23.9.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update
     use

     conda install conda=23.9.0

# All requested packages already installed.

Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: pyspark==3.3.0 in /opt/conda/lib/python3.10/site-packages (3.3.0)
Requirement already satisfied: py4j==0.10.9.5 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.3.0) (0.10.9.5)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Out[ ]:
Create Spark Session
In [ ]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("PySparkApp")
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider",
)
.getOrCreate()
)
print(spark.version)
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a796d366-066e-4b3d-ae96-4501093dd79b;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 255ms :: artifacts dl 17ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-a796d366-066e-4b3d-ae96-4501093dd79b
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/12ms)
23/11/03 23:16:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/03 23:16:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/11/03 23:16:45 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
3.3.0
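The `Ignoring non-Spark config property` warning above indicates that the credentials-provider setting never reached the Hadoop configuration. A minimal sketch of one way to fix it, assuming the standard convention that Hadoop properties are passed through Spark with a `spark.hadoop.` prefix:

In [ ]:
# Hadoop/S3A properties set on the Spark builder generally need the
# "spark.hadoop." prefix; without it, Spark discards them as non-Spark properties
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)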
Check Cross Account S3 Access
In [ ]:
! aws s3 ls s3://sagemaker-us-east-1-224518912016/project/
                           PRE cleaned/
                           PRE comments/
                           PRE submissions/
In [ ]:
! aws s3 ls s3://sagemaker-us-east-1-711387073580/project/
                           PRE cleaned/
                           PRE comments/
                           PRE submissions/
In [ ]:
! aws s3 ls s3://sagemaker-us-east-1-433974840707/project/
                           PRE cleaned/
                           PRE comments/
                           PRE submissions/
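The same cross-account check can also be run programmatically. A hedged boto3 sketch (the bucket name is taken from the first cell above; `Delimiter="/"` reproduces the folder-style listing of `aws s3 ls`):

In [ ]:
import boto3

s3 = boto3.client("s3")
# Delimiter="/" groups keys into top-level "folders" under the prefix,
# mirroring what `aws s3 ls` prints as PRE entries
response = s3.list_objects_v2(
    Bucket="sagemaker-us-east-1-224518912016",
    Prefix="project/",
    Delimiter="/",
)
for common_prefix in response.get("CommonPrefixes", []):
    print(common_prefix["Prefix"])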
Read in data
Comments
In [ ]:
import sagemaker
session = sagemaker.Session()
# Create or retrieve a Spark session; getOrCreate() returns the session
# configured above, so the S3A settings still apply
spark = SparkSession.builder.appName("ReadS3Parquet").getOrCreate()
# S3 directory paths (one per account)
s3_directory1 = "s3a://sagemaker-us-east-1-395393721134/project/cleaned/comments/"
s3_directory2 = "s3a://sagemaker-us-east-1-433974840707/project/cleaned/comments/"
s3_directory3 = "s3a://sagemaker-us-east-1-711387073580/project/cleaned/comments/"
s3_directory4 = "s3a://sagemaker-us-east-1-224518912016/project/cleaned/comments/"
# Read all the Parquet files in each directory into a DataFrame
df_comments1 = spark.read.parquet(s3_directory1)
df_comments2 = spark.read.parquet(s3_directory2)
df_comments3 = spark.read.parquet(s3_directory3)
df_comments4 = spark.read.parquet(s3_directory4)
df_comments = df_comments1.union(df_comments2).union(df_comments3).union(df_comments4)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
23/11/03 23:16:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/11/03 23:16:51 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
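One caveat on the `union` calls above: `union` matches columns by position, so it would silently misalign data if the four Parquet sets ever differed in column order. A hedged alternative sketch using `unionByName`, which matches on column names instead (the same applies to the submissions union below):

In [ ]:
from functools import reduce

# unionByName aligns columns by name rather than position; allowMissingColumns
# (Spark 3.1+) fills columns absent from one side with nulls
df_comments = reduce(
    lambda a, b: a.unionByName(b, allowMissingColumns=True),
    [df_comments1, df_comments2, df_comments3, df_comments4],
)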
In [ ]:
# check counts (ensuring all needed subreddits exist)
df_comments.groupBy('subreddit').count().show()
# get cols
print(df_comments.columns)
+------------+-------+
|   subreddit|  count|
+------------+-------+
|   socialism| 371369|
|     Liberal|  96396|
|   Economics|1428423|
| Libertarian|2706903|
|    centrist| 921871|
|changemyview|3909587|
|Ask_Politics|  60149|
|     finance| 137118|
|Conservative|5231661|
+------------+-------+

['author', 'author_cakeday', 'author_flair_css_class', 'body', 'can_gild', 'controversiality', 'created_utc', 'distinguished', 'edited', 'gilded', 'id', 'is_submitter', 'link_id', 'parent_id', 'permalink', 'retrieved_on', 'score', 'stickied', 'subreddit', 'subreddit_id']
In [ ]:
df_comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
Submissions
In [ ]:
# S3 directory paths (one per account)
s3_directory1 = "s3a://sagemaker-us-east-1-395393721134/project/cleaned/submissions/"
s3_directory2 = "s3a://sagemaker-us-east-1-433974840707/project/cleaned/submissions/"
s3_directory3 = "s3a://sagemaker-us-east-1-711387073580/project/cleaned/submissions/"
s3_directory4 = "s3a://sagemaker-us-east-1-224518912016/project/cleaned/submissions/"
# Read all the Parquet files in each directory into a DataFrame
df_submissions1 = spark.read.parquet(s3_directory1)
df_submissions2 = spark.read.parquet(s3_directory2)
df_submissions3 = spark.read.parquet(s3_directory3)
df_submissions4 = spark.read.parquet(s3_directory4)
df_submissions = df_submissions1.union(df_submissions2).union(df_submissions3).union(df_submissions4)
23/11/03 23:18:28 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
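The truncation warning above names the setting to adjust; a minimal sketch raising it (the value 1000 is an arbitrary choice, large enough for these wide schemas):

In [ ]:
# Raise the cap on how many fields Spark includes in plan string representations
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)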
In [ ]:
# check counts (ensuring all needed subreddits exist)
df_submissions.groupBy('subreddit').count().show()
# get cols
print(df_submissions.columns)
+------------+------+
|   subreddit| count|
+------------+------+
|   socialism| 40094|
|     Liberal| 11086|
|   Economics| 40604|
| Libertarian| 51153|
|    centrist| 13594|
|changemyview| 64632|
|Ask_Politics|  5903|
|     finance| 28904|
|Conservative|343938|
+------------+------+

['author', 'title', 'selftext', 'subreddit', 'score', 'num_comments', 'permalink', 'created_utc', 'url', 'domain', 'is_video', 'is_self', 'is_reddit_media_domain', 'spoiler', 'over_18', 'stickied', 'thumbnail', 'media', 'secure_media', 'gilded', 'archived', 'distinguished', 'crosspost_parent', 'crosspost_parent_list']
In [ ]:
df_submissions.printSchema()
root
 |-- author: string (nullable = true)
 |-- title: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- score: long (nullable = true)
 |-- num_comments: long (nullable = true)
 |-- permalink: string (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- url: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- is_video: boolean (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- is_reddit_media_domain: boolean (nullable = true)
 |-- spoiler: boolean (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- reddit_video: struct (nullable = true)
 |    |    |-- dash_url: string (nullable = true)
 |    |    |-- duration: long (nullable = true)
 |    |    |-- fallback_url: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- hls_url: string (nullable = true)
 |    |    |-- is_gif: boolean (nullable = true)
 |    |    |-- scrubber_media_url: string (nullable = true)
 |    |    |-- transcoding_status: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- secure_media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- approved_at_utc: string (nullable = true)
 |    |    |-- approved_by: string (nullable = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    |-- banned_at_utc: string (nullable = true)
 |    |    |-- banned_by: string (nullable = true)
 |    |    |-- brand_safe: boolean (nullable = true)
 |    |    |-- can_gild: boolean (nullable = true)
 |    |    |-- can_mod_post: boolean (nullable = true)
 |    |    |-- clicked: boolean (nullable = true)
 |    |    |-- contest_mode: boolean (nullable = true)
 |    |    |-- created: double (nullable = true)
 |    |    |-- created_utc: double (nullable = true)
 |    |    |-- distinguished: string (nullable = true)
 |    |    |-- domain: string (nullable = true)
 |    |    |-- downs: long (nullable = true)
 |    |    |-- edited: boolean (nullable = true)
 |    |    |-- gilded: long (nullable = true)
 |    |    |-- hidden: boolean (nullable = true)
 |    |    |-- hide_score: boolean (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- is_crosspostable: boolean (nullable = true)
 |    |    |-- is_reddit_media_domain: boolean (nullable = true)
 |    |    |-- is_self: boolean (nullable = true)
 |    |    |-- is_video: boolean (nullable = true)
 |    |    |-- likes: string (nullable = true)
 |    |    |-- link_flair_css_class: string (nullable = true)
 |    |    |-- link_flair_text: string (nullable = true)
 |    |    |-- locked: boolean (nullable = true)
 |    |    |-- media: string (nullable = true)
 |    |    |-- mod_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_comments: long (nullable = true)
 |    |    |-- num_crossposts: long (nullable = true)
 |    |    |-- num_reports: string (nullable = true)
 |    |    |-- over_18: boolean (nullable = true)
 |    |    |-- parent_whitelist_status: string (nullable = true)
 |    |    |-- permalink: string (nullable = true)
 |    |    |-- pinned: boolean (nullable = true)
 |    |    |-- quarantine: boolean (nullable = true)
 |    |    |-- removal_reason: string (nullable = true)
 |    |    |-- report_reasons: string (nullable = true)
 |    |    |-- saved: boolean (nullable = true)
 |    |    |-- score: long (nullable = true)
 |    |    |-- secure_media: string (nullable = true)
 |    |    |-- selftext: string (nullable = true)
 |    |    |-- selftext_html: string (nullable = true)
 |    |    |-- spoiler: boolean (nullable = true)
 |    |    |-- stickied: boolean (nullable = true)
 |    |    |-- subreddit: string (nullable = true)
 |    |    |-- subreddit_id: string (nullable = true)
 |    |    |-- subreddit_name_prefixed: string (nullable = true)
 |    |    |-- subreddit_type: string (nullable = true)
 |    |    |-- suggested_sort: string (nullable = true)
 |    |    |-- thumbnail: string (nullable = true)
 |    |    |-- thumbnail_height: string (nullable = true)
 |    |    |-- thumbnail_width: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- ups: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- user_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- view_count: string (nullable = true)
 |    |    |-- visited: boolean (nullable = true)
 |    |    |-- whitelist_status: string (nullable = true)
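Nested struct fields like these can be reached with dotted column paths; a quick sketch (the selected fields are illustrative, chosen from the schema above):

In [ ]:
# Dotted paths drill into struct columns; the result is a flat DataFrame
df_submissions.select(
    "title",
    "media.type",
    "media.oembed.provider_name",
).show(5, truncate=False)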
Data Cleanliness Check
In [ ]:
# necessary imports
from pyspark.sql.functions import col, year, month, concat, date_format, avg, min, max, countDistinct, isnan, when, count
In [ ]:
# count empty, missing, or NaN values in the comments body column (rows we would drop as empty comments)
comms_missing = df_comments.where(col('body').isNull() | (col('body') == "") | isnan(col('body'))).count()
print(f"Empty, Missing, or NaN values in comms body col: {comms_missing}")

# count submissions where title and selftext are both empty or missing (rows we would drop as empty posts)
subs_missing = df_submissions.where((col('title').isNull() | (col('title') == "")) & (col('selftext').isNull() | (col('selftext') == ""))).count()
print(f"\nEmpty, Missing, or NaN values in subs [title, selftext] cols: {subs_missing}")
Empty, Missing, or NaN values in comms body col: 0
Empty, Missing, or NaN values in subs [title, selftext] cols: 0
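Both counts are zero, so nothing needs dropping here. Had they been non-zero, a minimal filtering sketch along these lines would remove the offending rows (mirroring the conditions checked above):

In [ ]:
from pyspark.sql.functions import col

# Keep only comments with a non-empty body
df_comments_clean = df_comments.where(
    col("body").isNotNull() & (col("body") != "")
)

# Keep only submissions where at least one of title/selftext is populated
df_submissions_clean = df_submissions.where(
    (col("title").isNotNull() & (col("title") != ""))
    | (col("selftext").isNotNull() & (col("selftext") != ""))
)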
In [ ]: