Setup¶

In [ ]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.3.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done
Solving environment: done


==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 23.9.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.9.0



# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: pyspark==3.3.0 in /opt/conda/lib/python3.10/site-packages (3.3.0)
Requirement already satisfied: py4j==0.10.9.5 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.3.0) (0.10.9.5)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Out[ ]:

Create Spark Session¶

In [ ]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fa76f279-6531-4934-8c5c-c104c11f8cb3;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 431ms :: artifacts dl 21ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-fa76f279-6531-4934-8c5c-c104c11f8cb3
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/24ms)
23/10/22 20:38:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
3.3.0

Read in the data¶

Comments¶

In [ ]:
import sagemaker

session = sagemaker.Session()
bucket = session.default_bucket()
print(bucket)

# Create or retrieve a Spark session
spark = SparkSession.builder.appName("ReadS3Parquet").getOrCreate()

# S3 directory path
s3_directory = f"s3a://{bucket}/project/comments/"

# Read all the Parquet files in the directory into a DataFrame
df_comments = spark.read.parquet(s3_directory)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker-us-east-1-711387073580
23/10/22 20:39:01 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
23/10/22 20:39:01 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                
In [ ]:
print("Comments rows: "  , df_comments.count())
print("Comments columns: "  , len(df_comments.columns))

# check counts (ensuring all needed subreddits exist)
df_comments.groupBy('subreddit').count().show()
                                                                                
Comments rows:  1050936
Comments columns:  21
[Stage 4:====================================================>     (9 + 1) / 10]
+------------+------+
|   subreddit| count|
+------------+------+
|changemyview|158144|
|     finance|  8476|
|   socialism| 17527|
| Libertarian|186529|
|Ask_Politics|  8282|
|    centrist| 50853|
|Conservative|570355|
|   Economics| 50770|
+------------+------+

                                                                                

Submissions¶

In [ ]:
# S3 directory path
s3_directory = f"s3a://{bucket}/project/submissions/"

# Read all the Parquet files in the directory into a DataFrame
df_submissions = spark.read.parquet(s3_directory)
23/10/22 20:40:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
In [ ]:
print("Submissions rows: "  , df_submissions.count())
print("Submissions columns: "  , len(df_submissions.columns))

# check counts (ensuring all needed subreddits exist)
df_submissions.groupBy('subreddit').count().show()
                                                                                
Submissions rows:  36353
Submissions columns:  68
[Stage 11:===========================================>              (3 + 1) / 4]
+------------+-----+
|   subreddit|count|
+------------+-----+
|changemyview| 3507|
|     finance|  950|
|   socialism| 2349|
| Libertarian| 4448|
|Ask_Politics| 1027|
|    centrist| 1194|
|Conservative|21520|
|   Economics| 1358|
+------------+-----+

                                                                                

Data Cleaning¶

Submissions¶

