Setup
In [ ]:
# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.4.0
# install spark-nlp
%pip install spark-nlp==5.1.3
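# restart the kernel so the newly installed packages are picked up
# (this JavaScript restart only works in the classic Jupyter Notebook UI;
#  in JupyterLab / SageMaker Studio, use Kernel > Restart Kernel instead)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")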
Collecting package metadata (current_repodata.json): done
Solving environment: done
==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 23.10.0
Please update conda by running
    $ conda update -n base -c defaults conda
Or to minimize the number of packages updated during conda update use
     conda install conda=23.10.0
# All requested packages already installed.
Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: pyspark==3.4.0 in /opt/conda/lib/python3.10/site-packages (3.4.0)
Requirement already satisfied: py4j==0.10.9.7 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.4.0) (0.10.9.7)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Requirement already satisfied: spark-nlp==5.1.3 in /opt/conda/lib/python3.10/site-packages (5.1.3)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Out[ ]:
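After the restart, it can help to confirm that the environment reports the pinned versions and that a `java` executable is visible, since PySpark needs a JVM. The sketch below uses only the standard library (`importlib.metadata`, `shutil.which`); the helper name `version_mismatches` is illustrative, and because it reads installed package metadata it cannot tell whether the running kernel was actually restarted onto those versions.

```python
import shutil
from importlib.metadata import PackageNotFoundError, version

# Pins from the %pip cells above; "spark-nlp" is the PyPI distribution name
EXPECTED = {"pyspark": "3.4.0", "spark-nlp": "5.1.3"}


def version_mismatches(expected):
    """Return {package: (installed, expected)} for each pin that is not satisfied.

    `installed` is None when the package is not installed at all. This reads
    installed package metadata only; it does not inspect the running kernel.
    """
    mismatches = {}
    for pkg, want in expected.items():
        try:
            have = version(pkg)
        except PackageNotFoundError:
            have = None
        if have != want:
            mismatches[pkg] = (have, want)
    return mismatches


# PySpark launches the JVM via JAVA_HOME or a `java` found on PATH
print("java on PATH:", shutil.which("java"))
print(version_mismatches(EXPECTED) or "all pinned versions are installed")
```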
Download JARs
In [ ]:
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()
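# stream the public Spark NLP assembly JAR straight into our S3 bucket (no local copy is kept)
!wget -qO- https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.1.3.jar | aws s3 cp - s3://{bucket}/project/spark-nlp-assembly-5.1.3.jar
# confirm the upload
!aws s3 ls s3://{bucket}/project/spark-nlp-assembly-5.1.3.jar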
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
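spark = (
    SparkSession.builder.appName("PySparkApp")
    # hadoop-aws provides the S3A filesystem used for s3a:// paths
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    # Hadoop properties need the "spark.hadoop." prefix;
    # config keys that do not start with "spark." are ignored by spark-submit
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)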
print(spark.version)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
2023-11-19 20:28:00 708534094 spark-nlp-assembly-5.1.3.jar
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f4429907-73aa-435b-896c-eac4ca63bdf0;1.0
    confs: [default]
    found org.apache.hadoop#hadoop-aws;3.2.2 in central
    found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 247ms :: artifacts dl 17ms
    :: modules in use:
    com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
    org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-f4429907-73aa-435b-896c-eac4ca63bdf0
    confs: [default]
    0 artifacts copied, 2 already retrieved (0kB/9ms)
23/11/19 20:28:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
3.4.0
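The `Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider` line in the output above reflects how Spark treats configuration keys: keys that do not start with `spark.` are dropped, and Hadoop properties are meant to be passed as `spark.hadoop.<key>`, which Spark copies into the Hadoop configuration with the prefix removed. The pure-Python sketch below only illustrates that rule with a hypothetical `split_conf` helper; it is not Spark's implementation.

```python
SPARK_HADOOP_PREFIX = "spark.hadoop."


def split_conf(conf):
    """Illustrate (not reproduce) how Spark treats submitted config keys.

    Returns (ignored, hadoop): keys without the "spark." prefix are dropped
    with an "Ignoring non-Spark config property" warning, and every
    "spark.hadoop.<key>" entry ends up in the Hadoop configuration as "<key>".
    """
    ignored = [k for k in conf if not k.startswith("spark.")]
    hadoop = {
        k[len(SPARK_HADOOP_PREFIX):]: v
        for k, v in conf.items()
        if k.startswith(SPARK_HADOOP_PREFIX)
    }
    return ignored, hadoop


provider = "com.amazonaws.auth.ContainerCredentialsProvider"
conf = {
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.2.2",
    "fs.s3a.aws.credentials.provider": provider,               # un-prefixed key
    "spark.hadoop.fs.s3a.aws.credentials.provider": provider,  # prefixed key
}
ignored, hadoop = split_conf(conf)
print("ignored:", ignored)
print("hadoop conf:", hadoop)
```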
Preparing DFs for Sentiment
Comments
In [ ]:
from pyspark.sql.functions import concat_ws, col
# Tegveer's S3 -- DO NOT CHANGE
s3_directory = "s3a://sagemaker-us-east-1-433974840707/project/nlp_cleaned_comments/"
df_comments = spark.read.parquet(s3_directory)
df_comments.show(5)
# join the list of cleaned tokens into a single space-separated string column
comms = df_comments.withColumn("concat_comments", concat_ws(" ", "cleaned_body"))
# keep only the columns needed downstream
comms = comms.select("author", "subreddit", "created_utc", "score", "controversiality", "distinguished", "gilded", "concat_comments")
comms.show(5)
# drop rows whose concatenated text is null or empty
comms_rows = comms.count()
non_empty_comments = comms.filter((col("concat_comments").isNotNull()) & (col("concat_comments") != ""))
# number of rows removed by the filter
empty_comments_count = comms_rows - non_empty_comments.count()
print(f"Number of rows in concat_comments: {comms_rows}")
print(f"Number of empty rows in concat_comments: {empty_comments_count}")
# filter for specific subreddits -- CHANGE BASED ON SUBREDDITS HERE
# full list: ["socialism", "Economics", "Liberal", "Conservative", "Libertarian", "centrist", "changemyview", "Ask_Politics", "finance"]
subreddits_list = ["Conservative", "finance"]
filtered_comms = non_empty_comments.filter(col("subreddit").isin(subreddits_list))
# write the filtered comments to S3
output_path = "s3a://sagemaker-us-east-1-224518912016/project/nlp/comments/"
filtered_comms.write.mode("overwrite").parquet(output_path)
23/11/19 20:28:13 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
+------------+--------------------+--------+----------------+-----------+-------------+-------------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------+------------+--------------------+
|      author|                body|can_gild|controversiality|created_utc|distinguished|       edited|gilded|     id|is_submitter|  link_id| parent_id|           permalink|retrieved_on|score|stickied|  subreddit|subreddit_id|        cleaned_body|
+------------+--------------------+--------+----------------+-----------+-------------+-------------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------+------------+--------------------+
|Rude-Way4688|Yee. I like DeSan...|    true|               0| 2021-05-28|         null|1.622224763E9|     0|gzs7gai|       false|t3_nmztty|t1_gzs6nla|/r/Libertarian/co...|        null|  -30|   false|Libertarian|    t5_2qh63|[yee, like, desan...|
|   [deleted]|To be fair, this ...|   false|               0| 2021-07-23|         null|        false|     0|h67r8pp|       false|t3_opr5q9|t1_h67ea09|/r/centrist/comme...|  2022-05-14|    2|   false|   centrist|    t5_2qx8j|[fair, south, ali...|
|   [deleted]|           [deleted]|    true|               0| 2021-02-10|         null|1.613002061E9|     0|gmrrnpn|       false|t3_lgcjd3|t1_gmrr2c5|/r/Libertarian/co...|        null|   17|   false|Libertarian|    t5_2qh63|            [delete]|
|   [deleted]|The point is covi...|   false|               0| 2021-02-07|         null|        false|     0|gmgzk70|       false|t3_leehhu| t3_leehhu|/r/Libertarian/co...|        null|    3|   false|Libertarian|    t5_2qh63|[point, covid, ch...|
| Shmodecious|Believing that tr...|    true|               0| 2021-11-17|         null|        false|     0|hkzj1tr|       false|t3_qvgqyd|t1_hky3fvx|/r/Libertarian/co...|  2022-02-27|    0|   false|Libertarian|    t5_2qh63|[believe, transge...|
+------------+--------------------+--------+----------------+-----------+-------------+-------------+------+-------+------------+---------+----------+--------------------+------------+-----+--------+-----------+------------+--------------------+
only showing top 5 rows

+------------+-----------+-----------+-----+----------------+-------------+------+--------------------+
|      author|  subreddit|created_utc|score|controversiality|distinguished|gilded|     concat_comments|
+------------+-----------+-----------+-----+----------------+-------------+------+--------------------+
|Rude-Way4688|Libertarian| 2021-05-28|  -30|               0|         null|     0|yee like desantis...|
|   [deleted]|   centrist| 2021-07-23|    2|               0|         null|     0|fair south alia m...|
|   [deleted]|Libertarian| 2021-02-10|   17|               0|         null|     0|              delete|
|   [deleted]|Libertarian| 2021-02-07|    3|               0|         null|     0|point covid choos...|
| Shmodecious|Libertarian| 2021-11-17|    0|               0|         null|     0|believe transgend...|
+------------+-----------+-----------+-----+----------------+-------------+------+--------------------+
only showing top 5 rows

Number of rows in concat_comments: 14863477
Number of empty rows in concat_comments: 63302
Submissions
In [ ]:
# Tegveer's S3 -- DO NOT CHANGE
s3_directory = "s3a://sagemaker-us-east-1-433974840707/project/nlp_cleaned_submissions/"
df_submissions = spark.read.parquet(s3_directory)
df_submissions.show(5)
# join the cleaned body tokens and the cleaned title tokens (in that order) into one string column
subs = df_submissions.withColumn(
    "concat_submissions",
    concat_ws(" ", "cleaned_body", "cleaned_title")
)
# keep only the columns needed downstream
subs = subs.select("author", "subreddit", "created_utc", "score", "num_comments", "distinguished", "gilded", "concat_submissions")
subs.show(5)
# drop rows whose concatenated text is null or empty
subs_rows = subs.count()
non_empty_subs = subs.filter((col("concat_submissions").isNotNull()) & (col("concat_submissions") != ""))
# number of rows removed by the filter
empty_subs_count = subs_rows - non_empty_subs.count()
print(f"Number of rows in concat_submissions: {subs_rows}")
print(f"Number of empty rows in concat_submissions: {empty_subs_count}")
# filter for the same subreddits as the comments
filtered_subs = non_empty_subs.filter(col("subreddit").isin(subreddits_list))
# write the filtered submissions to S3
output_path = "s3a://sagemaker-us-east-1-224518912016/project/nlp/submissions/"
filtered_subs.write.mode("overwrite").parquet(output_path)
+--------------------+--------------------+--------------------+---------+-----+------------+--------------------+-----------+--------------------+------------------+--------+-------+----------------------+-------+-------+--------+--------------------+--------------------+--------------------+------+--------+-------------+--------------------+--------------------+
|              author|               title|            selftext|subreddit|score|num_comments|           permalink|created_utc|                 url|            domain|is_video|is_self|is_reddit_media_domain|spoiler|over_18|stickied|           thumbnail|               media|        secure_media|gilded|archived|distinguished|        cleaned_body|       cleaned_title|
+--------------------+--------------------+--------------------+---------+-----+------------+--------------------+-----------+--------------------+------------------+--------+-------+----------------------+-------+-------+--------+--------------------+--------------------+--------------------+------+--------+-------------+--------------------+--------------------+
|    IsThisReallyNate|Evidence of the W...|I’ve heard it cla...|socialism|    6|           3|/r/socialism/comm...| 2021-07-29|https://www.reddi...|    self.socialism|   false|   true|                 false|  false|  false|   false|                self|                null|                null|     0|   false|         null|[ive, hear, claim...|[evidence, war, t...|
|           [deleted]|Starbucks are blo...|           [deleted]|socialism|   11|           0|/r/socialism/comm...| 2022-04-15|https://twitter.c...|       twitter.com|   false|  false|                 false|  false|  false|   false|             default|{event_id=null, t...|{event_id=null, t...|     0|   false|         null|            [delete]|[starbucks, block...|
|MedicineInevitable48|             Shubham|                    |Economics|    1|           1|/r/Economics/comm...| 2021-02-03|https://bsshahedu...|bsshaheducation.in|   false|  false|                 false|  false|  false|   false|             default|                null|                null|     0|   false|         null|                  []|           [shubham]|
|             NORDLAN|Federal judge tos...|                    |  Liberal|   17|           2|/r/Liberal/commen...| 2021-06-22|https://www.washi...|washingtonpost.com|   false|  false|                 false|  false|  false|   false|https://b.thumbs....|                null|                null|     0|   false|         null|                  []|[federal, judge, ...|
|        haskalah1989|Today marks
214 y...|                    |socialism| 1761|          43|/r/socialism/comm...| 2021-03-25|https://i.redd.it...|         i.redd.it|   false|  false|                  true|  false|  false|   false|https://b.thumbs....|                null|                null|     0|   false|         null|                  []|[today, mark, yea...|
+--------------------+--------------------+--------------------+---------+-----+------------+--------------------+-----------+--------------------+------------------+--------+-------+----------------------+-------+-------+--------+--------------------+--------------------+--------------------+------+--------+-------------+--------------------+--------------------+
only showing top 5 rows

+--------------------+---------+-----------+-----+------------+-------------+------+--------------------+
|              author|subreddit|created_utc|score|num_comments|distinguished|gilded|  concat_submissions|
+--------------------+---------+-----------+-----+------------+-------------+------+--------------------+
|    IsThisReallyNate|socialism| 2021-07-29|    6|           3|         null|     0|ive hear claim ma...|
|           [deleted]|socialism| 2022-04-15|   11|           0|         null|     0|delete starbucks ...|
|MedicineInevitable48|Economics| 2021-02-03|    1|           1|         null|     0|             shubham|
|             NORDLAN|  Liberal| 2021-06-22|   17|           2|         null|     0|federal judge tos...|
|        haskalah1989|socialism| 2021-03-25| 1761|          43|         null|     0|today mark year s...|
+--------------------+---------+-----------+-----+------------+-------------+------+--------------------+
only showing top 5 rows

Number of rows in concat_submissions: 599908
Number of empty rows in concat_submissions: 565
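Spark's `concat_ws` skips null inputs and flattens array columns, and it returns an empty string rather than null when nothing is left to join, so the `!= ""` condition is what removes empty rows, while `isNotNull()` acts as a defensive guard. The sketch below mirrors that behaviour in plain Python with a hypothetical `concat_ws` stand-in and toy rows modelled on the `show()` output above (body tokens before title tokens, as in the submissions cell), then turns the printed counts into percentages. It is an illustration, not Spark's implementation.

```python
def concat_ws(sep, *parts):
    """Hypothetical pure-Python stand-in for Spark's concat_ws:
    null inputs and null array elements are skipped, arrays are flattened."""
    tokens = []
    for part in parts:
        if part is None:
            continue  # a null column value contributes nothing
        if isinstance(part, list):
            tokens.extend(t for t in part if t is not None)
        else:
            tokens.append(part)
    return sep.join(tokens)


def is_non_empty(text):
    # mirrors col(c).isNotNull() & (col(c) != "")
    return text is not None and text != ""


# toy rows modelled on the show() output above
comments = [["yee", "like", "desantis"], [], None]
submissions = [([], ["shubham"]), (["delete"], ["starbucks", "block"])]

concat_comments = [concat_ws(" ", body) for body in comments]
concat_submissions = [concat_ws(" ", body, title) for body, title in submissions]
print(concat_comments, [is_non_empty(t) for t in concat_comments])
print(concat_submissions)

# share of empty rows, using the counts printed above
for name, total, empty in [("comments", 14863477, 63302), ("submissions", 599908, 565)]:
    print(f"{name}: {empty:,} of {total:,} rows empty ({empty / total:.2%})")
```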
Process S3 data with a SageMaker Processing Job (PySparkProcessor)
In [ ]:
%%writefile ./sentiment_conservative_finance.py
import sys
import logging
import argparse

import sparknlp
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import UniversalSentenceEncoder, SentimentDLModel
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

# log once to stdout (basicConfig installs the only handler, so lines are not duplicated)
logging.basicConfig(
    format="%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s",
    level=logging.DEBUG,
    stream=sys.stdout,
)
logger = logging.getLogger()


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--df_target_col", type=str, help="Target Column for Sentiment Analysis")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()
    logger.info(f"args={args}")

    spark = (
        SparkSession.builder.appName("Spark NLP")
        .config("spark.driver.memory", "16G")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.kryoserializer.buffer.max", "2000M")
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3")
        .getOrCreate()
    )
    logger.info(f"Spark version: {spark.version}")
    logger.info(f"sparknlp version: {sparknlp.version()}")

    # Set the old-API (mapred) output committer; this only affects RDD-style writes
    # such as saveAsTextFile, not the DataFrame Parquet write at the end
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # read the filtered comments/submissions from S3
    logger.info(f"going to read {args.s3_dataset_path}")
    df = spark.read.parquet(args.s3_dataset_path)

    # sentiment analysis: text -> document -> USE sentence embedding -> SentimentDL label
    target_col = args.df_target_col
    MODEL_NAME = "sentimentdl_use_twitter"
    logger.info(f"setting up an nlp pipeline with model={MODEL_NAME}")
    documentAssembler = (
        DocumentAssembler()
        .setInputCol(target_col)
        .setOutputCol(f"document_{target_col}")
    )
    use = (
        UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")
        .setInputCols([f"document_{target_col}"])
        .setOutputCol(f"sentence_embeddings_{target_col}")
    )
    sentimentdl = (
        SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")
        .setInputCols([f"sentence_embeddings_{target_col}"])
        .setOutputCol(f"sentiment_{target_col}")
    )
    nlpPipeline = Pipeline(stages=[documentAssembler, use, sentimentdl])

    logger.info("going to fit pipeline on dataframe")
    nlpModel = nlpPipeline.fit(df)
    logger.info("going to transform pipeline on dataframe")
    result = nlpModel.transform(df)

    # save the sentiment-scored dataframe for later analysis
    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}"
    logger.info(f"going to write data in {s3_path}")
    # count the input instead of `result`: transform keeps one row per input row,
    # and counting `result` would run the whole NLP pipeline an extra time
    logger.info(f"shape of the result dataframe is {df.count():,}x{len(result.columns)}")
    result.write.mode("overwrite").parquet(s3_path)


if __name__ == "__main__":
    main()
Writing ./sentiment_conservative_finance.py
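The script is driven entirely by its four command-line options, and every column it adds is named after `--df_target_col`. As an illustration under that reading of the script, the sketch below rebuilds the same `argparse` interface, parses the arguments the notebook passes for the comments job, and derives the stage output columns and the S3 URI the result is written to. `build_parser` and `derived_names` are hypothetical helpers, not part of the script.

```python
import argparse


def build_parser():
    # The same four options that sentiment_conservative_finance.py defines
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--df_target_col", type=str)
    parser.add_argument("--s3_dataset_path", type=str)
    parser.add_argument("--s3_output_bucket", type=str)
    parser.add_argument("--s3_output_key_prefix", type=str)
    return parser


def derived_names(args):
    """Columns the three pipeline stages add, plus the output URI the script writes to."""
    col = args.df_target_col
    return {
        "document": f"document_{col}",
        "embeddings": f"sentence_embeddings_{col}",
        "sentiment": f"sentiment_{col}",
        "output": f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}",
    }


# Arguments equivalent to the comments job submitted with spark_processor.run
args = build_parser().parse_args([
    "--df_target_col", "concat_comments",
    "--s3_dataset_path", "s3://sagemaker-us-east-1-224518912016/project/nlp/comments/",
    "--s3_output_bucket", "sagemaker-us-east-1-224518912016",
    "--s3_output_key_prefix", "project/sentiment/comments/",
])
names = derived_names(args)
for stage, name in names.items():
    print(f"{stage:>10}: {name}")
```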
In [ ]:
import boto3
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
account_id = boto3.client('sts').get_caller_identity()['Account']
# Set up the PySpark processor that runs the job. SageMaker launches
# `instance_count` instances of `instance_type` to form the Spark cluster.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project-sentiment",
    image_uri=f"{account_id}.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark:latest",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=7200,
)
# S3 prefixes for the job output and the Spark event logs
output_prefix = "project/sentiment"
output_prefix_logs = "spark_logs/sentiment"
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
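The two `spark_processor.run(...)` calls in the next cell differ only in the data kind (`comments` or `submissions`) and the target column. A small helper can build the `arguments` list for either job; the sketch below assumes the bucket and prefixes used in this notebook, and `job_arguments`, `DATA_BUCKET`, `OUTPUT_PREFIX` and `JOBS` are hypothetical names.

```python
# Values copied from the two spark_processor.run(...) calls in the next cell
DATA_BUCKET = "sagemaker-us-east-1-224518912016"
OUTPUT_PREFIX = "project/sentiment"
JOBS = {"comments": "concat_comments", "submissions": "concat_submissions"}


def job_arguments(kind, target_col, bucket=DATA_BUCKET, prefix=OUTPUT_PREFIX):
    """Build the `arguments` list for one sentiment job (hypothetical helper)."""
    return [
        "--df_target_col", target_col,
        "--s3_dataset_path", f"s3://{bucket}/project/nlp/{kind}/",
        "--s3_output_bucket", bucket,
        "--s3_output_key_prefix", f"{prefix}/{kind}/",
    ]


for kind, target_col in JOBS.items():
    print(kind, job_arguments(kind, target_col))
```

Each list would then be passed as `arguments=` to `spark_processor.run(...)` with the same `submit_app`, `submit_jars` and `spark_event_logs_s3_uri`, for example inside a `for kind, target_col in JOBS.items():` loop.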
In [ ]:
import time
# run() waits for each processing job to finish by default, so the two jobs run one after the other
# comments
df_target_col = "concat_comments"
print(f"Working on Comments Col {df_target_col}")
spark_processor.run(
    submit_app="./sentiment_conservative_finance.py",
    submit_jars=[f"s3://{bucket}/project/spark-nlp-assembly-5.1.3.jar"],
    arguments=[
        "--df_target_col",
        df_target_col,
        "--s3_dataset_path",
        "s3://sagemaker-us-east-1-224518912016/project/nlp/comments/",
        "--s3_output_bucket",
        "sagemaker-us-east-1-224518912016",
        "--s3_output_key_prefix",
        f"{output_prefix}/comments/",
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
)
# short pause before submitting the next job
time.sleep(60)
# submissions
df_target_col = "concat_submissions"
print(f"Working on Submissions Col {df_target_col}")
spark_processor.run(
    submit_app="./sentiment_conservative_finance.py",
    submit_jars=[f"s3://{bucket}/project/spark-nlp-assembly-5.1.3.jar"],
    arguments=[
        "--df_target_col",
        df_target_col,
        "--s3_dataset_path",
        "s3://sagemaker-us-east-1-224518912016/project/nlp/submissions/",
        "--s3_output_bucket",
        "sagemaker-us-east-1-224518912016",
        "--s3_output_key_prefix",
        f"{output_prefix}/submissions/",
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
)
Working on Comments Col concat_comments
INFO:sagemaker:Creating processing-job with name sm-spark-project-sentiment-2023-11-19-20-31-18-590
................................................................................................................................................................................................................................................................................................................................!
INFO:sagemaker:Creating processing-job with name sm-spark-project-sentiment-2023-11-19-20-59-16-709
Working on Submissions Col concat_submissions
..................................................................................................................................................!
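SageMaker appends a creation timestamp (`YYYY-MM-DD-HH-MM-SS-mmm`) to `base_job_name`, so the two job names above give a rough timing for the comments job: the gap between the two creations covers that job's provisioning and run time, plus the 60 s `time.sleep` and some submission overhead. The sketch below parses the suffixes with `datetime.strptime`; it assumes that naming format and gives only an estimate, since job end times are not shown. `job_created_at` is a hypothetical helper.

```python
from datetime import datetime


def job_created_at(job_name):
    """Parse the YYYY-MM-DD-HH-MM-SS-mmm suffix appended to the base job name."""
    stamp = "-".join(job_name.split("-")[-7:])
    # %f accepts 1-6 digits and pads on the right, so "590" means 0.590 s
    return datetime.strptime(stamp, "%Y-%m-%d-%H-%M-%S-%f")


first = job_created_at("sm-spark-project-sentiment-2023-11-19-20-31-18-590")
second = job_created_at("sm-spark-project-sentiment-2023-11-19-20-59-16-709")
gap = (second - first).total_seconds()
# gap = comments job (provisioning + run) + 60 s sleep + submission overhead
print(f"{gap:.0f} s between job creations (~{gap / 60:.1f} min)")
print(f"~{(gap - 60) / 60:.1f} min attributable to the comments job")
```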
In [ ]:
# Read all the Parquet files in the directory into a DataFrame
df_comments = spark.read.parquet(f"s3a://sagemaker-us-east-1-224518912016/project/nlp/comments/")
# check counts (ensuring all needed subreddits exist)
df_comments.groupBy('subreddit').count().show()
# get cols
print(df_comments.columns)
+------------+-------+
|   subreddit|  count|
+------------+-------+
|     finance| 136308|
|Conservative|5204573|
+------------+-------+

['author', 'subreddit', 'created_utc', 'score', 'controversiality', 'distinguished', 'gilded', 'concat_comments']
Submissions
In [ ]:
# Read all the Parquet files in the directory into a DataFrame
df_submissions = spark.read.parquet(f"s3a://sagemaker-us-east-1-224518912016/project/nlp/submissions/")
# check counts (ensuring all needed subreddits exist)
df_submissions.groupBy('subreddit').count().show()
# get cols
print(df_submissions.columns)
+------------+------+
|   subreddit| count|
+------------+------+
|     finance| 28817|
|Conservative|343660|
+------------+------+

['author', 'subreddit', 'created_utc', 'score', 'num_comments', 'distinguished', 'gilded', 'concat_submissions']
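The two `groupBy('subreddit').count()` checks above can be turned into an explicit assertion that every entry of `subreddits_list` survived filtering, together with each subreddit's share of rows. In the notebook, the counts could be collected with `{row["subreddit"]: row["count"] for row in df.groupBy("subreddit").count().collect()}` (indexing by name, because `row.count` is the tuple method); the sketch below uses the counts printed above so it runs without Spark, and `coverage` is a hypothetical helper.

```python
# Expected subreddits, as set in subreddits_list earlier in the notebook
expected = ["Conservative", "finance"]

# Counts copied from the two groupBy('subreddit').count() outputs above
counts = {
    "comments": {"finance": 136308, "Conservative": 5204573},
    "submissions": {"finance": 28817, "Conservative": 343660},
}


def coverage(per_subreddit, expected):
    """Return (missing subreddits, total rows, share of rows per subreddit)."""
    missing = sorted(set(expected) - set(per_subreddit))
    total = sum(per_subreddit.values())
    shares = {name: n / total for name, n in per_subreddit.items()}
    return missing, total, shares


for kind, per_subreddit in counts.items():
    missing, total, shares = coverage(per_subreddit, expected)
    share_text = ", ".join(f"{name} {share:.1%}" for name, share in shares.items())
    print(f"{kind}: {total:,} rows, missing={missing}, {share_text}")
```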