Setup¶

In [ ]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.4.0

# install spark-nlp
%pip install spark-nlp==5.1.3

# restart kernel so the newly installed packages are picked up
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done
Solving environment: done


# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: pyspark==3.4.0 in /opt/conda/lib/python3.10/site-packages (3.4.0)
Requirement already satisfied: py4j==0.10.9.7 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.4.0) (0.10.9.7)
Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: spark-nlp==5.1.3 in /opt/conda/lib/python3.10/site-packages (5.1.3)
Note: you may need to restart the kernel to use updated packages.
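
The JavaScript restart above relies on the classic notebook frontend's `Jupyter` object and may silently do nothing in JupyterLab. A kernel-side alternative (a sketch, assuming the running kernel is ipykernel):

import IPython

# Ask ipykernel itself to shut down and restart, independent of the frontend
IPython.Application.instance().kernel.do_shutdown(restart=True)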

Download JARs¶

In [ ]:
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()

!wget -qO- https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.1.3.jar | aws s3 cp - s3://{bucket}/project/spark-nlp-assembly-5.1.3.jar
!aws s3 ls s3://{bucket}/project/spark-nlp-assembly-5.1.3.jar

# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    # S3A options must carry the "spark.hadoop." prefix, otherwise Spark drops
    # them as non-Spark properties and the credentials provider never applies
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
2023-11-19 19:54:15  708534094 spark-nlp-assembly-5.1.3.jar
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dfb4991b-7041-4afa-b0db-63eaa91dfe3b;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 256ms :: artifacts dl 21ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-dfb4991b-7041-4afa-b0db-63eaa91dfe3b
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/10ms)
23/11/19 19:54:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
3.4.0
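
If the credentials provider still does not seem to take effect, one way to confirm what the live session actually picked up is to read the Hadoop configuration back (a debugging sketch; `_jsc` is an internal handle, so don't rely on it in production code):

# Read back the effective Hadoop configuration of the live session
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
print(hadoop_conf.get("fs.s3a.aws.credentials.provider"))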

Preparing DFs for Sentiment¶

Comments¶

In [ ]:
from pyspark.sql.functions import concat_ws, col

# Tegveer's S3 -- DO NOT CHANGE
s3_directory = "s3a://sagemaker-us-east-1-433974840707/project/nlp_cleaned_comments/"

df_comments = spark.read.parquet(s3_directory)
df_comments.show(5)

# concat list of words into a single string column
comms = df_comments.withColumn("concat_comments", concat_ws(" ", "cleaned_body"))

# keep only the columns needed downstream, then sanity check
comms = comms.select("author", "subreddit", "created_utc", "score", "controversiality", "distinguished", "gilded", "concat_comments")
comms.show(5)

# count all rows, then filter out null/empty comment strings
comms_rows = comms.count()
non_empty_comments = comms.filter((col("concat_comments").isNotNull()) & (col("concat_comments") != ""))

# get number of empty rows
empty_comments_count = comms_rows - non_empty_comments.count()
print(f"Number of rows in concat_comments: {comms_rows}")
print(f"Number of empty rows in concat_comments: {empty_comments_count}")

# filter for specific subreddits -- CHANGE BASED ON SUBREDDITS HERE
#subreddits = "socialism, Economics, Liberal, Conservative, Libertarian, centrist, changemyview, Ask_Politics, finance"
subreddits_list = ["centrist", "Libertarian"]
filtered_comms = non_empty_comments.filter(col("subreddit").isin(subreddits_list))

# write to s3
output_path = "s3a://sagemaker-us-east-1-433974840707/project/nlp/comments/"
filtered_comms.write.mode("overwrite").parquet(output_path)
23/11/19 19:54:30 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
+------------+--------------------+--------+----------------+-----------+-------------+-------------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------+------------+--------------------+
|      author|                body|can_gild|controversiality|created_utc|distinguished|       edited|gilded|     id|is_submitter|  link_id| parent_id|           permalink|retrieved_on|score|stickied|  subreddit|subreddit_id|        cleaned_body|
+------------+--------------------+--------+----------------+-----------+-------------+-------------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------+------------+--------------------+
|Rude-Way4688|Yee. I like DeSan...|    true|               0| 2021-05-28|         null|1.622224763E9|     0|gzs7gai|       false|t3_nmztty|t1_gzs6nla|/r/Libertarian/co...|        null|  -30|   false|Libertarian|    t5_2qh63|[yee, like, desan...|
|   [deleted]|To be fair, this ...|   false|               0| 2021-07-23|         null|        false|     0|h67r8pp|       false|t3_opr5q9|t1_h67ea09|/r/centrist/comme...|  2022-05-14|    2|   false|   centrist|    t5_2qx8j|[fair, south, ali...|
|   [deleted]|           [deleted]|    true|               0| 2021-02-10|         null|1.613002061E9|     0|gmrrnpn|       false|t3_lgcjd3|t1_gmrr2c5|/r/Libertarian/co...|        null|   17|   false|Libertarian|    t5_2qh63|            [delete]|
|   [deleted]|The point is covi...|   false|               0| 2021-02-07|         null|        false|     0|gmgzk70|       false|t3_leehhu| t3_leehhu|/r/Libertarian/co...|        null|    3|   false|Libertarian|    t5_2qh63|[point, covid, ch...|
| Shmodecious|Believing that tr...|    true|               0| 2021-11-17|         null|        false|     0|hkzj1tr|       false|t3_qvgqyd|t1_hky3fvx|/r/Libertarian/co...|  2022-02-27|    0|   false|Libertarian|    t5_2qh63|[believe, transge...|
+------------+--------------------+--------+----------------+-----------+-------------+-------------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------+------------+--------------------+
only showing top 5 rows

+------------+-----------+-----------+-----+----------------+-------------+------+--------------------+
|      author|  subreddit|created_utc|score|controversiality|distinguished|gilded|     concat_comments|
+------------+-----------+-----------+-----+----------------+-------------+------+--------------------+
|Rude-Way4688|Libertarian| 2021-05-28|  -30|               0|         null|     0|yee like desantis...|
|   [deleted]|   centrist| 2021-07-23|    2|               0|         null|     0|fair south alia m...|
|   [deleted]|Libertarian| 2021-02-10|   17|               0|         null|     0|              delete|
|   [deleted]|Libertarian| 2021-02-07|    3|               0|         null|     0|point covid choos...|
| Shmodecious|Libertarian| 2021-11-17|    0|               0|         null|     0|believe transgend...|
+------------+-----------+-----------+-----+----------------+-------------+------+--------------------+
only showing top 5 rows

Number of rows in concat_comments: 14863477
Number of empty rows in concat_comments: 63302
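
Before moving on, it is worth confirming that the subreddit filter kept only the two intended communities (a quick check, reusing `filtered_comms` and `subreddits_list` from above):

# Expect exactly the two subreddits from subreddits_list
filtered_comms.select("subreddit").distinct().show()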

Submissions¶

In [ ]:
# Tegveer's S3 -- DO NOT CHANGE
s3_directory = "s3a://sagemaker-us-east-1-433974840707/project/nlp_cleaned_submissions/"

df_submissions = spark.read.parquet(s3_directory)
df_submissions.show(5)

# concatenate "cleaned_body" and "cleaned_title" into a single string column
subs = df_submissions.withColumn(
    "concat_submissions",
    concat_ws(" ", "cleaned_body", "cleaned_title")
)

# keep only the columns needed downstream, then sanity check
subs = subs.select("author", "subreddit", "created_utc", "score", "num_comments", "distinguished", "gilded", "concat_submissions")
subs.show(5)

# count all rows, then filter out null/empty submission strings
subs_rows = subs.count()
non_empty_subs = subs.filter((col("concat_submissions").isNotNull()) & (col("concat_submissions") != ""))

# get number of empty rows
empty_subs_count = subs_rows - non_empty_subs.count()
print(f"Number of rows in concat_submissions: {subs_rows}")
print(f"Number of empty rows in concat_submissions: {empty_subs_count}")

# filter for specific subreddits
filtered_subs = non_empty_subs.filter(col("subreddit").isin(subreddits_list))

# write to s3
output_path = "s3a://sagemaker-us-east-1-433974840707/project/nlp/submissions/"
filtered_subs.write.mode("overwrite").parquet(output_path)
+--------------------+--------------------+--------------------+---------+-----+------------+--------------------+-----------+--------------------+------------------+--------+-------+----------------------+-------+-------+--------+--------------------+--------------------+--------------------+------+--------+-------------+--------------------+--------------------+
|              author|               title|            selftext|subreddit|score|num_comments|           permalink|created_utc|                 url|            domain|is_video|is_self|is_reddit_media_domain|spoiler|over_18|stickied|           thumbnail|               media|        secure_media|gilded|archived|distinguished|        cleaned_body|       cleaned_title|
+--------------------+--------------------+--------------------+---------+-----+------------+--------------------+-----------+--------------------+------------------+--------+-------+----------------------+-------+-------+--------+--------------------+--------------------+--------------------+------+--------+-------------+--------------------+--------------------+
|    IsThisReallyNate|Evidence of the W...|I’ve heard it cla...|socialism|    6|           3|/r/socialism/comm...| 2021-07-29|https://www.reddi...|    self.socialism|   false|   true|                 false|  false|  false|   false|                self|                null|                null|     0|   false|         null|[ive, hear, claim...|[evidence, war, t...|
|           [deleted]|Starbucks are blo...|           [deleted]|socialism|   11|           0|/r/socialism/comm...| 2022-04-15|https://twitter.c...|       twitter.com|   false|  false|                 false|  false|  false|   false|             default|{event_id=null, t...|{event_id=null, t...|     0|   false|         null|            [delete]|[starbucks, block...|
|MedicineInevitable48|             Shubham|                    |Economics|    1|           1|/r/Economics/comm...| 2021-02-03|https://bsshahedu...|bsshaheducation.in|   false|  false|                 false|  false|  false|   false|             default|                null|                null|     0|   false|         null|                  []|           [shubham]|
|             NORDLAN|Federal judge tos...|                    |  Liberal|   17|           2|/r/Liberal/commen...| 2021-06-22|https://www.washi...|washingtonpost.com|   false|  false|                 false|  false|  false|   false|https://b.thumbs....|                null|                null|     0|   false|         null|                  []|[federal, judge, ...|
|        haskalah1989|Today marks 214 y...|                    |socialism| 1761|          43|/r/socialism/comm...| 2021-03-25|https://i.redd.it...|         i.redd.it|   false|  false|                  true|  false|  false|   false|https://b.thumbs....|                null|                null|     0|   false|         null|                  []|[today, mark, yea...|
+--------------------+--------------------+--------------------+---------+-----+------------+--------------------+-----------+--------------------+------------------+--------+-------+----------------------+-------+-------+--------+--------------------+--------------------+--------------------+------+--------+-------------+--------------------+--------------------+
only showing top 5 rows

+--------------------+---------+-----------+-----+------------+-------------+------+--------------------+
|              author|subreddit|created_utc|score|num_comments|distinguished|gilded|  concat_submissions|
+--------------------+---------+-----------+-----+------------+-------------+------+--------------------+
|    IsThisReallyNate|socialism| 2021-07-29|    6|           3|         null|     0|ive hear claim ma...|
|           [deleted]|socialism| 2022-04-15|   11|           0|         null|     0|delete starbucks ...|
|MedicineInevitable48|Economics| 2021-02-03|    1|           1|         null|     0|             shubham|
|             NORDLAN|  Liberal| 2021-06-22|   17|           2|         null|     0|federal judge tos...|
|        haskalah1989|socialism| 2021-03-25| 1761|          43|         null|     0|today mark year s...|
+--------------------+---------+-----------+-----+------------+-------------+------+--------------------+
only showing top 5 rows

Number of rows in concat_submissions: 599908
Number of empty rows in concat_submissions: 565

Process S3 Data with a SageMaker PySparkProcessor Processing Job¶

In [ ]:
%%writefile ./sentiment_centrist_libertarian.py

import sys
import logging
import argparse

# Spark NLP and PySpark imports
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--df_target_col", type=str, help="Target Column for Sentiment Analysis")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()
    logger.info(f"args={args}")
    
    spark = SparkSession.builder \
        .appName("Spark NLP")\
        .config("spark.driver.memory","16G")\
        .config("spark.driver.maxResultSize", "0") \
        .config("spark.kryoserializer.buffer.max", "2000M")\
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3")\
        .getOrCreate()
    
    logger.info(f"Spark version: {spark.version}")
    logger.info(f"sparknlp version: {sparknlp.version()}")
    
    # Pin the classic FileOutputCommitter for output writes from the job
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )
    
    # Read the dataset from S3 into a DataFrame
    logger.info(f"going to read {args.s3_dataset_path}")
    df = spark.read.parquet(args.s3_dataset_path)
    
    # sentiment analysis
    target_col = args.df_target_col
    MODEL_NAME = 'sentimentdl_use_twitter'
    logger.info(f"setting up an nlp pipeline with model={MODEL_NAME}")
    documentAssembler = DocumentAssembler()\
        .setInputCol(target_col)\
        .setOutputCol(f"document_{target_col}")

    use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
        .setInputCols([f"document_{target_col}"])\
        .setOutputCol(f"sentence_embeddings_{target_col}")

    sentimentdl = SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")\
        .setInputCols([f"sentence_embeddings_{target_col}"])\
        .setOutputCol(f"sentiment_{target_col}")

    nlpPipeline = Pipeline(stages=[documentAssembler, use, sentimentdl])

    logger.info("going to fit pipeline on dataframe")
    nlpModel = nlpPipeline.fit(df)

    logger.info("going to transform pipeline on dataframe")
    result = nlpModel.transform(df)
    
    # save the annotated dataframe so it can be used for downstream analysis
    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}"
    logger.info(f"going to write data in {s3_path}")
    logger.info(f"shape of the result dataframe is {result.count():,}x{len(result.columns)}")
    result.write.mode("overwrite").parquet(s3_path)
    
if __name__ == "__main__":
    main()
Writing ./sentiment_centrist_libertarian.py
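
Before launching an eight-instance processing job, the pipeline logic can be smoke-tested on a toy DataFrame. A minimal sketch, assuming a fresh kernel (so `sparknlp.start()` can create its own session) and that the pretrained models, several hundred MB, can be downloaded to the notebook instance:

import sparknlp
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import UniversalSentenceEncoder, SentimentDLModel
from pyspark.ml import Pipeline

spark_local = sparknlp.start()  # local session with Spark NLP on the classpath
toy = spark_local.createDataFrame(
    [("taxes are theft",), ("great policy discussion today",)],
    ["concat_comments"],
)

# Same three stages as the script, hard-coded to one column
document = DocumentAssembler().setInputCol("concat_comments").setOutputCol("document")
use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en") \
    .setInputCols(["document"]).setOutputCol("sentence_embeddings")
sentiment = SentimentDLModel.pretrained(name="sentimentdl_use_twitter", lang="en") \
    .setInputCols(["sentence_embeddings"]).setOutputCol("sentiment")

Pipeline(stages=[document, use, sentiment]).fit(toy).transform(toy) \
    .select("concat_comments", "sentiment.result").show(truncate=False)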
In [ ]:
import boto3
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

account_id = boto3.client('sts').get_caller_identity()['Account']

# Set up the PySpark processor to run the job. Note the instance type and count:
# SageMaker provisions that many instances of this type for the Spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project-sentiment",
    image_uri=f"{account_id}.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark:latest",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=7200,
)

# s3 paths
output_prefix = "project/sentiment"
output_prefix_logs = "spark_logs/sentiment"
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
In [ ]:
import time

# comments
df_target_col = "concat_comments"
print(f"Working on Comments Col {df_target_col}")
spark_processor.run(
    submit_app="./sentiment_centrist_libertarian.py",
    submit_jars=[f"s3://{bucket}/project/spark-nlp-assembly-5.1.3.jar"],
    arguments=[
        "--df_target_col",
        df_target_col,
        "--s3_dataset_path",
        "s3://sagemaker-us-east-1-433974840707/project/nlp/comments/",
        "--s3_output_bucket",
        "sagemaker-us-east-1-433974840707",
        "--s3_output_key_prefix",
        f"{output_prefix}/comments/",
    ],
    spark_event_logs_s3_uri=f"s3://{bucket}/{output_prefix_logs}/spark_event_logs",
    logs=False,
)

time.sleep(60)

# submissions
df_target_col = "concat_submissions"
print(f"Working on Submissions Col {df_target_col}")
spark_processor.run(
    submit_app="./sentiment_centrist_libertarian.py",
    submit_jars=[f"s3://{bucket}/project/spark-nlp-assembly-5.1.3.jar"],
    arguments=[
        "--df_target_col",
        df_target_col,
        "--s3_dataset_path",
        "s3://sagemaker-us-east-1-433974840707/project/nlp/submissions/",
        "--s3_output_bucket",
        "sagemaker-us-east-1-433974840707",
        "--s3_output_key_prefix",
        f"{output_prefix}/submissions/",
    ],
    spark_event_logs_s3_uri=f"s3://{bucket}/{output_prefix_logs}/spark_event_logs",
    logs=False,
)
Working on Comments Col concat_comments
INFO:sagemaker:Creating processing-job with name sm-spark-project-sentiment-2023-11-19-19-57-26-245
...................................................................................................................................................................................................................!
INFO:sagemaker:Creating processing-job with name sm-spark-project-sentiment-2023-11-19-20-16-15-379
Working on Submissions Col concat_submissions
............................................................................................................................!
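
With `logs=False`, the cells above only print progress dots, so the job's final status and any failure reason have to be pulled from the SageMaker API afterwards. A sketch (the `latest_job` attribute is set by the SDK once `run()` returns):

# Inspect the most recent processing job started by this processor
desc = spark_processor.latest_job.describe()
print(desc["ProcessingJobStatus"], desc.get("FailureReason", ""))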

Read in Sentiment Data¶

Comments¶

In [ ]:
# Read all the Parquet files in the directory into a DataFrame
df_comments = spark.read.parquet("s3a://sagemaker-us-east-1-433974840707/project/nlp/comments/")

# check counts (ensuring all needed subreddits exist)
df_comments.groupBy('subreddit').count().show()

# get cols
print(df_comments.columns)
+-----------+-------+
|  subreddit|  count|
+-----------+-------+
|Libertarian|2691208|
|   centrist| 917568|
+-----------+-------+

['author', 'subreddit', 'created_utc', 'score', 'controversiality', 'distinguished', 'gilded', 'concat_comments']

Submissions¶

In [ ]:
# Read all the Parquet files in the directory into a DataFrame
df_submissions = spark.read.parquet("s3a://sagemaker-us-east-1-433974840707/project/nlp/submissions/")

# check counts (ensuring all needed subreddits exist)
df_submissions.groupBy('subreddit').count().show()

# get cols
print(df_submissions.columns)
+-----------+-----+
|  subreddit|count|
+-----------+-----+
|Libertarian|51136|
|   centrist|13585|
+-----------+-----+

['author', 'subreddit', 'created_utc', 'score', 'num_comments', 'distinguished', 'gilded', 'concat_submissions']
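
The processing jobs wrote Spark NLP annotation structs, so the sentiment label itself sits in the nested `result` field of `sentiment_<column>` (per the `setOutputCol` calls in the script). A sketch of extracting the label from the comments output, with the path taken from the job arguments above:

import pyspark.sql.functions as F

# Each Spark NLP output column is an array of annotation structs;
# pull the first label out of the nested `result` field
sent = spark.read.parquet("s3a://sagemaker-us-east-1-433974840707/project/sentiment/comments/")
sent = sent.withColumn("sentiment_label", F.element_at(F.col("sentiment_concat_comments.result"), 1))
sent.groupBy("sentiment_label").count().show()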