In [ ]:
df_submissions.printSchema()
root
 |-- adserver_click_url: string (nullable = true)
 |-- adserver_imp_pixel: string (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- brand_safe: boolean (nullable = true)
 |-- contest_mode: boolean (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- approved_at_utc: string (nullable = true)
 |    |    |-- approved_by: string (nullable = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    |-- banned_at_utc: string (nullable = true)
 |    |    |-- banned_by: string (nullable = true)
 |    |    |-- brand_safe: boolean (nullable = true)
 |    |    |-- can_gild: boolean (nullable = true)
 |    |    |-- can_mod_post: boolean (nullable = true)
 |    |    |-- clicked: boolean (nullable = true)
 |    |    |-- contest_mode: boolean (nullable = true)
 |    |    |-- created: double (nullable = true)
 |    |    |-- created_utc: double (nullable = true)
 |    |    |-- distinguished: string (nullable = true)
 |    |    |-- domain: string (nullable = true)
 |    |    |-- downs: long (nullable = true)
 |    |    |-- edited: boolean (nullable = true)
 |    |    |-- gilded: long (nullable = true)
 |    |    |-- hidden: boolean (nullable = true)
 |    |    |-- hide_score: boolean (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- is_crosspostable: boolean (nullable = true)
 |    |    |-- is_reddit_media_domain: boolean (nullable = true)
 |    |    |-- is_self: boolean (nullable = true)
 |    |    |-- is_video: boolean (nullable = true)
 |    |    |-- likes: string (nullable = true)
 |    |    |-- link_flair_css_class: string (nullable = true)
 |    |    |-- link_flair_text: string (nullable = true)
 |    |    |-- locked: boolean (nullable = true)
 |    |    |-- media: string (nullable = true)
 |    |    |-- mod_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_comments: long (nullable = true)
 |    |    |-- num_crossposts: long (nullable = true)
 |    |    |-- num_reports: string (nullable = true)
 |    |    |-- over_18: boolean (nullable = true)
 |    |    |-- parent_whitelist_status: string (nullable = true)
 |    |    |-- permalink: string (nullable = true)
 |    |    |-- pinned: boolean (nullable = true)
 |    |    |-- quarantine: boolean (nullable = true)
 |    |    |-- removal_reason: string (nullable = true)
 |    |    |-- report_reasons: string (nullable = true)
 |    |    |-- saved: boolean (nullable = true)
 |    |    |-- score: long (nullable = true)
 |    |    |-- secure_media: string (nullable = true)
 |    |    |-- selftext: string (nullable = true)
 |    |    |-- selftext_html: string (nullable = true)
 |    |    |-- spoiler: boolean (nullable = true)
 |    |    |-- stickied: boolean (nullable = true)
 |    |    |-- subreddit: string (nullable = true)
 |    |    |-- subreddit_id: string (nullable = true)
 |    |    |-- subreddit_name_prefixed: string (nullable = true)
 |    |    |-- subreddit_type: string (nullable = true)
 |    |    |-- suggested_sort: string (nullable = true)
 |    |    |-- thumbnail: string (nullable = true)
 |    |    |-- thumbnail_height: string (nullable = true)
 |    |    |-- thumbnail_width: string (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- ups: long (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- user_reports: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- view_count: string (nullable = true)
 |    |    |-- visited: boolean (nullable = true)
 |    |    |-- whitelist_status: string (nullable = true)
 |-- disable_comments: boolean (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- domain_override: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- embed_type: string (nullable = true)
 |-- embed_url: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- hidden: boolean (nullable = true)
 |-- hide_score: boolean (nullable = true)
 |-- href_url: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imp_pixel: string (nullable = true)
 |-- is_crosspostable: boolean (nullable = true)
 |-- is_reddit_media_domain: boolean (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- is_video: boolean (nullable = true)
 |-- link_flair_css_class: string (nullable = true)
 |-- link_flair_text: string (nullable = true)
 |-- locked: boolean (nullable = true)
 |-- media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- reddit_video: struct (nullable = true)
 |    |    |-- dash_url: string (nullable = true)
 |    |    |-- duration: long (nullable = true)
 |    |    |-- fallback_url: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- hls_url: string (nullable = true)
 |    |    |-- is_gif: boolean (nullable = true)
 |    |    |-- scrubber_media_url: string (nullable = true)
 |    |    |-- transcoding_status: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- media_embed: struct (nullable = true)
 |    |-- content: string (nullable = true)
 |    |-- height: long (nullable = true)
 |    |-- scrolling: boolean (nullable = true)
 |    |-- width: long (nullable = true)
 |-- mobile_ad_url: string (nullable = true)
 |-- num_comments: long (nullable = true)
 |-- num_crossposts: long (nullable = true)
 |-- original_link: string (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- parent_whitelist_status: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- pinned: boolean (nullable = true)
 |-- post_hint: string (nullable = true)
 |-- preview: struct (nullable = true)
 |    |-- enabled: boolean (nullable = true)
 |    |-- images: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |-- variants: struct (nullable = true)
 |    |    |    |    |-- gif: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- mp4: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- nsfw: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |-- obfuscated: struct (nullable = true)
 |    |    |    |    |    |-- resolutions: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |    |-- width: long (nullable = true)
 |    |    |    |    |    |-- source: struct (nullable = true)
 |    |    |    |    |    |    |-- height: long (nullable = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- width: long (nullable = true)
 |-- promoted: boolean (nullable = true)
 |-- promoted_by: string (nullable = true)
 |-- promoted_display_name: string (nullable = true)
 |-- promoted_url: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- secure_media: struct (nullable = true)
 |    |-- event_id: string (nullable = true)
 |    |-- oembed: struct (nullable = true)
 |    |    |-- author_name: string (nullable = true)
 |    |    |-- author_url: string (nullable = true)
 |    |    |-- cache_age: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- html: string (nullable = true)
 |    |    |-- provider_name: string (nullable = true)
 |    |    |-- provider_url: string (nullable = true)
 |    |    |-- thumbnail_height: long (nullable = true)
 |    |    |-- thumbnail_url: string (nullable = true)
 |    |    |-- thumbnail_width: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- version: string (nullable = true)
 |    |    |-- width: long (nullable = true)
 |    |-- type: string (nullable = true)
 |-- secure_media_embed: struct (nullable = true)
 |    |-- content: string (nullable = true)
 |    |-- height: long (nullable = true)
 |    |-- media_domain_url: string (nullable = true)
 |    |-- scrolling: boolean (nullable = true)
 |    |-- width: long (nullable = true)
 |-- selftext: string (nullable = true)
 |-- spoiler: boolean (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- suggested_sort: string (nullable = true)
 |-- third_party_trackers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- third_party_tracking: string (nullable = true)
 |-- third_party_tracking_2: string (nullable = true)
 |-- thumbnail: string (nullable = true)
 |-- thumbnail_height: long (nullable = true)
 |-- thumbnail_width: long (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- whitelist_status: string (nullable = true)

The initial cleaning involves reducing the total number of columns, as there are defnitely a lot of columns that are not very useful for the analysis. The main columns to be included (for now) are:

  1. author: The user who made the post. This can be useful for understanding user activity patterns and identifying influential users.

  2. title: The title of the post. This is essential for content analysis, sentiment analysis, and topic modeling.

  3. selftext: The body text of the post (if it's a text post). Like the title, this is crucial for content-based analyses.

  4. subreddit: The community where the post was made. This helps in community-specific analyses and understanding the dynamics of different subreddits.

  5. score: The net upvotes minus downvotes the post has received. It gives an idea of the popularity and acceptance of the post.

  6. num_comments: The number of comments on the post. This can be an indicator of the post's engagement level.

  7. permalink: The URL path to the post on Reddit. Useful for referencing and direct access.

  8. created_utc: The timestamp when the post was created. Time-based analysis, trends, and patterns can be derived using this.

  9. url: The URL linked in the post (if it's a link post). Helps in understanding external content being shared.

  10. domain: The domain of the linked content. This can give insights into the most commonly shared websites/sources in a subreddit.

  11. is_video, is_self, is_reddit_media_domain: Flags indicating the type of the post (video, text, or media from Reddit's domain).

  12. spoiler and over_18: Flags indicating if the post is marked as a spoiler or is NSFW (Not Safe For Work).

  13. stickied: Indicates if the post is pinned to the top of the subreddit.

  14. thumbnail: If available, the thumbnail can give a quick visual summary of linked content.

  15. media and secure_media: Details about any embedded media in the post, like videos.

  16. gilded: Indicates how many times the post has received a gold award.

  17. archived: Tells if the post is archived (meaning no new comments can be added).

  18. distinguished: Indicates if the post is made by a moderator or admin of Reddit.

  19. crosspost_parent and crosspost_parent_list: Information about the original post if the current post is a crosspost.

In [ ]:
important_features = [
    "author",
    "title",
    "selftext",
    "subreddit",
    "score",
    "num_comments",
    "permalink",
    "created_utc",
    "url",
    "domain",
    "is_video",
    "is_self",
    "is_reddit_media_domain",
    "spoiler",
    "over_18",
    "stickied",
    "thumbnail",
    "media",
    "secure_media",
    "gilded",
    "archived",
    "distinguished",
    "crosspost_parent",
    "crosspost_parent_list"
]


df_submissions_filtered = df_submissions.select(important_features)
In [ ]:
# remove non-alphanumeric from title and selftext

from pyspark.sql.functions import when, regexp_replace

for col in ['title', 'selftext']:
    df_submissions_filtered = df_submissions_filtered.withColumn(col,
        when((df_submissions_filtered[col] != '[deleted]') & (df_submissions_filtered[col] != '[removed]'), # remove non-alphanumeric if NOT [deleted] or [removed]
            regexp_replace(df_submissions_filtered[col], '[^a-zA-Z0-9]', ' ')
        ).otherwise(df_submissions_filtered[col]) # leave [deleted] or [removed] as is
    )
In [ ]:
# remove leading / trailing whitespaces for all string cols

from pyspark.sql.functions import trim

string_cols = [col_name for col_name, col_type in df_submissions_filtered.dtypes if col_type == 'string']

for col in string_cols:
    df_submissions_filtered = df_submissions_filtered.withColumn(col, trim(df_submissions_filtered[col]))
In [ ]:
# value counts for columns of interest 

cols = ['subreddit', 'author', 'title', 'selftext']

for column in cols:
    print(f"Value counts for column: {column}")
    df_submissions_filtered.groupBy(column).count().orderBy("count", ascending=False).show()
Value counts for column: subreddit
                                                                                
+------------+-----+
|   subreddit|count|
+------------+-----+
|Conservative|21520|
| Libertarian| 4448|
|changemyview| 3507|
|   socialism| 2349|
|   Economics| 1358|
|    centrist| 1194|
|Ask_Politics| 1027|
|     finance|  950|
+------------+-----+

Value counts for column: author
                                                                                
+-------------------+-----+
|             author|count|
+-------------------+-----+
|          [deleted]|10215|
|     Winterhold2000|  642|
|          guanaco55|  548|
|             f1sh98|  508|
|            canfbar|  442|
|             nimobo|  351|
|     JesusCumelette|  254|
|      ChunkyArsenio|  216|
|       Foubar_ghost|  208|
|    fintechinshorts|  183|
|             XIIXOO|  135|
|    ThePoliticalHat|  132|
|        geocentrist|  123|
|           bobbyw24|  121|
|      theradiostarz|  113|
|          cyanobyte|  104|
|          Beliavsky|  102|
|         billgigs55|  101|
|              oz4ut|   99|
|FullmetalGameMaster|   95|
+-------------------+-----+
only showing top 20 rows

Value counts for column: title
                                                                                
+--------------------+-----+
|               title|count|
+--------------------+-----+
|                    |   57|
|I just want to fi...|   20|
|You think hunter ...|   13|
|Lowering the Bar ...|   12|
|                 Lol|   12|
|Get free  200 Ama...|   11|
|               Pesto|   10|
|Audio  Trump bera...|   10|
|uThermo just rest...|   10|
|            Question|    9|
|            Thoughts|    9|
|               Truth|    9|
|          Censorship|    9|
|Chaos on Capitol ...|    8|
|Twitter permanent...|    8|
|                 Gab|    8|
|               Facts|    8|
|                True|    7|
|North Idaho inter...|    7|
|                  Hi|    7|
+--------------------+-----+
only showing top 20 rows

Value counts for column: selftext
[Stage 23:===========================================>              (3 + 1) / 4]
+--------------------+-----+
|            selftext|count|
+--------------------+-----+
|                    |18283|
|           [removed]| 8117|
|           [deleted]| 6499|
|         That is all|    5|
|Feel free to disc...|    2|
|All violence shou...|    2|
|               Title|    2|
|Hello  I consider...|    2|
|After Capitol rio...|    2|
|Sorry if this is ...|    2|
|I have seen the s...|    1|
|I would like to c...|    1|
|In recent years a...|    1|
|Socialist Alterna...|    1|
|He would obviousl...|    1|
|The r conservativ...|    1|
|There s a lot of ...|    1|
|If such a clearly...|    1|
|What would you li...|    1|
|If impeachment is...|    1|
+--------------------+-----+
only showing top 20 rows

                                                                                
In [ ]:
'''
get the count of deleted / empty posts
empty posts (meets all of the following): 
    - title = blank
    - selftext = removed, deleted, or blank
'''

from pyspark.sql.functions import length, col

is_empty = (
    (length(col('title')) < 1) &
    (
        (col('selftext') == 'removed') | 
        (col('selftext') == 'deleted') | 
        (length(col('selftext')) < 1)
    )
)

df_submissions_filtered = df_submissions_filtered.filter(~is_empty)
In [ ]:
# value counts for columns of interest AFTER filtering
for column in cols:
    print(f"Value counts for column: {column}")
    df_submissions_filtered.groupBy(column).count().orderBy("count", ascending=False).show()
    
# reference: uncleaned counts
# +------------+-----+
# |   subreddit|count|
# +------------+-----+
# |changemyview| 3507|
# |     finance|  950|
# |   socialism| 2349|
# | Libertarian| 4448|
# |Ask_Politics| 1027|
# |    centrist| 1194|
# |Conservative|21520|
# |   Economics| 1358|
# +------------+-----+
Value counts for column: subreddit
                                                                                
+------------+-----+
|   subreddit|count|
+------------+-----+
|Conservative|21493|
| Libertarian| 4448|
|changemyview| 3507|
|   socialism| 2343|
|   Economics| 1354|
|    centrist| 1194|
|Ask_Politics| 1027|
|     finance|  948|
+------------+-----+

Value counts for column: author
                                                                                
+-------------------+-----+
|             author|count|
+-------------------+-----+
|          [deleted]|10213|
|     Winterhold2000|  642|
|          guanaco55|  548|
|             f1sh98|  508|
|            canfbar|  442|
|             nimobo|  351|
|     JesusCumelette|  254|
|      ChunkyArsenio|  216|
|       Foubar_ghost|  208|
|    fintechinshorts|  183|
|             XIIXOO|  135|
|    ThePoliticalHat|  132|
|        geocentrist|  123|
|           bobbyw24|  121|
|      theradiostarz|  113|
|          cyanobyte|  104|
|          Beliavsky|  102|
|         billgigs55|  101|
|              oz4ut|   99|
|FullmetalGameMaster|   95|
+-------------------+-----+
only showing top 20 rows

Value counts for column: title
                                                                                
+--------------------+-----+
|               title|count|
+--------------------+-----+
|I just want to fi...|   20|
|                    |   18|
|You think hunter ...|   13|
|Lowering the Bar ...|   12|
|                 Lol|   12|
|Get free  200 Ama...|   11|
|               Pesto|   10|
|Audio  Trump bera...|   10|
|uThermo just rest...|   10|
|            Question|    9|
|            Thoughts|    9|
|               Truth|    9|
|          Censorship|    9|
|Chaos on Capitol ...|    8|
|Twitter permanent...|    8|
|                 Gab|    8|
|               Facts|    8|
|                True|    7|
|North Idaho inter...|    7|
|                  Hi|    7|
+--------------------+-----+
only showing top 20 rows

Value counts for column: selftext
[Stage 35:===========================================>              (3 + 1) / 4]
+--------------------+-----+
|            selftext|count|
+--------------------+-----+
|                    |18244|
|           [removed]| 8117|
|           [deleted]| 6499|
|         That is all|    5|
|Feel free to disc...|    2|
|All violence shou...|    2|
|               Title|    2|
|Hello  I consider...|    2|
|After Capitol rio...|    2|
|Sorry if this is ...|    2|
|I have seen the s...|    1|
|I would like to c...|    1|
|In recent years a...|    1|
|Socialist Alterna...|    1|
|He would obviousl...|    1|
|The r conservativ...|    1|
|There s a lot of ...|    1|
|If such a clearly...|    1|
|What would you li...|    1|
|If impeachment is...|    1|
+--------------------+-----+
only showing top 20 rows

                                                                                

Comments¶

In [ ]:
df_comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)

Relevant columns to be kept (for now):

  1. author: Username of the person who wrote the comment. Relevant for tracking specific users or identifying patterns among users.

  2. author_cakeday: Indicates if it's the anniversary of the author joining Reddit. Typically not critical unless you're looking into the behavior of users on their "cakeday".

  3. author_flair_css_class & author_flair_text: Flair associated with the user in that subreddit. Might be relevant for categorizing users in certain subreddits.

  4. body: The content of the comment. Crucial for most analyses involving comments.

  5. can_gild: Indicates if the comment can be given a gold award. Might not be necessary unless you're studying gilding behavior.

  6. controversiality: Indicates if a comment is controversial (i.e., has a nearly equal amount of upvotes and downvotes). Can be useful to identify divisive comments.

  7. created_utc: Timestamp of the comment. Relevant for time series analysis or understanding the data's time distribution.

  8. distinguished: Whether a comment is distinguished by moderators/admins. Can be useful in subreddits with active moderation.

  9. edited: Indicates if the comment was edited after posting. Can be useful to track changes.

  10. gilded: Number of gold awards the comment received. Relevant if you're interested in the appreciation of comments.

  11. id: Unique identifier for the comment. Essential for data integrity and referencing.

  12. is_submitter: Indicates if the commenter is also the original poster. Can be useful to track interactions.

  13. link_id: ID of the post to which the comment belongs. Vital for mapping comments to posts.

  14. parent_id: ID of the parent comment or post. Relevant for understanding comment threads and hierarchies.

  15. permalink: Direct link to the comment. Useful for referencing.

  16. retrieved_on: Timestamp of when the comment was pulled. Useful for data tracking and understanding the time frame of data collection.

  17. score: Net score (upvotes - downvotes) of the comment. Critical for understanding the reception of a comment.

  18. stickied: Indicates if the comment is pinned. Relevant in subreddits with active moderation or announcements.

  19. subreddit & subreddit_id: The subreddit where the comment was posted. Essential for any subreddit-based analysis.

In [ ]:
important_features_comments = ['author' , 'author_cakeday' , 'author_flair_css_class' , 'body' , 
                               'can_gild' , 'controversiality' , 'created_utc' , 'distinguished' , 
                               'edited' , 'gilded' , 'id', 'is_submitter' , 'link_id', 'parent_id', 
                               'permalink' , 'retrieved_on', 'score', 'stickied' , 'subreddit', 
                               'subreddit_id']

df_comments_filtered = df_comments.select(important_features_comments)
In [ ]:
# remove non-alphanumeric from body

from pyspark.sql.functions import when, regexp_replace

for col in ['body']:
    df_comments_filtered = df_comments_filtered.withColumn(col,
        when((df_comments_filtered[col] != '[deleted]') & (df_comments_filtered[col] != '[removed]'), # remove non-alphanumeric if NOT [deleted] or [removed]
            regexp_replace(df_comments_filtered[col], '[^a-zA-Z0-9]', ' ')
        ).otherwise(df_comments_filtered[col]) # leave [deleted] or [removed] as is
    )
In [ ]:
# remove leading / trailing whitespaces for all string cols

from pyspark.sql.functions import trim

string_cols = [col_name for col_name, col_type in df_comments_filtered.dtypes if col_type == 'string']

for col in string_cols:
    df_comments_filtered = df_comments_filtered.withColumn(col, trim(df_comments_filtered[col]))
In [ ]:
# value counts for columns of interest 

cols = ['subreddit', 'author', 'body']

for column in cols:
    print(f"Value counts for column: {column}")
    df_comments_filtered.groupBy(column).count().orderBy("count", ascending=False).show()
Value counts for column: subreddit
                                                                                
+------------+------+
|   subreddit| count|
+------------+------+
|Conservative|570355|
| Libertarian|186529|
|changemyview|158144|
|    centrist| 50853|
|   Economics| 50770|
|   socialism| 17527|
|     finance|  8476|
|Ask_Politics|  8282|
+------------+------+

Value counts for column: author
                                                                                
+--------------------+------+
|              author| count|
+--------------------+------+
|           [deleted]|295376|
|       AutoModerator|  7740|
|            DeltaBot|  2332|
|           premer777|  1726|
|             Ansuz07|  1018|
|       russiabot1776|   973|
|           Slabraton|   958|
|      JemiSilverhand|   949|
|        donutholster|   859|
|       TinyNuggins92|   850|
|Technical-Citron-750|   847|
|       ShoutingMatch|   838|
|    Ransom__Stoddard|   837|
|          ihatedogs2|   820|
|       ConsensusHawk|   773|
|              mc2222|   738|
|             swayz38|   716|
|         notoyrobots|   706|
|      XDarkstarX1138|   669|
|    Commonusername89|   630|
+--------------------+------+
only showing top 20 rows

Value counts for column: body
[Stage 46:======================================>                   (2 + 1) / 3]
+--------------------+------+
|                body| count|
+--------------------+------+
|           [removed]|211826|
|           [deleted]| 54003|
|Rule VI    All co...|  2032|
|Looking for debat...|  1133|
|Tired of reportin...|  1088|
|                    |  1037|
|Welcome to  r ask...|   943|
|New accounts less...|   711|
|                 Yes|   454|
|                Good|   415|
|                 Lol|   351|
|We are currently ...|   318|
|                  No|   298|
|             Exactly|   260|
|              Agreed|   254|
|           Thank you|   240|
|                This|   180|
|              Thanks|   169|
|                What|   164|
|This post has bee...|   163|
+--------------------+------+
only showing top 20 rows

                                                                                
In [ ]:
'''
get the count of deleted / empty comments
empty posts: body = removed, deleted, or blank
'''

from pyspark.sql.functions import length, col

is_empty = (
    (col('body') == '[removed]') | 
    (col('body') == '[deleted]') | 
    (length(col('body')) < 1)
)

df_comments_filtered = df_comments_filtered.filter(~is_empty)
In [ ]:
# value counts for columns of interest AFTER filtering
for column in cols:
    print(f"Value counts for column: {column}")
    df_comments_filtered.groupBy(column).count().orderBy("count", ascending=False).show()
    
# reference: uncleaned counts
# +------------+------+
# |   subreddit| count|
# +------------+------+
# |changemyview|158144|
# |     finance|  8476|
# |   socialism| 17527|
# | Libertarian|186529|
# |Ask_Politics|  8282|
# |    centrist| 50853|
# |Conservative|570355|
# |   Economics| 50770|
# +------------+------+
Value counts for column: subreddit
                                                                                
+------------+------+
|   subreddit| count|
+------------+------+
|Conservative|355752|
| Libertarian|172203|
|changemyview|143094|
|    centrist| 46966|
|   Economics| 38253|
|   socialism| 14007|
|Ask_Politics|  7388|
|     finance|  6407|
+------------+------+

Value counts for column: author
                                                                                
+--------------------+-----+
|              author|count|
+--------------------+-----+
|           [deleted]|29435|
|       AutoModerator| 7740|
|            DeltaBot| 2332|
|           premer777| 1726|
|             Ansuz07| 1018|
|       russiabot1776|  971|
|           Slabraton|  958|
|      JemiSilverhand|  949|
|        donutholster|  859|
|       TinyNuggins92|  850|
|Technical-Citron-750|  846|
|       ShoutingMatch|  838|
|    Ransom__Stoddard|  835|
|          ihatedogs2|  820|
|       ConsensusHawk|  773|
|              mc2222|  738|
|             swayz38|  712|
|         notoyrobots|  700|
|      XDarkstarX1138|  669|
|    Commonusername89|  630|
+--------------------+-----+
only showing top 20 rows

Value counts for column: body
[Stage 55:>                                                         (0 + 2) / 3]
+--------------------+-----+
|                body|count|
+--------------------+-----+
|Rule VI    All co...| 2032|
|Looking for debat...| 1133|
|Tired of reportin...| 1088|
|Welcome to  r ask...|  943|
|New accounts less...|  711|
|                 Yes|  454|
|                Good|  415|
|                 Lol|  351|
|We are currently ...|  318|
|                  No|  298|
|             Exactly|  260|
|              Agreed|  254|
|           Thank you|  240|
|                This|  180|
|              Thanks|  169|
|                What|  164|
|This post has bee...|  163|
|                 Why|  158|
|Please note Reddi...|  149|
|               Based|  145|
+--------------------+-----+
only showing top 20 rows

                                                                                

Save DFs to S3¶

In [ ]:
s3_path = f"s3a://{bucket}/project/cleaned"

df_submissions_filtered.write.mode("overwrite").parquet(f'{s3_path}/submissions')
df_comments_filtered.write.mode("overwrite").parquet(f'{s3_path}/comments')
                                                                                
In [ ]:

In [ ]:
 
In [ ]:
 
In [ ]:
 
In [ ]:
 
In [ ]:
 
In [ ]: