Setup
# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.3.0
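# restart the kernel so the newly installed packages are picked up
# (this JavaScript relies on the classic Notebook's Jupyter object; in JupyterLab, use Kernel > Restart instead)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")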
Collecting package metadata (current_repodata.json): done
Solving environment: done
==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 23.9.0
Please update conda by running
    $ conda update -n base -c defaults conda
Or to minimize the number of packages updated during conda update use
     conda install conda=23.9.0
# All requested packages already installed.
Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: pyspark==3.3.0 in /opt/conda/lib/python3.10/site-packages (3.3.0)
Requirement already satisfied: py4j==0.10.9.5 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.3.0) (0.10.9.5)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
[notice] A new release of pip is available: 23.2.1 -> 23.3
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Create Spark Session
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName("PySparkApp")
    # hadoop-aws provides the S3A filesystem used for the s3a:// paths below
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    # Hadoop/S3A properties need the "spark.hadoop." prefix; without it Spark
    # ignores the key ("Ignoring non-Spark config property")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)
print(spark.version)
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fa76f279-6531-4934-8c5c-c104c11f8cb3;1.0
    confs: [default]
    found org.apache.hadoop#hadoop-aws;3.2.2 in central
    found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 431ms :: artifacts dl 21ms
    :: modules in use:
    com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
    org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-fa76f279-6531-4934-8c5c-c104c11f8cb3
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/24ms)
23/10/22 20:38:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
3.3.0
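Spark copies a property into the Hadoop `Configuration` used by the S3A connector only when its key starts with `spark.hadoop.`; an unprefixed key such as `fs.s3a.aws.credentials.provider` is ignored with an `Ignoring non-Spark config property` warning. A minimal sketch of that convention, using a hypothetical `to_spark_conf` helper that is not part of PySpark:

```python
# Minimal sketch: turn plain Hadoop/S3A properties into Spark config entries by
# adding the "spark.hadoop." prefix. `to_spark_conf` is a hypothetical helper.
HADOOP_PREFIX = "spark.hadoop."


def to_spark_conf(hadoop_props: dict) -> dict:
    """Return a copy of `hadoop_props` with every key prefixed by 'spark.hadoop.'."""
    return {
        # keys that already carry the prefix are left unchanged
        (key if key.startswith(HADOOP_PREFIX) else HADOOP_PREFIX + key): value
        for key, value in hadoop_props.items()
    }


s3a_props = {
    "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.ContainerCredentialsProvider",
}

spark_conf = to_spark_conf(s3a_props)
for key, value in spark_conf.items():
    print(f"{key} = {value}")
```

Each resulting pair can then be passed to `SparkSession.builder.config(key, value)`.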
Read in the data
Comments
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()
print(bucket)
# Retrieve the Spark session (getOrCreate returns the session created above; only runtime SQL settings from this builder take effect)
spark = SparkSession.builder.appName("ReadS3Parquet").getOrCreate()
# S3 directory path
s3_directory = f"s3a://{bucket}/project/comments/"
# Read all the Parquet files in the directory into a DataFrame
df_comments = spark.read.parquet(s3_directory)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker-us-east-1-711387073580
23/10/22 20:39:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/10/22 20:39:01 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
print(f"Comments rows: {df_comments.count()}")
print(f"Comments columns: {len(df_comments.columns)}")
# check counts (ensuring all needed subreddits exist)
df_comments.groupBy('subreddit').count().show()
Comments rows: 1050936
Comments columns: 21
+------------+------+
|   subreddit| count|
+------------+------+
|changemyview|158144|
|     finance|  8476|
|   socialism| 17527|
| Libertarian|186529|
|Ask_Politics|  8282|
|    centrist| 50853|
|Conservative|570355|
|   Economics| 50770|
+------------+------+
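The `groupBy('subreddit')` check above relies on reading the table by eye. The same check can be made explicit in plain Python. This sketch assumes the eight subreddits shown are the full set the project needs, copies the counts from the output above, and uses a hypothetical `missing_subreddits` helper. The counts also add up to the row total printed earlier, 1,050,936.

```python
# Assumption: these eight subreddits are the ones the project needs.
EXPECTED_SUBREDDITS = {
    "changemyview", "finance", "socialism", "Libertarian",
    "Ask_Politics", "centrist", "Conservative", "Economics",
}

# Counts copied from the comments output above. In the notebook they could be
# collected with:
#   {row["subreddit"]: row["count"]
#    for row in df_comments.groupBy("subreddit").count().collect()}
comment_counts = {
    "changemyview": 158144, "finance": 8476, "socialism": 17527,
    "Libertarian": 186529, "Ask_Politics": 8282, "centrist": 50853,
    "Conservative": 570355, "Economics": 50770,
}


def missing_subreddits(expected, counts):
    """Return the expected subreddits that are absent or have zero rows, sorted."""
    return sorted(name for name in expected if counts.get(name, 0) == 0)


print("missing:", missing_subreddits(EXPECTED_SUBREDDITS, comment_counts))
print("total rows:", sum(comment_counts.values()))
```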
Submissions
# S3 directory path
s3_directory = f"s3a://{bucket}/project/submissions/"
# Read all the Parquet files in the directory into a DataFrame
df_submissions = spark.read.parquet(s3_directory)
23/10/22 20:40:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
print(f"Submissions rows: {df_submissions.count()}")
print(f"Submissions columns: {len(df_submissions.columns)}")
# check counts (ensuring all needed subreddits exist)
df_submissions.groupBy('subreddit').count().show()
Submissions rows: 36353
Submissions columns: 68
+------------+-----+
|   subreddit|count|
+------------+-----+
|changemyview| 3507|
|     finance|  950|
|   socialism| 2349|
| Libertarian| 4448|
|Ask_Politics| 1027|
|    centrist| 1194|
|Conservative|21520|
|   Economics| 1358|
+------------+-----+
Data Cleaning
Submissions
df_submissions.printSchema()
root
 |-- adserver_click_url: string (nullable = true)
 |-- adserver_imp_pixel: string (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- brand_safe: boolean (nullable = true)
 |-- contest_mode: boolean (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- approved_at_utc: string (nullable = true)
 |    |    |-- approved_by: string (nullable = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    |-- banned_at_utc: string (nullable = true)
 |    |    |-- banned_by: string (nullable = true)
 |    |    |-- brand_safe: boolean (nullable = true)
 |    |    |-- can_gild: boolean (nullable = true)
 |    |    |-- can_mod_post: boolean (nullable = true)
 |    |    |-- clicked: boolean (nullable = true)
 |    |    |-- contest_mode: boolean (nullable = true)
 |    |    |-- created: double (nullable = true)
 |    |    |-- created_utc: double (nullable = true)
 |    |    |-- distinguished: string (nullable = true)
 |    |    |-- domain: string (nullable = true)
 |    |    |-- downs: long (nullable = true)
 |    |    |-- edited: boolean (nullable = true)
 |    |    |-- gilded: long (nullable = true)
 |    |    |-- hidden: boolean (nullable = true)
 |    |    |-- hide_score: boolean (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- is_crosspostable: boolean (nullable = true)
 |    |    |-- is_reddit_media_domain: boolean (nullable = true)
 |    |    |-- is_self: boolean (nullable = true)
 |    |    |-- is_video: boolean (nullable = true)
 |    |    |-- likes: string (nullable = true)
 |    |    |-- link_flair_css_class: string (nullable = true)
 |    |    |-- link_flair_text: string (nullable = true)
 |    |    |-- locked: boolean (nullable = true)
 |    |    |-- media: string (nullable = true)
 |    |    |-- mod_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_comments: long (nullable = true)
 |    |    |-- num_crossposts: long (nullable = true)
 |    |    |-- num_reports: string (nullable = true)
 |    |    |-- over_18: boolean (nullable = true)
 |    |    |-- parent_whitelist_status: string (nullable = true)
 |    |    |-- permalink: string (nullable = true)
 |    |    |-- pinned: boolean (nullable = true)
 |    |    |-- quarantine: boolean (nullable = true)
 |    |    |-- removal_reason: string (nullable = true)
 |    |    |-- report_reasons: string (nullable = true)
 |    |    |-- saved: boolean (nullable = true)
 |    |    |-- score: long (nullable = true)
 |    |    |-- secure_media: string (nullable = true)
 |    |    |-- selftext: string (nullable = true)
 |    |    |-- selftext_html: string (nullable = true)
 |    |    |-- spoiler: boolean (nullable = true)
 |    |    |-- stickied: boolean (nullable = true)
 |    |    |-- subreddit: string (nullable = true)
 |    |    |-- subreddit_id: string (nullable = true)
 |    |    |-- subreddit_name_prefixed: string (nullable = true)
 |    |    |-- subreddit_type: string (nullable = true)
 |    |    |-- suggested_sort: string (nullable = true)
 |    |    |-- thumbnail: string (nullable = true)
 |    |    |-- thumbnail_height: string (nullable = true)
 |    |    |-- thumbnail_width: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- ups: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- user_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- view_count: string (nullable = true)
 |    |    |-- visited: boolean (nullable = true)
 |    |    |-- whitelist_status: string (nullable = true)
 |-- disable_comments: boolean (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- domain_override: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- embed_type: string (nullable = true)
 |-- embed_url: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- hidden: boolean (nullable = true)
 |-- hide_score: boolean (nullable = true)
 |-- href_url: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imp_pixel: string (nullable = true)
 |-- is_crosspostable: boolean (nullable = true)
 |-- is_reddit_media_domain: boolean (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- is_video: boolean (nullable = true)
 |-- link_flair_css_class: string (nullable = true)
 |-- link_flair_text: string (nullable = true)
 |-- locked: boolean (nullable = true)
 |-- media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- reddit_video: struct (nullable = true)
 |    |    |-- dash_url: string (nullable = true)
 |    |    |-- duration: long (nullable = true)
 |    |    |-- fallback_url: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- hls_url: string (nullable = true)
 |    |    |-- is_gif: boolean (nullable = true)
 |    |    |-- scrubber_media_url: string (nullable = true)
 |    |    |-- transcoding_status: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- media_embed: struct (nullable = true)
 |    |-- content: string (nullable = true)
 |    |-- height: long (nullable = true)
 |    |-- scrolling: boolean (nullable = true)
 |    |-- width: long (nullable = true)
 |-- mobile_ad_url: string (nullable = true)
 |-- num_comments: long (nullable = true)
 |-- num_crossposts: long (nullable = true)
 |-- original_link: string (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- parent_whitelist_status: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- pinned: boolean (nullable = true)
 |-- post_hint: string (nullable = true)
 |-- preview: struct (nullable = true)
 |    |-- enabled: boolean (nullable = true)
 |    |-- images: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- variants: struct (nullable = true)
 |    |    |    |    |-- gif: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- mp4: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- nsfw: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- obfuscated: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |-- promoted: boolean (nullable = true)
 |-- promoted_by: string (nullable = true)
 |-- promoted_display_name: string (nullable = true)
 |-- promoted_url: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- secure_media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- secure_media_embed: struct (nullable = true)
 |    |-- content: string (nullable = true)
 |    |-- height: long (nullable = true)
 |    |-- media_domain_url: string (nullable = true)
 |    |-- scrolling: boolean (nullable = true)
 |    |-- width: long (nullable = true)
 |-- selftext: string (nullable = true)
 |-- spoiler: boolean (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- suggested_sort: string (nullable = true)
 |-- third_party_trackers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- third_party_tracking: string (nullable = true)
 |-- third_party_tracking_2: string (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- thumbnail_height: long (nullable = true)
 |-- thumbnail_width: long (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- whitelist_status: string (nullable = true)
The initial cleaning step reduces the number of columns, since many of them are not useful for this analysis. The main columns kept (for now) are:
- author: The user who made the post. This can be useful for understanding user activity patterns and identifying influential users. 
- title: The title of the post. This is essential for content analysis, sentiment analysis, and topic modeling. 
- selftext: The body text of the post (if it's a text post). Like the title, this is crucial for content-based analyses. 
- subreddit: The community where the post was made. This helps in community-specific analyses and understanding the dynamics of different subreddits. 
- score: The net score (upvotes minus downvotes) the post has received. It gives an idea of the post's popularity and reception. 
- num_comments: The number of comments on the post. This can be an indicator of the post's engagement level. 
- permalink: The URL path to the post on Reddit. Useful for referencing and direct access. 
- created_utc: The timestamp when the post was created. Time-based analysis, trends, and patterns can be derived using this. 
- url: The URL linked in the post (if it's a link post). Helps in understanding external content being shared. 
- domain: The domain of the linked content. This can give insights into the most commonly shared websites/sources in a subreddit. 
- is_video, is_self, is_reddit_media_domain: Flags indicating the type of the post (video, text, or media from Reddit's domain). 
- spoiler and over_18: Flags indicating if the post is marked as a spoiler or is NSFW (Not Safe For Work). 
- stickied: Indicates if the post is pinned to the top of the subreddit. 
- thumbnail: If available, the thumbnail can give a quick visual summary of linked content. 
- media and secure_media: Details about any embedded media in the post, like videos. 
- gilded: Indicates how many times the post has received a gold award. 
- archived: Tells if the post is archived (meaning no new comments can be added). 
- distinguished: Indicates whether the post was distinguished by a moderator or a Reddit admin. 
- crosspost_parent and crosspost_parent_list: Information about the original post if the current post is a crosspost. 
important_features = [
    "author",
    "title",
    "selftext",
    "subreddit",
    "score",
    "num_comments",
    "permalink",
    "created_utc",
    "url",
    "domain",
    "is_video",
    "is_self",
    "is_reddit_media_domain",
    "spoiler",
    "over_18",
    "stickied",
    "thumbnail",
    "media",
    "secure_media",
    "gilded",
    "archived",
    "distinguished",
    "crosspost_parent",
    "crosspost_parent_list"
]
df_submissions_filtered = df_submissions.select(important_features)
# replace non-alphanumeric characters in title and selftext with spaces
# (loop variable is col_name so it does not shadow pyspark.sql.functions.col, imported later)
from pyspark.sql.functions import when, regexp_replace
for col_name in ['title', 'selftext']:
    df_submissions_filtered = df_submissions_filtered.withColumn(col_name,
        when((df_submissions_filtered[col_name] != '[deleted]') & (df_submissions_filtered[col_name] != '[removed]'), # clean only if NOT [deleted] or [removed]
            regexp_replace(df_submissions_filtered[col_name], '[^a-zA-Z0-9]', ' ')
        ).otherwise(df_submissions_filtered[col_name]) # leave [deleted] or [removed] as is
    )
# remove leading / trailing whitespaces for all string cols
from pyspark.sql.functions import trim
string_cols = [col_name for col_name, col_type in df_submissions_filtered.dtypes if col_type == 'string']
for col_name in string_cols:
    df_submissions_filtered = df_submissions_filtered.withColumn(col_name, trim(df_submissions_filtered[col_name]))
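To see what the two passes above (regex replacement, then trimming) do to individual strings, they can be mirrored in plain Python. This is a sketch, not PySpark code: it assumes Spark's `trim` strips only leading and trailing space characters, and `clean_text` is a hypothetical helper. Punctuation-only titles end up as empty strings, which is one way blank titles appear in the value counts, and apostrophes become spaces (`There s a lot of ...`).

```python
import re
from typing import Optional

# Values Reddit uses for removed content; the notebook leaves them untouched.
SENTINELS = {"[deleted]", "[removed]"}
# Same character class as the regexp_replace call in the notebook.
NON_ALPHANUMERIC = re.compile(r"[^a-zA-Z0-9]")


def clean_text(value: Optional[str]) -> Optional[str]:
    """Plain-Python mirror of the regexp_replace + trim passes (a sketch)."""
    if value is None:
        # In Spark, a null input stays null through both passes.
        return None
    if value not in SENTINELS:
        # Replace every non-alphanumeric character with a single space.
        value = NON_ALPHANUMERIC.sub(" ", value)
    # Assumed to match Spark's trim: strip leading/trailing spaces only.
    return value.strip(" ")


for raw in ["Hello, I'm here!", "[removed]", "???", "  Lol  "]:
    print(repr(raw), "->", repr(clean_text(raw)))
```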
# value counts for columns of interest 
cols = ['subreddit', 'author', 'title', 'selftext']
for column in cols:
    print(f"Value counts for column: {column}")
    df_submissions_filtered.groupBy(column).count().orderBy("count", ascending=False).show()
Value counts for column: subreddit
+------------+-----+
|   subreddit|count|
+------------+-----+
|Conservative|21520|
| Libertarian| 4448|
|changemyview| 3507|
|   socialism| 2349|
|   Economics| 1358|
|    centrist| 1194|
|Ask_Politics| 1027|
|     finance|  950|
+------------+-----+

Value counts for column: author
+-------------------+-----+
|             author|count|
+-------------------+-----+
|          [deleted]|10215|
|     Winterhold2000|  642|
|          guanaco55|  548|
|             f1sh98|  508|
|            canfbar|  442|
|             nimobo|  351|
|     JesusCumelette|  254|
|      ChunkyArsenio|  216|
|       Foubar_ghost|  208|
|    fintechinshorts|  183|
|             XIIXOO|  135|
|    ThePoliticalHat|  132|
|        geocentrist|  123|
|           bobbyw24|  121|
|      theradiostarz|  113|
|          cyanobyte|  104|
|          Beliavsky|  102|
|         billgigs55|  101|
|              oz4ut|   99|
|FullmetalGameMaster|   95|
+-------------------+-----+
only showing top 20 rows

Value counts for column: title
+--------------------+-----+
|               title|count|
+--------------------+-----+
|                    |   57|
|I just want to fi...|   20|
|You think hunter ...|   13|
|Lowering the Bar ...|   12|
|                 Lol|   12|
| Get free 200 Ama...|   11|
|               Pesto|   10|
| Audio Trump bera...|   10|
|uThermo just rest...|   10|
|            Question|    9|
|            Thoughts|    9|
|               Truth|    9|
|          Censorship|    9|
|Chaos on Capitol ...|    8|
|Twitter permanent...|    8|
|                 Gab|    8|
|               Facts|    8|
|                True|    7|
|North Idaho inter...|    7|
|                  Hi|    7|
+--------------------+-----+
only showing top 20 rows

Value counts for column: selftext
+--------------------+-----+
|            selftext|count|
+--------------------+-----+
|                    |18283|
|           [removed]| 8117|
|           [deleted]| 6499|
|         That is all|    5|
|Feel free to disc...|    2|
|All violence shou...|    2|
|               Title|    2|
| Hello I consider...|    2|
|After Capitol rio...|    2|
|Sorry if this is ...|    2|
|I have seen the s...|    1|
|I would like to c...|    1|
|In recent years a...|    1|
|Socialist Alterna...|    1|
|He would obviousl...|    1|
|The r conservativ...|    1|
|There s a lot of ...|    1|
|If such a clearly...|    1|
|What would you li...|    1|
|If impeachment is...|    1|
+--------------------+-----+
only showing top 20 rows
'''
filter out deleted / empty posts
empty posts (meet all of the following):
    - title = blank
    - selftext = [removed], [deleted], or blank
'''
from pyspark.sql.functions import length, col
is_empty = (
    (length(col('title')) < 1) &
    (
        # the sentinel values include the square brackets
        (col('selftext') == '[removed]') |
        (col('selftext') == '[deleted]') |
        (length(col('selftext')) < 1)
    )
)
df_submissions_filtered = df_submissions_filtered.filter(~is_empty)
# value counts for columns of interest AFTER filtering
for column in cols:
    print(f"Value counts for column: {column}")
    df_submissions_filtered.groupBy(column).count().orderBy("count", ascending=False).show()
    
# reference: uncleaned counts
# +------------+-----+
# |   subreddit|count|
# +------------+-----+
# |changemyview| 3507|
# |     finance|  950|
# |   socialism| 2349|
# | Libertarian| 4448|
# |Ask_Politics| 1027|
# |    centrist| 1194|
# |Conservative|21520|
# |   Economics| 1358|
# +------------+-----+
Value counts for column: subreddit
+------------+-----+
|   subreddit|count|
+------------+-----+
|Conservative|21493|
| Libertarian| 4448|
|changemyview| 3507|
|   socialism| 2343|
|   Economics| 1354|
|    centrist| 1194|
|Ask_Politics| 1027|
|     finance|  948|
+------------+-----+

Value counts for column: author
+-------------------+-----+
|             author|count|
+-------------------+-----+
|          [deleted]|10213|
|     Winterhold2000|  642|
|          guanaco55|  548|
|             f1sh98|  508|
|            canfbar|  442|
|             nimobo|  351|
|     JesusCumelette|  254|
|      ChunkyArsenio|  216|
|       Foubar_ghost|  208|
|    fintechinshorts|  183|
|             XIIXOO|  135|
|    ThePoliticalHat|  132|
|        geocentrist|  123|
|           bobbyw24|  121|
|      theradiostarz|  113|
|          cyanobyte|  104|
|          Beliavsky|  102|
|         billgigs55|  101|
|              oz4ut|   99|
|FullmetalGameMaster|   95|
+-------------------+-----+
only showing top 20 rows

Value counts for column: title
+--------------------+-----+
|               title|count|
+--------------------+-----+
|I just want to fi...|   20|
|                    |   18|
|You think hunter ...|   13|
|Lowering the Bar ...|   12|
|                 Lol|   12|
| Get free 200 Ama...|   11|
|               Pesto|   10|
| Audio Trump bera...|   10|
|uThermo just rest...|   10|
|            Question|    9|
|            Thoughts|    9|
|               Truth|    9|
|          Censorship|    9|
|Chaos on Capitol ...|    8|
|Twitter permanent...|    8|
|                 Gab|    8|
|               Facts|    8|
|                True|    7|
|North Idaho inter...|    7|
|                  Hi|    7|
+--------------------+-----+
only showing top 20 rows

Value counts for column: selftext
+--------------------+-----+
|            selftext|count|
+--------------------+-----+
|                    |18244|
|           [removed]| 8117|
|           [deleted]| 6499|
|         That is all|    5|
|Feel free to disc...|    2|
|All violence shou...|    2|
|               Title|    2|
| Hello I consider...|    2|
|After Capitol rio...|    2|
|Sorry if this is ...|    2|
|I have seen the s...|    1|
|I would like to c...|    1|
|In recent years a...|    1|
|Socialist Alterna...|    1|
|He would obviousl...|    1|
|The r conservativ...|    1|
|There s a lot of ...|    1|
|If such a clearly...|    1|
|What would you li...|    1|
|If impeachment is...|    1|
+--------------------+-----+
only showing top 20 rows
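Stated as a plain predicate, the emptiness rule for submissions looks like the sketch below (a hypothetical helper, assuming already-cleaned, non-null strings). The sentinel values carry their brackets, exactly as they appear in the `selftext` value counts (`[removed]`, `[deleted]`), so a comparison against the bare words `removed` or `deleted` would never match.

```python
from typing import List, Tuple

# Sentinel values as they appear in the selftext value counts (brackets included).
EMPTY_SELFTEXT = {"[removed]", "[deleted]", ""}


def is_empty_submission(title: str, selftext: str) -> bool:
    """True when the cleaned title is blank AND the selftext is removed, deleted, or blank."""
    return len(title) < 1 and selftext in EMPTY_SELFTEXT


# Hypothetical (title, selftext) pairs, already cleaned and non-null.
rows: List[Tuple[str, str]] = [
    ("", "[removed]"),          # blank title, removed body -> dropped
    ("", ""),                   # blank title, blank body   -> dropped
    ("Thoughts", "[deleted]"),  # title survives            -> kept
    ("", "Some body text"),     # body survives             -> kept
]

kept = [row for row in rows if not is_empty_submission(*row)]
print(kept)
```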
Comments
df_comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
Relevant columns to be kept (for now):
- author: Username of the person who wrote the comment. Relevant for tracking specific users or identifying patterns among users. 
- author_cakeday: Indicates if it's the anniversary of the author joining Reddit. Typically not critical unless you're looking into the behavior of users on their "cakeday". 
- author_flair_css_class & author_flair_text: Flair associated with the user in that subreddit. Might be relevant for categorizing users in certain subreddits. 
- body: The content of the comment. Crucial for most analyses involving comments. 
- can_gild: Indicates if the comment can be given a gold award. Might not be necessary unless you're studying gilding behavior. 
- controversiality: Indicates whether a comment is controversial (i.e., has a nearly equal number of upvotes and downvotes). Can be useful to identify divisive comments. 
- created_utc: Timestamp of the comment. Relevant for time series analysis or understanding the data's time distribution. 
- distinguished: Whether a comment is distinguished by moderators/admins. Can be useful in subreddits with active moderation. 
- edited: Indicates if the comment was edited after posting. Can be useful to track changes. 
- gilded: Number of gold awards the comment received. Relevant if you're interested in the appreciation of comments. 
- id: Unique identifier for the comment. Essential for data integrity and referencing. 
- is_submitter: Indicates if the commenter is also the original poster. Can be useful to track interactions. 
- link_id: ID of the post to which the comment belongs. Vital for mapping comments to posts. 
- parent_id: ID of the parent comment or post. Relevant for understanding comment threads and hierarchies. 
- permalink: Direct link to the comment. Useful for referencing. 
- retrieved_on: Timestamp of when the comment was pulled. Useful for data tracking and understanding the time frame of data collection. 
- score: Net score (upvotes - downvotes) of the comment. Critical for understanding the reception of a comment. 
- stickied: Indicates if the comment is pinned. Relevant in subreddits with active moderation or announcements. 
- subreddit & subreddit_id: The subreddit where the comment was posted. Essential for any subreddit-based analysis. 
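`link_id` and `parent_id` are typically stored as Reddit "fullnames": a type prefix (`t3_` for posts, `t1_` for comments) followed by the base-36 ID. Assuming this dataset follows that convention, mapping a comment to its post means stripping `t3_` from `link_id` and matching the result against the submissions' `id` column, and a comment whose `parent_id` starts with `t3_` is a top-level reply. A minimal sketch with hypothetical helper names and IDs:

```python
# Reddit fullname prefix for posts (submissions); comments use "t1_".
POST_PREFIX = "t3_"


def submission_id(link_id: str) -> str:
    """Strip the 't3_' prefix so the value matches a submission's bare `id`."""
    if not link_id.startswith(POST_PREFIX):
        raise ValueError(f"unexpected link_id: {link_id!r}")
    return link_id[len(POST_PREFIX):]


def is_top_level(parent_id: str) -> bool:
    """A comment whose parent is a post (t3_) rather than a comment (t1_) is top-level."""
    return parent_id.startswith(POST_PREFIX)


# Hypothetical IDs, for illustration only.
comment = {"link_id": "t3_abc123", "parent_id": "t1_def456"}
print(submission_id(comment["link_id"]))   # abc123
print(is_top_level(comment["parent_id"]))  # False
```

In PySpark, the same stripping could be expressed as `regexp_replace(col('link_id'), '^t3_', '')` before joining on the submissions' `id` column.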
important_features_comments = [
    'author', 'author_cakeday', 'author_flair_css_class', 'author_flair_text',
    'body', 'can_gild', 'controversiality', 'created_utc', 'distinguished',
    'edited', 'gilded', 'id', 'is_submitter', 'link_id', 'parent_id',
    'permalink', 'retrieved_on', 'score', 'stickied', 'subreddit',
    'subreddit_id',
]
df_comments_filtered = df_comments.select(important_features_comments)
# replace non-alphanumeric characters in body with spaces
# (loop variable is col_name so it does not shadow pyspark.sql.functions.col)
from pyspark.sql.functions import when, regexp_replace
for col_name in ['body']:
    df_comments_filtered = df_comments_filtered.withColumn(col_name,
        when((df_comments_filtered[col_name] != '[deleted]') & (df_comments_filtered[col_name] != '[removed]'), # clean only if NOT [deleted] or [removed]
            regexp_replace(df_comments_filtered[col_name], '[^a-zA-Z0-9]', ' ')
        ).otherwise(df_comments_filtered[col_name]) # leave [deleted] or [removed] as is
    )
# remove leading / trailing whitespaces for all string cols
from pyspark.sql.functions import trim
string_cols = [col_name for col_name, col_type in df_comments_filtered.dtypes if col_type == 'string']
for col_name in string_cols:
    df_comments_filtered = df_comments_filtered.withColumn(col_name, trim(df_comments_filtered[col_name]))
# value counts for columns of interest 
cols = ['subreddit', 'author', 'body']
for column in cols:
    print(f"Value counts for column: {column}")
    df_comments_filtered.groupBy(column).count().orderBy("count", ascending=False).show()
Value counts for column: subreddit
+------------+------+
|   subreddit| count|
+------------+------+
|Conservative|570355|
| Libertarian|186529|
|changemyview|158144|
|    centrist| 50853|
|   Economics| 50770|
|   socialism| 17527|
|     finance|  8476|
|Ask_Politics|  8282|
+------------+------+

Value counts for column: author
+--------------------+------+
|              author| count|
+--------------------+------+
|           [deleted]|295376|
|       AutoModerator|  7740|
|            DeltaBot|  2332|
|           premer777|  1726|
|             Ansuz07|  1018|
|       russiabot1776|   973|
|           Slabraton|   958|
|      JemiSilverhand|   949|
|        donutholster|   859|
|       TinyNuggins92|   850|
|Technical-Citron-750|   847|
|       ShoutingMatch|   838|
|    Ransom__Stoddard|   837|
|          ihatedogs2|   820|
|       ConsensusHawk|   773|
|              mc2222|   738|
|             swayz38|   716|
|         notoyrobots|   706|
|      XDarkstarX1138|   669|
|    Commonusername89|   630|
+--------------------+------+
only showing top 20 rows

Value counts for column: body
+--------------------+------+
|                body| count|
+--------------------+------+
|           [removed]|211826|
|           [deleted]| 54003|
|   Rule VI All co...|  2032|
|Looking for debat...|  1133|
|Tired of reportin...|  1088|
|                    |  1037|
| Welcome to r ask...|   943|
|New accounts less...|   711|
|                 Yes|   454|
|                Good|   415|
|                 Lol|   351|
|We are currently ...|   318|
|                  No|   298|
|             Exactly|   260|
|              Agreed|   254|
|           Thank you|   240|
|                This|   180|
|              Thanks|   169|
|                What|   164|
|This post has bee...|   163|
+--------------------+------+
only showing top 20 rows
'''
filter out deleted / removed / empty comments
empty comments: body = [removed], [deleted], or blank
'''
from pyspark.sql.functions import length, col
is_empty = (
    (col('body') == '[removed]') | 
    (col('body') == '[deleted]') | 
    (length(col('body')) < 1)
)
df_comments_filtered = df_comments_filtered.filter(~is_empty)
# value counts for columns of interest AFTER filtering
for column in cols:
    print(f"Value counts for column: {column}")
    df_comments_filtered.groupBy(column).count().orderBy("count", ascending=False).show()
    
# reference: uncleaned counts
# +------------+------+
# |   subreddit| count|
# +------------+------+
# |changemyview|158144|
# |     finance|  8476|
# |   socialism| 17527|
# | Libertarian|186529|
# |Ask_Politics|  8282|
# |    centrist| 50853|
# |Conservative|570355|
# |   Economics| 50770|
# +------------+------+
Value counts for column: subreddit
+------------+------+
|   subreddit| count|
+------------+------+
|Conservative|355752|
| Libertarian|172203|
|changemyview|143094|
|    centrist| 46966|
|   Economics| 38253|
|   socialism| 14007|
|Ask_Politics|  7388|
|     finance|  6407|
+------------+------+

Value counts for column: author
+--------------------+-----+
|              author|count|
+--------------------+-----+
|           [deleted]|29435|
|       AutoModerator| 7740|
|            DeltaBot| 2332|
|           premer777| 1726|
|             Ansuz07| 1018|
|       russiabot1776|  971|
|           Slabraton|  958|
|      JemiSilverhand|  949|
|        donutholster|  859|
|       TinyNuggins92|  850|
|Technical-Citron-750|  846|
|       ShoutingMatch|  838|
|    Ransom__Stoddard|  835|
|          ihatedogs2|  820|
|       ConsensusHawk|  773|
|              mc2222|  738|
|             swayz38|  712|
|         notoyrobots|  700|
|      XDarkstarX1138|  669|
|    Commonusername89|  630|
+--------------------+-----+
only showing top 20 rows

Value counts for column: body
+--------------------+-----+
|                body|count|
+--------------------+-----+
|   Rule VI All co...| 2032|
|Looking for debat...| 1133|
|Tired of reportin...| 1088|
| Welcome to r ask...|  943|
|New accounts less...|  711|
|                 Yes|  454|
|                Good|  415|
|                 Lol|  351|
|We are currently ...|  318|
|                  No|  298|
|             Exactly|  260|
|              Agreed|  254|
|           Thank you|  240|
|                This|  180|
|              Thanks|  169|
|                What|  164|
|This post has bee...|  163|
|                 Why|  158|
|Please note Reddi...|  149|
|               Based|  145|
+--------------------+-----+
only showing top 20 rows
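The before/after comment counts reconcile exactly: 1,050,936 - 784,070 = 266,866 comments were dropped, which equals the pre-filter counts of `[removed]` (211,826), `[deleted]` (54,003) and blank (1,037) bodies. The sketch below checks that arithmetic with a plain-Python mirror of the filter predicate; the counts are copied from the outputs above, and `is_empty_comment` is a hypothetical helper.

```python
from typing import Optional

# Per-subreddit comment counts copied from the outputs above.
before = {
    "Conservative": 570355, "Libertarian": 186529, "changemyview": 158144,
    "centrist": 50853, "Economics": 50770, "socialism": 17527,
    "finance": 8476, "Ask_Politics": 8282,
}
after = {
    "Conservative": 355752, "Libertarian": 172203, "changemyview": 143094,
    "centrist": 46966, "Economics": 38253, "socialism": 14007,
    "Ask_Politics": 7388, "finance": 6407,
}
# A subset of the pre-filter body value counts (one non-empty value included).
body_counts = {"[removed]": 211826, "[deleted]": 54003, "": 1037, "Yes": 454}


def is_empty_comment(body: Optional[str]) -> bool:
    """Mirror of the comment filter. None counts as empty because Spark's
    filter also drops rows whose predicate evaluates to null."""
    return body is None or body in ("[removed]", "[deleted]") or len(body) < 1


rows_removed = sum(before.values()) - sum(after.values())
expected_removed = sum(n for body, n in body_counts.items() if is_empty_comment(body))
print("rows removed:", rows_removed)
print("empty bodies:", expected_removed)

# Share of comments kept per subreddit after filtering.
for name in before:
    print(f"{name:>12}: kept {after[name] / before[name]:.1%}")
```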
Save DFs to S3
s3_path = f"s3a://{bucket}/project/cleaned"
df_submissions_filtered.write.mode("overwrite").parquet(f'{s3_path}/submissions')
df_comments_filtered.write.mode("overwrite").parquet(f'{s3_path}/comments')