spark
SparkSession - hive
# disable Arrow-based conversion for toPandas()/createDataFrame()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
import os
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, when, length
datastore = 'azureml://datastores/workspaceblobstore/paths/'
submissions_path = 'filtered-submissions'
submissions_df = spark.read.parquet(f"{datastore}{submissions_path}")
PLOT_DIR = os.path.join("Users/sk2224/fall-2023-reddit-project-team-34/data", "plots")
CSV_DIR = os.path.join("Users/sk2224/fall-2023-reddit-project-team-34/data", "csv")
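The to_csv calls below write to these local paths, so it helps to make sure the directories exist first; a minimal sketch:
# create the output directories if they are missing (no-op when they already exist)
os.makedirs(PLOT_DIR, exist_ok=True)
os.makedirs(CSV_DIR, exist_ok=True)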
Drop unnecessary columns:
# # List of columns to be dropped
# columns_to_drop = [
# 'promoted', 'promoted_by', 'promoted_display_name', 'promoted_url',
# 'secure_media', 'secure_media_embed', 'thumbnail', 'thumbnail_height',
# 'thumbnail_width', 'third_party_trackers', 'third_party_tracking',
# 'third_party_tracking_2', 'source', 'retrieved_on', 'suggested_sort',
# 'spoiler', 'stickied', 'whitelist_status'
# ]
# # Dropping the columns
# submissions_df = submissions_df.drop(*columns_to_drop)
Shape of the data:
# submissions_row_count = submissions_df.count()
# submissions_col_count = len(submissions_df.columns)
# print(f"Shape of the submissions dataframe is {submissions_row_count:,}x{submissions_col_count}")
Breakdown of subreddit counts:
# subreddit_count_df = submissions_df.groupBy('subreddit').count().toPandas()
submissions_df.groupBy('subreddit').count().show()
+--------------------+------+
|           subreddit| count|
+--------------------+------+
|               anime|404298|
|          television| 89586|
|televisionsuggest...|  7991|
|              movies|382085|
|        Animesuggest| 74101|
|    MovieSuggestions| 58907|
+--------------------+------+
# subreddit_count_df.to_csv(f"{CSV_DIR}/subreddit_count.csv")
Number of partitions in the underlying RDD:
# get number of partitions
submissions_df.rdd.getNumPartitions()
100
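The 100 partitions come from the Parquet read; if later aggregations turn out too fine- or coarse-grained, the count can be adjusted explicitly. A hedged sketch (the targets are illustrative, not tuned):
# submissions_df = submissions_df.repartition(200)  # full shuffle to more partitions
# submissions_df = submissions_df.coalesce(50)      # narrow merge to fewer partitions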
Columns in the data:
# print schema
submissions_df.printSchema()
root
 |-- adserver_click_url: string (nullable = true)
 |-- adserver_imp_pixel: string (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- brand_safe: boolean (nullable = true)
 |-- contest_mode: boolean (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array of struct (nullable = true)  -- ~60 nested submission fields elided
 |-- disable_comments: boolean (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- domain_override: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- embed_type: string (nullable = true)
 |-- embed_url: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- hidden: boolean (nullable = true)
 |-- hide_score: boolean (nullable = true)
 |-- href_url: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imp_pixel: string (nullable = true)
 |-- is_crosspostable: boolean (nullable = true)
 |-- is_reddit_media_domain: boolean (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- is_video: boolean (nullable = true)
 |-- link_flair_css_class: string (nullable = true)
 |-- link_flair_text: string (nullable = true)
 |-- locked: boolean (nullable = true)
 |-- media: struct (nullable = true)  -- event_id, oembed struct, reddit_video struct, type; nested fields elided
 |-- media_embed: struct (nullable = true)  -- content, height, scrolling, width
 |-- mobile_ad_url: string (nullable = true)
 |-- num_comments: long (nullable = true)
 |-- num_crossposts: long (nullable = true)
 |-- original_link: string (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- parent_whitelist_status: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- pinned: boolean (nullable = true)
 |-- post_hint: string (nullable = true)
 |-- preview: struct (nullable = true)  -- enabled flag plus images array with source/resolutions/variants (gif, mp4, nsfw, obfuscated); nested fields elided
 |-- score: long (nullable = true)
 |-- selftext: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
Subset the data for faster processing:
submissions_df_small = submissions_df.sample(withReplacement=False, fraction=0.01)
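Without a seed, sample() draws a different 1% on every run; passing one makes the subset reproducible. A sketch (the seed value is arbitrary):
# reproducible variant of the sample above
# submissions_df_small = submissions_df.sample(withReplacement=False, fraction=0.01, seed=42)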
# take a subset of columns from the 1% sample
df = submissions_df_small.select("subreddit", "author", "title", "selftext",
"created_utc", "num_comments", "score",
"over_18", "media", "pinned", "locked",
"disable_comments", "domain", "hidden",
"distinguished", "hide_score")
# take the same subset of columns from the full data
df = submissions_df.select("subreddit", "author", "title", "selftext",
"created_utc", "num_comments", "score",
"over_18", "media", "pinned", "locked",
"disable_comments", "domain", "hidden",
"distinguished", "hide_score")
Count the missing values in the data:
# find missing values
missing_vals = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
missing_vals.show()
+---------+------+-----+--------+-----------+------------+-----+-------+------+------+------+----------------+------+------+-------------+----------+
|subreddit|author|title|selftext|created_utc|num_comments|score|over_18| media|pinned|locked|disable_comments|domain|hidden|distinguished|hide_score|
+---------+------+-----+--------+-----------+------------+-----+-------+------+------+------+----------------+------+------+-------------+----------+
|        0|     0|    0|       0|          0|           0|    0|      0|892405|     0|     0|         1016968|  8952|     0|      1015530|         0|
+---------+------+-----+--------+-----------+------------+-----+-------+------+------+------+----------------+------+------+-------------+----------+
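isNull() does not catch text columns where the missing marker is an empty string (selftext in Reddit dumps is often '' rather than null); a variant that counts both, shown as an illustrative sketch:
# also treat empty strings as missing, for string columns only (illustrative)
# string_cols = [f.name for f in df.schema.fields if f.dataType.simpleString() == 'string']
# df.select([count(when(col(c).isNull() | (col(c) == ''), c)).alias(c) for c in string_cols]).show()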
# Manually re-enter the missing-value counts from the show() output above
# (avoids recomputing the Spark aggregation)
data = {
'subreddit': [0],
'author': [0],
'title': [0],
'selftext': [0],
'created_utc': [0],
'num_comments': [0],
'score': [0],
'over_18': [0],
'media': [892405],
'pinned': [0],
'locked': [0],
'disable_comments': [1016968],
'domain': [8952],
'hidden': [0],
'distinguished': [1015530],
'hide_score': [0]
}
# Creating a Pandas DataFrame
missing_vals_pd = pd.DataFrame(data)
missing_vals_pd
| | subreddit | author | title | selftext | created_utc | num_comments | score | over_18 | media | pinned | locked | disable_comments | domain | hidden | distinguished | hide_score |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 892405 | 0 | 0 | 1016968 | 8952 | 0 | 1015530 | 0 |
# missing_vals_pd = missing_vals.toPandas()
# Convert the DataFrame to long format
df_long = missing_vals_pd.melt(var_name='Column', value_name='Missing Values')
df_long
| | Column | Missing Values |
|---|---|---|
| 0 | subreddit | 0 |
| 1 | author | 0 |
| 2 | title | 0 |
| 3 | selftext | 0 |
| 4 | created_utc | 0 |
| 5 | num_comments | 0 |
| 6 | score | 0 |
| 7 | over_18 | 0 |
| 8 | media | 892405 |
| 9 | pinned | 0 |
| 10 | locked | 0 |
| 11 | disable_comments | 1016968 |
| 12 | domain | 8952 |
| 13 | hidden | 0 |
| 14 | distinguished | 1015530 |
| 15 | hide_score | 0 |
df_long.to_csv(f"{CSV_DIR}/num_missing_val.csv", index=False)
Computing the length of each posted submission:
# calculate post length
df = df.withColumn('post_length', length(df.title) + length(df.selftext))
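length() propagates nulls, so a post with a null selftext would get a null post_length; the null counts above show selftext is never missing here, but a defensive variant would be:
# null-safe alternative: treat a missing selftext as length 0
# df = df.withColumn('post_length', length(df.title) + F.coalesce(length(df.selftext), F.lit(0)))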
Extracting meaningful features from the datetime column:
# convert to datetime
df = df.withColumn('created_utc', F.to_timestamp('created_utc'))
# Extract time-based features
df = df.withColumn('hour_of_day', F.hour('created_utc'))
df = df.withColumn('day_of_week', F.dayofweek('created_utc')) # 1 (Sunday) to 7 (Saturday)
# Map each day of the week from numeric to string
df = df.withColumn('day_of_week_str', F.expr("""
CASE day_of_week
WHEN 1 THEN 'Sunday'
WHEN 2 THEN 'Monday'
WHEN 3 THEN 'Tuesday'
WHEN 4 THEN 'Wednesday'
WHEN 5 THEN 'Thursday'
WHEN 6 THEN 'Friday'
WHEN 7 THEN 'Saturday'
END
"""))
df = df.withColumn('day_of_month', F.dayofmonth('created_utc'))
df = df.withColumn('month', F.month('created_utc'))
df = df.withColumn('year', F.year('created_utc'))
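The CASE expression works; for reference, Spark can also produce the day name directly (date_format uses the JVM locale for the names):
# equivalent one-liner for the day-name mapping
# df = df.withColumn('day_of_week_str', F.date_format('created_utc', 'EEEE'))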
Converting the media column to a boolean flag:
# flag whether the post has media (a null media value maps to False)
df = df.withColumn('has_media', F.col('media').isNotNull())
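A quick sanity check: the False count here should match the 892,405 null media values found earlier.
# df.groupBy('has_media').count().show()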
# drop unnecessary columns
df = df.drop(*["media", "created_utc", "disable_comments", "distinguished"])
df.printSchema()
root
 |-- subreddit: string (nullable = true)
 |-- author: string (nullable = true)
 |-- title: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- num_comments: long (nullable = true)
 |-- score: long (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- pinned: boolean (nullable = true)
 |-- locked: boolean (nullable = true)
 |-- domain: string (nullable = true)
 |-- hidden: boolean (nullable = true)
 |-- hide_score: boolean (nullable = true)
 |-- post_length: integer (nullable = true)
 |-- hour_of_day: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- day_of_week_str: string (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- has_media: boolean (nullable = false)
# Define the paths to the EDA CSV files
csv_file_path_1 = f"{datastore}csv/subreddit_count_eda.csv"
csv_file_path_2 = f"{datastore}csv/num_missing_val_eda.csv"
csv_file_path_3 = f"{datastore}csv/datetime_counts_eda.csv"
# Read the CSV files using PySpark
df_subreddit_count = spark.read.csv(csv_file_path_1, header=True, inferSchema=True)
df_num_missing = spark.read.csv(csv_file_path_2, header=True, inferSchema=True)
df_datetime = spark.read.csv(csv_file_path_3, header=True, inferSchema=True)
df_subreddit_count_pd = df_subreddit_count.toPandas()
df_num_missing_pd = df_num_missing.toPandas()
df_datetime_pd = df_datetime.toPandas()
# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.4.0
# install spark-nlp
%pip install spark-nlp==5.1.3
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
(Setup log elided: conda installed openjdk 11.0.13 and updated ca-certificates and certifi; pip installed pyspark 3.4.0 with py4j 0.10.9.7, then spark-nlp 5.1.3. Both tools noted that newer releases were available and that the kernel may need a restart to use the updated packages.)
# Import SparkSession
from pyspark.sql import SparkSession
# Build the Spark session with spark-nlp and hadoop-aws on the classpath
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[*]")\
.config("spark.driver.memory","16G")\
.config("spark.executor.memory", "12g")\
.config("spark.executor.cores", "3")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
.config(
    # NOTE: Spark reports this as a non-Spark config property (see the warning
    # below); prefixing the key with "spark.hadoop." would hand it to the S3A layer.
    "fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.ContainerCredentialsProvider"
)\
.getOrCreate()
print(spark.version)
Warning: Ignoring non-Spark config property: fs.s3a.aws.credentials.provider
(Ivy dependency-resolution log elided: com.johnsnowlabs.nlp#spark-nlp_2.12;5.1.3 and org.apache.hadoop#hadoop-aws;3.2.2 were resolved from Maven Central together with ~73 transitive artifacts, 4 evicted, 0 newly downloaded. Spark then warned that the native-hadoop library could not be loaded, falling back to builtin-java classes, and set the default log level to WARN.)
3.4.0
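For reference, spark-nlp also ships a convenience starter that builds a preconfigured session; a hedged alternative to the manual builder above (it does not add the hadoop-aws jar, so the S3 reads below would still need the explicit packages config):
# import sparknlp
# spark = sparknlp.start()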
import sagemaker
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, count, when, length, expr, to_timestamp, hour, dayofweek,
    dayofmonth, month, year, avg, lower, regexp_replace, concat_ws,
    row_number, rank, desc,
)
from pyspark.sql.window import Window
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
%%time
bucket = "project-group34"
session = sagemaker.Session()
output_prefix_data_submissions = "project/submissions/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_submissions}"
print(f"reading comments from {s3_path}")
submissions = spark.read.parquet(s3_path, header=True)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading submissions from s3a://project-group34/project/submissions/yyyy=*
CPU times: user 156 ms, sys: 6.49 ms, total: 163 ms
Wall time: 1.22 s
submissions = submissions.select("subreddit", "score", "created_utc")
submissions.show(5)
+----------+-----+-------------------+
| subreddit|score|        created_utc|
+----------+-----+-------------------+
|television|    0|2021-01-27 22:16:47|
|     anime|    0|2021-01-27 22:17:30|
|television|    4|2021-01-27 22:17:42|
|    movies|    0|2021-01-27 22:18:23|
|     anime|    0|2021-01-27 22:19:23|
+----------+-----+-------------------+
only showing top 5 rows
# Feature engineering
# post_length needs title and selftext, which were not selected above, so it is skipped here:
# submissions = submissions.withColumn('post_length', length(submissions.title) + length(submissions.selftext))
submissions = submissions.withColumn('created_utc', to_timestamp('created_utc'))
submissions = submissions.withColumn('hour_of_day', hour('created_utc'))
submissions = submissions.withColumn('day_of_week', dayofweek('created_utc'))
submissions = submissions.withColumn('day_of_week_str', expr("""
CASE day_of_week
WHEN 1 THEN 'Sunday'
WHEN 2 THEN 'Monday'
WHEN 3 THEN 'Tuesday'
WHEN 4 THEN 'Wednesday'
WHEN 5 THEN 'Thursday'
WHEN 6 THEN 'Friday'
WHEN 7 THEN 'Saturday'
END
"""))
submissions = submissions.withColumn('day_of_month', dayofmonth('created_utc'))
submissions = submissions.withColumn('month', month('created_utc'))
submissions = submissions.withColumn('year', year('created_utc'))
submissions.show(5)
+----------+-----+-------------------+-----------+-----------+---------------+------------+-----+----+
| subreddit|score|        created_utc|hour_of_day|day_of_week|day_of_week_str|day_of_month|month|year|
+----------+-----+-------------------+-----------+-----------+---------------+------------+-----+----+
|television|    0|2021-01-27 22:16:47|         22|          4|      Wednesday|          27|    1|2021|
|     anime|    0|2021-01-27 22:17:30|         22|          4|      Wednesday|          27|    1|2021|
|television|    4|2021-01-27 22:17:42|         22|          4|      Wednesday|          27|    1|2021|
|    movies|    0|2021-01-27 22:18:23|         22|          4|      Wednesday|          27|    1|2021|
|     anime|    0|2021-01-27 22:19:23|         22|          4|      Wednesday|          27|    1|2021|
+----------+-----+-------------------+-----------+-----------+---------------+------------+-----+----+
only showing top 5 rows
# EDA on datetime features
# Group by subreddit, year, month and calculate average score
result_df = submissions.groupBy("subreddit", "year", "month").agg(avg("score").alias("average_score"))
# Show the result
result_df.show()
+----------+----+-----+------------------+
| subreddit|year|month|     average_score|
+----------+----+-----+------------------+
|    movies|2021|    5| 92.46727404587072|
|    movies|2021|   10|153.31222485607856|
|television|2021|    3| 326.3586521987436|
|television|2021|    6| 278.5865019011407|
|     anime|2021|    9| 71.18142039717267|
|television|2021|    7| 317.0394736842105|
|     anime|2021|   11| 73.64954381660661|
|    movies|2021|    6| 94.09434323148281|
|     anime|2021|    5| 76.54447121546112|
|    movies|2021|   11|137.37378399488745|
|     anime|2021|   10| 75.65115973454235|
|     anime|2021|    8| 69.77001127395715|
|    movies|2021|   12|119.85564542219346|
|television|2021|    4| 334.8437964891401|
|     anime|2021|    3|  75.2238120054958|
|    movies|2021|    4|103.05983768169105|
|    movies|2021|    8|163.04338364338363|
|television|2021|   11| 268.8965517241379|
|    movies|2021|    2|148.40551962604854|
|     anime|2021|    7|  71.8894794520548|
+----------+----+-----+------------------+
only showing top 20 rows
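The groupBy output comes back in shuffle order; sorting it makes the monthly trend easier to scan before export. An optional step:
# result_df = result_df.orderBy("subreddit", "year", "month")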
result_df_pd = result_df.toPandas()
result_df_pd.to_csv("../../data/csv/year_month_day_avgscore_eda.csv", index=False)
# Group by subreddit, hour of day, and day of week and calculate average score
result_df1 = submissions.groupBy("subreddit", "hour_of_day", "day_of_week_str").agg(avg("score").alias("average_score"))
result_df1.show()
+----------+-----------+---------------+------------------+
| subreddit|hour_of_day|day_of_week_str|     average_score|
+----------+-----------+---------------+------------------+
|television|          0|      Wednesday|243.69258589511753|
|television|          3|         Friday| 174.2452380952381|
|     anime|         20|      Wednesday|22.687591240875914|
|     anime|          0|       Saturday| 46.98122065727699|
|     anime|          3|         Monday| 72.83072713057076|
|     anime|          6|         Monday|33.139367816091955|
|television|          5|         Monday|206.81736526946108|
|television|          9|         Friday| 253.1782945736434|
|television|         16|       Thursday|282.18665464382326|
|    movies|          9|         Friday| 83.85595238095237|
|    movies|          1|       Thursday|106.02503477051461|
|     anime|          3|       Thursday|  74.6719936076708|
|     anime|          9|        Tuesday|175.09882088714207|
|television|         21|       Saturday|218.06432748538012|
|     anime|          8|       Saturday| 85.56456456456456|
|     anime|         16|       Saturday|237.45001649620588|
|    movies|         12|         Monday| 267.5448065173116|
|    movies|          1|         Friday|128.71376811594203|
|     anime|         22|      Wednesday|39.755630630630634|
|    movies|          8|       Saturday| 92.20946372239747|
+----------+-----------+---------------+------------------+
only showing top 20 rows
result_df1_pd = result_df1.toPandas()
result_df1_pd.to_csv("../../data/csv/daily_weekly_avgscore_eda.csv", index=False)
# Group by subreddit and day of month and count submissions
result_df2 = submissions.groupBy("subreddit", "day_of_month").count()
result_df2.show()
+----------+------------+-----+
| subreddit|day_of_month|count|
+----------+------------+-----+
|    movies|           4|12229|
|     anime|          27|12959|
|television|           9| 2893|
|    movies|          31| 7671|
|television|           1| 3017|
|    movies|          30|11524|
|     anime|          18|13014|
|     anime|          31| 7972|
|     anime|          17|12939|
|     anime|          10|13332|
|television|          23| 2971|
|television|          17| 2926|
|television|          13| 2942|
|    movies|           9|12423|
|television|          31| 1818|
|television|          15| 2857|
|television|          26| 2721|
|    movies|          10|12493|
|television|          29| 2586|
|    movies|          20|12414|
+----------+------------+-----+
only showing top 20 rows
result_df2_pd = result_df2.toPandas()
result_df2_pd.to_csv("../../data/csv/daily_weekly_count_eda.csv", index=False)
%%time
bucket = "project-group34"
session = sagemaker.Session()
output_prefix_data_submissions = "project/submissions/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_submissions}"
print(f"reading comments from {s3_path}")
submissions = spark.read.parquet(s3_path, header=True)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading submissions from s3a://project-group34/project/submissions/yyyy=*
CPU times: user 196 ms, sys: 0 ns, total: 196 ms
Wall time: 919 ms
# Feature engineering
submissions = submissions.withColumn('post_length', length(submissions.title) + length(submissions.selftext))
submissions = submissions.withColumn('created_utc', to_timestamp('created_utc'))
submissions = submissions.withColumn('hour_of_day', hour('created_utc'))
submissions = submissions.withColumn('day_of_week', dayofweek('created_utc'))
submissions = submissions.withColumn('day_of_week_str', expr("""
CASE day_of_week
WHEN 1 THEN 'Sunday'
WHEN 2 THEN 'Monday'
WHEN 3 THEN 'Tuesday'
WHEN 4 THEN 'Wednesday'
WHEN 5 THEN 'Thursday'
WHEN 6 THEN 'Friday'
WHEN 7 THEN 'Saturday'
END
"""))
submissions = submissions.withColumn('day_of_month', dayofmonth('created_utc'))
submissions = submissions.withColumn('month', month('created_utc'))
submissions = submissions.withColumn('year', year('created_utc'))
submissions = submissions.withColumn('has_media', col('media').isNotNull())
submissions = submissions.select("subreddit", "score", "post_length", "num_comments", "has_media")
submissions.show(5)
+----------+-----+-----------+------------+---------+
| subreddit|score|post_length|num_comments|has_media|
+----------+-----+-----------+------------+---------+
|television|    0|        605|           9|    false|
|     anime|    0|         50|           3|    false|
|television|    4|         86|          11|    false|
|    movies|    0|         42|           4|    false|
|     anime|    0|         64|           9|    false|
+----------+-----+-----------+------------+---------+
only showing top 5 rows
submissions_pd = submissions.toPandas()
submissions_pd.to_csv("../../data/csv/engagement_with_media.csv", index=False)
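toPandas() pulls the whole selection onto the driver, which is fine at this scale but will not hold for larger selections; writing straight from Spark avoids the collect. A sketch, assuming an S3 output prefix is available (the path is illustrative):
# Spark-side alternative: write the CSV without collecting to the driver
# submissions.coalesce(1).write.mode("overwrite") \
#     .option("header", True) \
#     .csv(f"s3a://{bucket}/project/csv/engagement_with_media")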