Processing large datasets with Apache Spark and Amazon SageMaker¶
This notebook run on Data Science 3.0 - Python 3
kernel on a ml.t3.large
instance.
Amazon SageMaker Processing Jobs are used to analyze data and evaluate machine learning models on Amazon SageMaker. With Processing, you can use a simplified, managed experience on SageMaker to run your data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation. You can also use the Amazon SageMaker Processing APIs during the experimentation phase and after the code is deployed in production to evaluate performance.
The preceding diagram shows how Amazon SageMaker spins up a Processing job. Amazon SageMaker takes your script, copies your data from Amazon Simple Storage Service (Amazon S3), and then pulls a processing container. The processing container image can either be an Amazon SageMaker built-in image or a custom image that you provide. The underlying infrastructure for a Processing job is fully managed by Amazon SageMaker. Cluster resources are provisioned for the duration of your job, and cleaned up when a job completes. The output of the Processing job is stored in the Amazon S3 bucket you specified.
Our workflow for processing large amounts of data with SageMaker¶
We can divide our workflow into two steps:
Work with a small subset of the data with Spark running in local model in a SageMaker Studio Notebook.
Once we are able to work with the small subset of data we can provide the same code (as a Python script rather than a series of interactive steps) to SageMaker Processing which launched a Spark cluster, runs out code and terminates the cluster.
In this notebook...¶
We will analyze the Pushshift Reddit dataset to be used for the project and then we will run a SageMaker Processing Job to filter out the comments and submissions from subreddits of interest. The filtered data will be stored in your account's s3 bucket and it is this filtered data that you will be using for your project.
Setup¶
We need an available Java installation to run pyspark. The easiest way to do this is to install JDK and set the proper paths using conda
import time
# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.3.0
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
IOStream.flush timed out Collecting package metadata (current_repodata.json): done Solving environment: done ==> WARNING: A newer version of conda exists. <== current version: 23.3.1 latest version: 23.9.0 Please update conda by running $ conda update -n base -c defaults conda Or to minimize the number of packages updated during conda update use conda install conda=23.9.0 # All requested packages already installed. Note: you may need to restart the kernel to use updated packages. IOStream.flush timed out Requirement already satisfied: pyspark==3.3.0 in /opt/conda/lib/python3.10/site-packages (3.3.0) Requirement already satisfied: py4j==0.10.9.5 in /opt/conda/lib/python3.10/site-packages (from pyspark==3.3.0) (0.10.9.5) WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv [notice] A new release of pip is available: 23.2.1 -> 23.3.1 [notice] To update, run: pip install --upgrade pip Note: you may need to restart the kernel to use updated packages.
Utilize S3 Data within local PySpark¶
- By specifying the
hadoop-aws
jar in our Spark config we're able to access S3 datasets using the s3a file prefix. - Since we've already authenticated ourself to SageMaker Studio , we can use our assumed SageMaker ExecutionRole for any S3 reads/writes by setting the credential provider as
ContainerCredentialsProvider
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
import time
spark = (
SparkSession.builder.appName("PySparkApp")
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider",
)
.getOrCreate()
)
print(spark.version)
3.3.0
INFO:py4j.clientserver:Error while sending or receiving. Traceback (most recent call last): File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 503, in send_command self.socket.sendall(command.encode("utf-8")) ConnectionResetError: [Errno 104] Connection reset by peer INFO:py4j.clientserver:Closing down clientserver connection INFO:root:Exception while sending command. Traceback (most recent call last): File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 503, in send_command self.socket.sendall(command.encode("utf-8")) ConnectionResetError: [Errno 104] Connection reset by peer During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command response = connection.send_command(command) File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 506, in send_command raise Py4JNetworkError( py4j.protocol.Py4JNetworkError: Error while sending INFO:py4j.clientserver:Closing down clientserver connection
Process S3 data with SageMaker Processing Job PySparkProcessor
¶
We are going to move the above processing code in a Python file and then submit that file to SageMaker Processing Job's PySparkProcessor
.
%%writefile ./process_askpolitics_changemyview.py
import os
import logging
import argparse
import time
# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
DoubleType,
IntegerType,
StringType,
StructField,
StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
def main():
parser = argparse.ArgumentParser(description="app inputs and outputs")
parser.add_argument("--s3_dataset_path_commments", type=str, help="Path of dataset in S3 for reddit comments")
parser.add_argument("--s3_dataset_path_submissions", type=str, help="Path of dataset in S3 for reddit submissions")
parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
parser.add_argument("--s3_output_prefix", type=str, help="s3 output prefix")
parser.add_argument("--col_name_for_filtering", type=str, help="Name of the column to filter")
# parser.add_argument("--subreddits", type=str, help="comma separate list of subreddits of interest")
args = parser.parse_args()
spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
logger.info(f"spark version = {spark.version}")
# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set(
"mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
)
# Downloading the data from S3 into a Dataframe
logger.info(f"going to read {args.s3_dataset_path_commments}")
comments = spark.read.parquet(args.s3_dataset_path_commments, header=True)
logger.info(f"finished reading files...")
logger.info(f"going to read {args.s3_dataset_path_submissions}")
submissions = spark.read.parquet(args.s3_dataset_path_submissions, header=True)
logger.info(f"finished reading files...")
# filter the dataframe to only keep the subreddits of interest
subreddits = [s.strip() for s in args.subreddits.split(",")]
submissions_filtered = submissions.where(col("subreddit").isin(subreddits)) # removing lower()
comments_filtered = comments.where(col("subreddit").isin(subreddits)) # removing lower()
# save the filtered dataframes so that these files can now be used for future analysis
s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}/comments"
logger.info(f"going to write comments for {subreddits} in {s3_path}")
logger.info(f"shape of the comments_filtered dataframe is {comments_filtered.count():,}x{len(comments_filtered.columns)}")
comments_filtered.write.mode("overwrite").parquet(s3_path)
s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}/submissions"
logger.info(f"going to write submissions for {subreddits} in {s3_path}")
logger.info(f"shape of the submissions_filtered dataframe is {submissions_filtered.count():,}x{len(submissions_filtered.columns)}")
submissions_filtered.write.mode("overwrite").parquet(s3_path)
if __name__ == "__main__":
main()
Overwriting ./process_askpolitics_changemyview.py
Now submit this code to SageMaker Processing Job.
The code cell below was repeated manually (because of the issues we were facing) for different combinations of year and month to gatherall the data.
Eric's Method¶
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
import time
# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
base_job_name="sm-spark-project",
framework_version="3.3",
role=role,
instance_count=8,
instance_type="ml.m5.xlarge",
max_runtime_in_seconds=7200,
)
# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_logs = f"spark_logs"
col_name_for_filtering = "subreddit"
# modify this comma separated list to choose the subreddits of interest
subreddits = "changemyview, Ask_Politics"
configuration = [
{
"Classification": "spark-defaults",
"Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
}
]
years = [2021, 2022, 2023]
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
for year in years:
# comments
print(f"Working on Comments for year {year}")
s3_dataset_path_commments = f"s3://bigdatateaching/reddit-parquet/comments/year={year}/month=*/*.parquet"
output_prefix_data_comments = f"project/comments/year={year}"
spark_processor.run(
submit_app="./process_askpolitics_changemyview.py",
arguments=[
"--s3_dataset_path",
s3_dataset_path_commments,
"--s3_output_bucket",
bucket,
"--s3_output_prefix",
output_prefix_data_comments,
"--col_name_for_filtering",
col_name_for_filtering,
"--values_to_keep",
subreddits,
],
spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
logs=False,
configuration=configuration
)
time.sleep(60)
# submissions
print(f"Working on Submissions for year {year}")
s3_dataset_path_submissions = f"s3://bigdatateaching/reddit-parquet/submissions/year={year}/month=*/*.parquet"
output_prefix_data_submissions = f"project/submissions/year={year}"
spark_processor.run(
submit_app="./process_askpolitics_changemyview.py",
arguments=[
"--s3_dataset_path",
s3_dataset_path_submissions,
"--s3_output_bucket",
bucket,
"--s3_output_prefix",
output_prefix_data_submissions,
"--col_name_for_filtering",
col_name_for_filtering,
"--values_to_keep",
subreddits,
],
spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
logs=False,
configuration=configuration
)
time.sleep(60)
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-10-30-18-53-01-767
Working on Comments for year 2021 .......................................................................................................................................................................................................................!
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-10-30-19-12-11-270
Working on Submissions for year 2021 .............................................................................................................................................................................................................................................!
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-10-30-19-33-12-183
Working on Comments for year 2022 ....................................................................................................................................................................................................................!
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-10-30-19-52-06-375
Working on Submissions for year 2022 ......................................................................................................................................................................................................................................................!
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-10-30-20-13-52-119
Working on Comments for year 2023 ...................................................................................................................!
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-10-30-20-24-37-382
Working on Submissions for year 2023 .................................................................................................................................!
%%time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
base_job_name="sm-spark-project",
framework_version="3.3",
role=role,
instance_count=6,
instance_type="ml.m5.xlarge",
max_runtime_in_seconds=7200,
)
# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
s3_dataset_path_commments = "s3://bigdatateaching/reddit-parquet/comments/year=2022/month=9/*.parquet"
s3_dataset_path_submissions = "s3://bigdatateaching/reddit-parquet/submissions/year=2022/month=9/*.parquet"
output_prefix_data = "project"
output_prefix_logs = f"spark_logs"
# modify this comma separated list to choose the subreddits of interest
subreddits = "changemyview, Ask_Politics"
# run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
spark_processor.run(
submit_app="./process_askpolitics_changemyview.py",
arguments=[
"--s3_dataset_path_commments",
s3_dataset_path_commments,
"--s3_dataset_path_submissions",
s3_dataset_path_submissions,
"--s3_output_bucket",
bucket,
"--s3_output_prefix",
output_prefix_data,
"--subreddits",
subreddits,
],
spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
logs=False,
)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-10-29-22-26-44-898
...................................................................................................................!CPU times: user 3.4 s, sys: 350 ms, total: 3.75 s Wall time: 9min 49s
Read the filtered data¶
Now that we have filtered the data to only keep submissions and comments from subreddits of interest. Let us read data from the s3 path where we saved the filtered data.
%%time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
base_job_name="sm-spark-project",
framework_version="3.3",
role=role,
instance_count=6,
instance_type="ml.m5.xlarge",
max_runtime_in_seconds=7200,
)
# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
output_prefix_data = "project"
output_prefix_logs = f"spark_logs"
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml CPU times: user 363 ms, sys: 0 ns, total: 363 ms Wall time: 610 ms
%%time
s3_path = f"s3a://{bucket}/{output_prefix_data}/comments"
print(f"reading comments from {s3_path}")
comments = spark.read.parquet(s3_path, header=True)
print(f"shape of the comments dataframe is {comments.count():,}x{len(comments.columns)}")
reading comments from s3a://sagemaker-us-east-1-711387073580/project/comments 23/10/30 14:41:04 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
[Stage 1:===================================================> (10 + 1) / 11]
shape of the comments dataframe is 137,855x21 CPU times: user 33.8 ms, sys: 900 µs, total: 34.7 ms Wall time: 1min 6s
# check counts (ensuring all needed subreddits exist)
comments.groupBy('subreddit').count().show()
[Stage 11:==================================================> (10 + 1) / 11]
+------------+------+ | subreddit| count| +------------+------+ |changemyview|137444| |Ask_Politics| 411| +------------+------+
comments.printSchema()
root |-- author: string (nullable = true) |-- author_cakeday: boolean (nullable = true) |-- author_flair_css_class: string (nullable = true) |-- author_flair_text: string (nullable = true) |-- body: string (nullable = true) |-- can_gild: boolean (nullable = true) |-- controversiality: long (nullable = true) |-- created_utc: timestamp (nullable = true) |-- distinguished: string (nullable = true) |-- edited: string (nullable = true) |-- gilded: long (nullable = true) |-- id: string (nullable = true) |-- is_submitter: boolean (nullable = true) |-- link_id: string (nullable = true) |-- parent_id: string (nullable = true) |-- permalink: string (nullable = true) |-- retrieved_on: timestamp (nullable = true) |-- score: long (nullable = true) |-- stickied: boolean (nullable = true) |-- subreddit: string (nullable = true) |-- subreddit_id: string (nullable = true)
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "link_id", "id", "created_utc").show()
+------------+-----------------+--------------------+----------+---------+-------+-------------------+ | subreddit| author| body| parent_id| link_id| id| created_utc| +------------+-----------------+--------------------+----------+---------+-------+-------------------+ |Conservative| Thrownaway1211| 2nd dumbest|t1_gjyohnx|t3_l19aok|gjzhisd|2021-01-20 20:44:29| |Conservative| [deleted]| [deleted]|t1_gjzfpyh|t3_l1hhgw|gjzhite|2021-01-20 20:44:30| |Conservative| [deleted]| [removed]|t1_gjzdkd4|t3_l1dlf1|gjzhiuy|2021-01-20 20:44:30| |Conservative| premer777|God helps those w...|t1_gjzd3i6|t3_l19aok|gjzhivc|2021-01-20 20:44:30| |Conservative| Barnyard_Rich|> This country...|t1_gjzax9z|t3_l1g3b9|gjzhiwu|2021-01-20 20:44:31| |Conservative| sailor-jackn|We’re not just ge...|t1_gjzb8mv|t3_l1dlf1|gjzhixn|2021-01-20 20:44:31| |Conservative| [deleted]|You just might be...|t1_gjzgw3m|t3_l1fxyh|gjzhiyc|2021-01-20 20:44:31| |Conservative| lulskadoodle|These are all goo...| t3_l19aok|t3_l19aok|gjzhj0m|2021-01-20 20:44:32| | Libertarian| No_Consequences_|This is not what ...|t1_gjxrxoy|t3_l0zgze|gjzhj1n|2021-01-20 20:44:32| |Conservative| [deleted]| [deleted]|t1_gjzh1a0|t3_l1h8v7|gjzhj1u|2021-01-20 20:44:32| | Libertarian| [deleted]| [deleted]|t1_gjzhcc4|t3_l0oyxu|gjzhj5a|2021-01-20 20:44:34| |Conservative| AutoModerator|Looking for debat...| t3_l1hv88|t3_l1hv88|gjzhj5u|2021-01-20 20:44:34| |Conservative| [deleted]|Please stop sayin...|t1_gjyhdqz|t3_l199d1|gjzhj7r|2021-01-20 20:44:34| |Conservative| [deleted]| [removed]| t3_l1d0r7|t3_l1d0r7|gjzhj7w|2021-01-20 20:44:34| |Conservative| DanPlaysMusic|Why do you suppor...|t1_gjzel12|t3_l1dlf1|gjzhj9s|2021-01-20 20:44:35| | Libertarian| iushciuweiush|Ignorance is blis...|t1_gjz5cl0|t3_l1efor|gjzhjan|2021-01-20 20:44:35| |Conservative| mk21dvr|You forgot the "/s".|t1_gjzejva|t3_l1eoiy|gjzhjb5|2021-01-20 20:44:36| |Conservative| [deleted]| [removed]|t1_gjz9dbo|t3_l1e03j|gjzhjeq|2021-01-20 20:44:37| |Conservative| CastleBravo45|There are problem...|t1_gjzh695|t3_l1d0r7|gjzhjgz|2021-01-20 20:44:38| |Conservative|KilgoreTroutsAnus|The point of Trum...|t1_gjzb2la|t3_l1ftsv|gjzhjh1|2021-01-20 20:44:38| +------------+-----------------+--------------------+----------+---------+-------+-------------------+ only showing top 20 rows
%%time
s3_path = f"s3a://{bucket}/{output_prefix_data}/submissions"
print(f"reading submissions from {s3_path}")
submissions = spark.read.parquet(s3_path, header=True)
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}")
reading submissions from s3a://sagemaker-us-east-1-711387073580/project/submissions 23/10/30 14:42:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 5:===============================================> (4 + 1) / 5]
shape of the submissions dataframe is 2,108x68 CPU times: user 14.5 ms, sys: 3.77 ms, total: 18.3 ms Wall time: 21.8 s
# check counts (ensuring all needed subreddits exist)
submissions.groupBy('subreddit').count().show()
[Stage 8:===============================================> (4 + 1) / 5]
+------------+-----+ | subreddit|count| +------------+-----+ |changemyview| 2025| |Ask_Politics| 83| +------------+-----+
submissions.printSchema()
root |-- adserver_click_url: string (nullable = true) |-- adserver_imp_pixel: string (nullable = true) |-- archived: boolean (nullable = true) |-- author: string (nullable = true) |-- author_cakeday: boolean (nullable = true) |-- author_flair_css_class: string (nullable = true) |-- author_flair_text: string (nullable = true) |-- author_id: string (nullable = true) |-- brand_safe: boolean (nullable = true) |-- contest_mode: boolean (nullable = true) |-- created_utc: timestamp (nullable = true) |-- crosspost_parent: string (nullable = true) |-- crosspost_parent_list: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- approved_at_utc: string (nullable = true) | | |-- approved_by: string (nullable = true) | | |-- archived: boolean (nullable = true) | | |-- author: string (nullable = true) | | |-- author_flair_css_class: string (nullable = true) | | |-- author_flair_text: string (nullable = true) | | |-- banned_at_utc: string (nullable = true) | | |-- banned_by: string (nullable = true) | | |-- brand_safe: boolean (nullable = true) | | |-- can_gild: boolean (nullable = true) | | |-- can_mod_post: boolean (nullable = true) | | |-- clicked: boolean (nullable = true) | | |-- contest_mode: boolean (nullable = true) | | |-- created: double (nullable = true) | | |-- created_utc: double (nullable = true) | | |-- distinguished: string (nullable = true) | | |-- domain: string (nullable = true) | | |-- downs: long (nullable = true) | | |-- edited: boolean (nullable = true) | | |-- gilded: long (nullable = true) | | |-- hidden: boolean (nullable = true) | | |-- hide_score: boolean (nullable = true) | | |-- id: string (nullable = true) | | |-- is_crosspostable: boolean (nullable = true) | | |-- is_reddit_media_domain: boolean (nullable = true) | | |-- is_self: boolean (nullable = true) | | |-- is_video: boolean (nullable = true) | | |-- likes: string (nullable = true) | | |-- link_flair_css_class: string (nullable = true) | | |-- link_flair_text: string (nullable = true) | | |-- locked: boolean (nullable = true) | | |-- media: string (nullable = true) | | |-- mod_reports: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- name: string (nullable = true) | | |-- num_comments: long (nullable = true) | | |-- num_crossposts: long (nullable = true) | | |-- num_reports: string (nullable = true) | | |-- over_18: boolean (nullable = true) | | |-- parent_whitelist_status: string (nullable = true) | | |-- permalink: string (nullable = true) | | |-- pinned: boolean (nullable = true) | | |-- quarantine: boolean (nullable = true) | | |-- removal_reason: string (nullable = true) | | |-- report_reasons: string (nullable = true) | | |-- saved: boolean (nullable = true) | | |-- score: long (nullable = true) | | |-- secure_media: string (nullable = true) | | |-- selftext: string (nullable = true) | | |-- selftext_html: string (nullable = true) | | |-- spoiler: boolean (nullable = true) | | |-- stickied: boolean (nullable = true) | | |-- subreddit: string (nullable = true) | | |-- subreddit_id: string (nullable = true) | | |-- subreddit_name_prefixed: string (nullable = true) | | |-- subreddit_type: string (nullable = true) | | |-- suggested_sort: string (nullable = true) | | |-- thumbnail: string (nullable = true) | | |-- thumbnail_height: string (nullable = true) | | |-- thumbnail_width: string (nullable = true) | | |-- title: string (nullable = true) | | |-- ups: long (nullable = true) | | |-- url: string (nullable = true) | | |-- user_reports: array (nullable = true) | | | |-- element: string (containsNull = true) | | |-- view_count: string (nullable = true) | | |-- visited: boolean (nullable = true) | | |-- whitelist_status: string (nullable = true) |-- disable_comments: boolean (nullable = true) |-- distinguished: string (nullable = true) |-- domain: string (nullable = true) |-- domain_override: string (nullable = true) |-- edited: string (nullable = true) |-- embed_type: string (nullable = true) |-- embed_url: string (nullable = true) |-- gilded: long (nullable = true) |-- hidden: boolean (nullable = true) |-- hide_score: boolean (nullable = true) |-- href_url: string (nullable = true) |-- id: string (nullable = true) |-- imp_pixel: string (nullable = true) |-- is_crosspostable: boolean (nullable = true) |-- is_reddit_media_domain: boolean (nullable = true) |-- is_self: boolean (nullable = true) |-- is_video: boolean (nullable = true) |-- link_flair_css_class: string (nullable = true) |-- link_flair_text: string (nullable = true) |-- locked: boolean (nullable = true) |-- media: struct (nullable = true) | |-- event_id: string (nullable = true) | |-- oembed: struct (nullable = true) | | |-- author_name: string (nullable = true) | | |-- author_url: string (nullable = true) | | |-- cache_age: long (nullable = true) | | |-- description: string (nullable = true) | | |-- height: long (nullable = true) | | |-- html: string (nullable = true) | | |-- provider_name: string (nullable = true) | | |-- provider_url: string (nullable = true) | | |-- thumbnail_height: long (nullable = true) | | |-- thumbnail_url: string (nullable = true) | | |-- thumbnail_width: long (nullable = true) | | |-- title: string (nullable = true) | | |-- type: string (nullable = true) | | |-- url: string (nullable = true) | | |-- version: string (nullable = true) | | |-- width: long (nullable = true) | |-- reddit_video: struct (nullable = true) | | |-- dash_url: string (nullable = true) | | |-- duration: long (nullable = true) | | |-- fallback_url: string (nullable = true) | | |-- height: long (nullable = true) | | |-- hls_url: string (nullable = true) | | |-- is_gif: boolean (nullable = true) | | |-- scrubber_media_url: string (nullable = true) | | |-- transcoding_status: string (nullable = true) | | |-- width: long (nullable = true) | |-- type: string (nullable = true) |-- media_embed: struct (nullable = true) | |-- content: string (nullable = true) | |-- height: long (nullable = true) | |-- scrolling: boolean (nullable = true) | |-- width: long (nullable = true) |-- mobile_ad_url: string (nullable = true) |-- num_comments: long (nullable = true) |-- num_crossposts: long (nullable = true) |-- original_link: string (nullable = true) |-- over_18: boolean (nullable = true) |-- parent_whitelist_status: string (nullable = true) |-- permalink: string (nullable = true) |-- pinned: boolean (nullable = true) |-- post_hint: string (nullable = true) |-- preview: struct (nullable = true) | |-- enabled: boolean (nullable = true) | |-- images: array (nullable = true) | | |-- element: struct (containsNull = true) | | | |-- id: string (nullable = true) | | | |-- resolutions: array (nullable = true) | | | | |-- element: struct (containsNull = true) | | | | | |-- height: long (nullable = true) | | | | | |-- url: string (nullable = true) | | | | | |-- width: long (nullable = true) | | | |-- source: struct (nullable = true) | | | | |-- height: long (nullable = true) | | | | |-- url: string (nullable = true) | | | | |-- width: long (nullable = true) | | | |-- variants: struct (nullable = true) | | | | |-- gif: struct (nullable = true) | | | | | |-- resolutions: array (nullable = true) | | | | | | |-- element: struct (containsNull = true) | | | | | | | |-- height: long (nullable = true) | | | | | | | |-- url: string (nullable = true) | | | | | | | |-- width: long (nullable = true) | | | | | |-- source: struct (nullable = true) | | | | | | |-- height: long (nullable = true) | | | | | | |-- url: string (nullable = true) | | | | | | |-- width: long (nullable = true) | | | | |-- mp4: struct (nullable = true) | | | | | |-- resolutions: array (nullable = true) | | | | | | |-- element: struct (containsNull = true) | | | | | | | |-- height: long (nullable = true) | | | | | | | |-- url: string (nullable = true) | | | | | | | |-- width: long (nullable = true) | | | | | |-- source: struct (nullable = true) | | | | | | |-- height: long (nullable = true) | | | | | | |-- url: string (nullable = true) | | | | | | |-- width: long (nullable = true) | | | | |-- nsfw: struct (nullable = true) | | | | | |-- resolutions: array (nullable = true) | | | | | | |-- element: struct (containsNull = true) | | | | | | | |-- height: long (nullable = true) | | | | | | | |-- url: string (nullable = true) | | | | | | | |-- width: long (nullable = true) | | | | | |-- source: struct (nullable = true) | | | | | | |-- height: long (nullable = true) | | | | | | |-- url: string (nullable = true) | | | | | | |-- width: long (nullable = true) | | | | |-- obfuscated: struct (nullable = true) | | | | | |-- resolutions: array (nullable = true) | | | | | | |-- element: struct (containsNull = true) | | | | | | | |-- height: long (nullable = true) | | | | | | | |-- url: string (nullable = true) | | | | | | | |-- width: long (nullable = true) | | | | | |-- source: struct (nullable = true) | | | | | | |-- height: long (nullable = true) | | | | | | |-- url: string (nullable = true) | | | | | | |-- width: long (nullable = true) |-- promoted: boolean (nullable = true) |-- promoted_by: string (nullable = true) |-- promoted_display_name: string (nullable = true) |-- promoted_url: string (nullable = true) |-- retrieved_on: timestamp (nullable = true) |-- score: long (nullable = true) |-- secure_media: struct (nullable = true) | |-- event_id: string (nullable = true) | |-- oembed: struct (nullable = true) | | |-- author_name: string (nullable = true) | | |-- author_url: string (nullable = true) | | |-- cache_age: long (nullable = true) | | |-- description: string (nullable = true) | | |-- height: long (nullable = true) | | |-- html: string (nullable = true) | | |-- provider_name: string (nullable = true) | | |-- provider_url: string (nullable = true) | | |-- thumbnail_height: long (nullable = true) | | |-- thumbnail_url: string (nullable = true) | | |-- thumbnail_width: long (nullable = true) | | |-- title: string (nullable = true) | | |-- type: string (nullable = true) | | |-- url: string (nullable = true) | | |-- version: string (nullable = true) | | |-- width: long (nullable = true) | |-- type: string (nullable = true) |-- secure_media_embed: struct (nullable = true) | |-- content: string (nullable = true) | |-- height: long (nullable = true) | |-- media_domain_url: string (nullable = true) | |-- scrolling: boolean (nullable = true) | |-- width: long (nullable = true) |-- selftext: string (nullable = true) |-- spoiler: boolean (nullable = true) |-- stickied: boolean (nullable = true) |-- subreddit: string (nullable = true) |-- subreddit_id: string (nullable = true) |-- suggested_sort: string (nullable = true) |-- third_party_trackers: array (nullable = true) | |-- element: string (containsNull = true) |-- third_party_tracking: string (nullable = true) |-- third_party_tracking_2: string (nullable = true) |-- thumbnail: string (nullable = true) |-- thumbnail_height: long (nullable = true) |-- thumbnail_width: long (nullable = true) |-- title: string (nullable = true) |-- url: string (nullable = true) |-- whitelist_status: string (nullable = true)
# display a subset of columns
submissions.select("subreddit", "author", "title", "selftext", "created_utc", "num_comments").show()
+------------+-------------------+--------------------+--------------------+-------------------+------------+ | subreddit| author| title| selftext| created_utc|num_comments| +------------+-------------------+--------------------+--------------------+-------------------+------------+ |Conservative| Foubar_ghost|Liberal lawyer De...| |2021-01-06 01:19:57| 44| |changemyview| [deleted]|CMV: CallMeCarson...| [removed]|2021-01-06 01:20:24| 31| |Conservative| f1sh98|Hong Kong Police ...| |2021-01-06 01:21:27| 13| |Conservative| BluePath2|Georgia run off t...| |2021-01-06 01:24:40| 0| | Libertarian| [deleted]|Trump supporters ...| [deleted]|2021-01-06 01:25:25| 288| |Conservative| [deleted]|I dont even need ...| [deleted]|2021-01-06 01:25:31| 0| | Libertarian| GruntNumber9902|Learn from histor...|Libertarian: an a...|2021-01-06 01:31:23| 0| |Conservative| ChunkyArsenio|UK: Chief medical...| |2021-01-06 01:32:35| 6| |Conservative| Lionhearted09|Live Updates in G...| |2021-01-06 01:33:05| 847| |Conservative| 1221Wood|Just a reminder t...| |2021-01-06 01:33:30| 0| |Conservative| 3dprinteddildo|Most Georgia runo...| |2021-01-06 01:33:51| 263| |Conservative| weethomas|How much do you w...| [removed]|2021-01-06 01:34:41| 0| |Ask_Politics| nicebol|What does your da...| [removed]|2021-01-06 01:38:05| 1| |Conservative| joystickfantastic|Pence told Trump ...| |2021-01-06 01:39:35| 0| | Libertarian| rgshrey|Blatant plug for ...| |2021-01-06 01:40:13| 0| |Conservative| [deleted]|Liberal Law Profe...| [deleted]|2021-01-06 01:40:42| 0| | Libertarian|anonymous_man842740|Just today I real...|Just today I real...|2021-01-06 01:40:45| 68| |Conservative| Vimes3000|Where in Reddit i...| [removed]|2021-01-06 01:40:52| 0| |Conservative| nimobo|CNN Kicks Off 202...| |2021-01-06 01:41:36| 3| |Conservative| [deleted]|Goodbye /r/Conser...| [removed]|2021-01-06 01:42:19| 0| +------------+-------------------+--------------------+--------------------+-------------------+------------+ only showing top 20 rows
Appendix: Eric's data gathering code¶
# %%time
# import sagemaker
# from sagemaker.spark.processing import PySparkProcessor
# # Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
# role = sagemaker.get_execution_role()
# spark_processor = PySparkProcessor(
# base_job_name="sm-spark-project",
# framework_version="3.3",
# role=role,
# instance_count=6,
# instance_type="ml.m5.xlarge",
# max_runtime_in_seconds=7200,
# )
# # s3 paths
# session = sagemaker.Session()
# bucket = session.default_bucket()
# output_prefix_logs = f"spark_logs"
# col_name_for_filtering = "subreddit"
# # modify this comma separated list to choose the subreddits of interest
# subreddits = "changemyview, Ask_Politics"
# configuration = [
# {
# "Classification": "spark-defaults",
# "Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
# }
# ]
# for year in range(2022, 2024):
# # comments
# print(f"Working on Comments for year {year}")
# s3_dataset_path_commments = f"s3://bigdatateaching/reddit-parquet/comments/year={year}/month=*/*.parquet"
# output_prefix_data_comments = "project/comments"
# spark_processor.run(
# submit_app="./process_cmv_ap.py",
# arguments=[
# "--s3_dataset_path",
# s3_dataset_path_commments,
# "--s3_output_bucket",
# bucket,
# "--s3_output_prefix",
# output_prefix_data_comments,
# "--col_name_for_filtering",
# col_name_for_filtering,
# "--values_to_keep",
# subreddits,
# ],
# spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
# logs=False,
# configuration=configuration
# )
# # submissions
# print(f"Working on Submissions for year {year}")
# s3_dataset_path_submissions = f"s3://bigdatateaching/reddit-parquet/submissions/year={year}/month=*/*.parquet"
# output_prefix_data_submissions = "project/submissions"
# spark_processor.run(
# submit_app="./process_cmv_ap.py",
# arguments=[
# "--s3_dataset_path",
# s3_dataset_path_submissions,
# "--s3_output_bucket",
# bucket,
# "--s3_output_prefix",
# output_prefix_data_submissions,
# "--col_name_for_filtering",
# col_name_for_filtering,
# "--values_to_keep",
# subreddits,
# ],
# spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
# logs=False,
# configuration=configuration
# )