from pyspark.sql.functions import length
from pyspark.sql import functions as F
import os
from pyspark.sql.functions import col
from pyspark.sql.functions import desc
import pandas as pd
# Source parquet: pre-filtered Reddit submissions in the workspace blobstore.
datastore = 'azureml://datastores/workspaceblobstore/paths/'
submissions_path = 'filtered-submissions'
submissions_df = spark.read.parquet(f"{datastore}{submissions_path}")

# Restrict to the columns used by the EDA below.
keep_cols = [
    "subreddit", "author", "title", "selftext", "created_utc",
    "num_comments", "score", "over_18", "media", "pinned", "locked",
    "disable_comments", "domain", "hidden", "distinguished", "hide_score",
]
df = submissions_df.select(*keep_cols)

# Rough post-size proxy: combined character count of title and body.
df = df.withColumn('post_length', length(F.col('title')) + length(F.col('selftext')))

# Parse the timestamp, then derive calendar features from it.
df = df.withColumn('created_utc', F.to_timestamp('created_utc'))
df = df.withColumn('hour_of_day', F.hour('created_utc'))
# Spark's dayofweek(): 1 = Sunday ... 7 = Saturday.
df = df.withColumn('day_of_week', F.dayofweek('created_utc'))

# Human-readable weekday label for the numeric day_of_week
# (null for any value outside 1-7, matching a CASE with no ELSE).
df = df.withColumn(
    'day_of_week_str',
    F.when(F.col('day_of_week') == 1, 'Sunday')
     .when(F.col('day_of_week') == 2, 'Monday')
     .when(F.col('day_of_week') == 3, 'Tuesday')
     .when(F.col('day_of_week') == 4, 'Wednesday')
     .when(F.col('day_of_week') == 5, 'Thursday')
     .when(F.col('day_of_week') == 6, 'Friday')
     .when(F.col('day_of_week') == 7, 'Saturday')
)
df = df.withColumn('day_of_month', F.dayofmonth('created_utc'))
df = df.withColumn('month', F.month('created_utc'))
df = df.withColumn('year', F.year('created_utc'))

# Boolean media flag, then drop the columns no longer needed downstream.
df = df.withColumn('has_media', F.col('media').isNotNull())
df = df.drop("media", "disable_comments", "distinguished")
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 107, 6, Finished, Available)
# Output directories for generated plots and CSV extracts.
_DATA_ROOT = "Users/st1140/fall-2023-reddit-project-team-34/data"
PLOT_DIR = os.path.join(_DATA_ROOT, "plots")
CSV_DIR = os.path.join(_DATA_ROOT, "csv")
# Ensure the directories exist before the to_csv calls below write into them;
# a missing directory would make pandas.to_csv raise OSError.
os.makedirs(PLOT_DIR, exist_ok=True)
os.makedirs(CSV_DIR, exist_ok=True)
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 106, 19, Finished, Available)
# Inspect the resulting schema to confirm the derived columns and their types.
df.printSchema()
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 97, 15, Finished, Available)
root |-- subreddit: string (nullable = true) |-- author: string (nullable = true) |-- title: string (nullable = true) |-- selftext: string (nullable = true) |-- created_utc: timestamp (nullable = true) |-- num_comments: long (nullable = true) |-- score: long (nullable = true) |-- over_18: boolean (nullable = true) |-- pinned: boolean (nullable = true) |-- locked: boolean (nullable = true) |-- domain: string (nullable = true) |-- hidden: boolean (nullable = true) |-- hide_score: boolean (nullable = true) |-- post_length: integer (nullable = true) |-- hour_of_day: integer (nullable = true) |-- day_of_week: integer (nullable = true) |-- day_of_week_str: string (nullable = true) |-- day_of_month: integer (nullable = true) |-- month: integer (nullable = true) |-- year: integer (nullable = true) |-- has_media: boolean (nullable = false)
# Restrict to the three subreddits of interest.
filtered_df = df.filter(col("subreddit").isin('movies', 'anime', 'television'))
# Fields that feed the Plotly visualization.
df_plotly = filtered_df.select("subreddit", "title", "num_comments", "selftext", "author", "score")
# Most-discussed posts first.
df_plotly_sorted = df_plotly.orderBy(col("num_comments").desc())
# Keep the 20 most-commented posts across all three subreddits.
df_top_20 = df_plotly_sorted.limit(20)
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 98, 8, Finished, Available)
# Collect the small (20-row) result to the driver as a pandas DataFrame.
df_top_20_pd=df_top_20.toPandas()
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 97, 17, Finished, Available)
# Persist for later use; note to_csv also writes the pandas index as an
# extra unnamed first column (it reads back as 'Unnamed: 0').
df_top_20_pd.to_csv(f"{CSV_DIR}/top_comments_eda_1.csv")
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 97, 18, Finished, Available)
# Authors that appear among the top-commented posts; count how many
# submissions each of them has in the full dataset.
authors_to_count = [
    "dpemerson76", "lionsgate", "LiteraryBoner", "AnimeMod", "prsnreddit",
    "officialtobeymaguire", "KillerQ97", "leedavis1987", "Jeff_Souza",
]
# Keep only submissions written by those authors.
filtered_df = df.where(col("author").isin(authors_to_count))
# Submissions per author.
author_counts = filtered_df.groupBy("author").count()
# Display the per-author counts.
author_counts.show()
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 98, 9, Finished, Available)
+--------------------+-----+ | author|count| +--------------------+-----+ | LiteraryBoner| 549| | prsnreddit| 519| | AnimeMod| 755| | Jeff_Souza| 4| | lionsgate| 9| | leedavis1987| 4| |officialtobeymaguire| 1| | dpemerson76| 4| | KillerQ97| 8| +--------------------+-----+
# Attach each author's overall post count to their top-20 posts.
# NOTE(review): joinedDF is never referenced again in this file — confirm it
# is used downstream or remove it.
joinedDF = df_top_20.join(author_counts.select('author', 'count'), ['author'])
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 98, 10, Finished, Available)
# r/movies only: pull the 20 posts with the most comments.
filtered_df_movies = df.filter(col("subreddit") == 'movies')
# Fields needed for the downstream Plotly chart.
df_plotly_movies = filtered_df_movies.select("subreddit", "title", "num_comments", "selftext", "author", "score")
# Most-discussed posts first.
df_plotly_sorted_movies = df_plotly_movies.orderBy(col("num_comments").desc())
# Top 20 only.
df_top_20_movies = df_plotly_sorted_movies.limit(20)
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 101, 8, Finished, Available)
# Collect the movies top-20 to the driver for export.
df_top_20_pd_movies=df_top_20_movies.toPandas()
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 101, 9, Finished, Available)
# Write the movies extract; the pandas index is included as an unnamed
# column, which the merge step later drops as 'Unnamed: 0'.
df_top_20_pd_movies.to_csv(f"{CSV_DIR}/top_comments_movies.csv")
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 101, 10, Finished, Available)
# r/anime only: pull the 20 posts with the most comments.
filtered_df_anime = df.filter(col("subreddit") == 'anime')
# Fields needed for the downstream Plotly chart.
df_plotly_anime = filtered_df_anime.select("subreddit", "title", "num_comments", "selftext", "author", "score")
# Most-discussed posts first.
df_plotly_sorted_anime = df_plotly_anime.orderBy(col("num_comments").desc())
# Top 20 only.
df_top_20_anime = df_plotly_sorted_anime.limit(20)
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 102, 8, Finished, Available)
# Collect the anime top-20 to the driver for export.
df_top_20_pd_anime = df_top_20_anime.toPandas()
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 102, 9, Finished, Available)
# Write the anime extract; the pandas index is included as an unnamed
# column, which the merge step later drops as 'Unnamed: 0'.
df_top_20_pd_anime.to_csv(f"{CSV_DIR}/top_comments_anime.csv")
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 102, 10, Finished, Available)
# r/television only: pull the top posts by number of comments.
filtered_df_television = df.filter(df.subreddit.isin('television'))
# Columns for the Plotly DataFrame.
df_plotly_television = filtered_df_television.select(["subreddit", "num_comments", "title", "selftext", "author", "score"])
# Order descending by comment count.
df_plotly_sorted_television = df_plotly_television.orderBy(desc("num_comments"))
# BUG FIX: this cell used limit(50) while the variable name, the comment, and
# the parallel movies/anime cells all take the top 20 — use 20 for consistency.
df_top_20_television = df_plotly_sorted_television.limit(20)
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 104, 8, Finished, Available)
# Collect the television result to the driver for export.
df_top_20_pd_television=df_top_20_television.toPandas()
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 104, 9, Finished, Available)
# Write the television extract; the pandas index is included as an unnamed
# column, which the merge step later drops as 'Unnamed: 0'.
df_top_20_pd_television.to_csv(f"{CSV_DIR}/top_comments_tv_show.csv")
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 104, 10, Finished, Available)
# Post count per author over the whole dataset.
# NOTE(review): this rebinds author_counts, clobbering the per-selected-author
# counts computed earlier (used by joinedDF) — confirm that is intended.
author_counts = df.groupBy("author").count()
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 95, 10, Finished, Available)
# Re-load the three per-subreddit top-comment extracts written above.
# Use the CSV_DIR constant instead of repeating the path literal, so the
# read location always matches the write location.
df_movies = pd.read_csv(os.path.join(CSV_DIR, "top_comments_movies.csv"))
df_anime = pd.read_csv(os.path.join(CSV_DIR, "top_comments_anime.csv"))
df_tv = pd.read_csv(os.path.join(CSV_DIR, "top_comments_tv_show.csv"))
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 107, 7, Finished, Available)
# Count posts per author, for every author appearing in any of the three
# top-comment extracts.
result = pd.concat([df_movies, df_anime, df_tv], axis=0)
# Distinct authors, in first-occurrence order.
unique_authors = result["author"].drop_duplicates().tolist()
count_of_posts_per_author = (
    df.filter(df.author.isin(unique_authors))
      .groupBy("author")
      .count()
      .toPandas()
)
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 107, 8, Finished, Available)
# Join the overall post counts onto the top-comment rows and persist.
authors_with_top_comments_post_counts = result.merge(
    count_of_posts_per_author, how="left", on="author"
)
# Drop the index column pandas wrote into the CSVs (read back as 'Unnamed: 0').
# The redundant axis=1 is omitted: 'columns=' already selects the axis.
authors_with_top_comments_post_counts.drop(columns=['Unnamed: 0'], inplace=True)
# Exclude posts whose author account has been deleted.
authors_with_top_comments_post_counts = authors_with_top_comments_post_counts[
    authors_with_top_comments_post_counts['author'] != '[deleted]'
]
# Reuse CSV_DIR (same path as the literal previously used here) so all CSV
# output goes through one constant; the original f-string had no placeholders.
authors_with_top_comments_post_counts.to_csv(
    os.path.join(CSV_DIR, "authors_with_top_comments_post_counts.csv")
)
StatementMeta(9d2667d4-d95f-4c64-adf7-bfab734cf5c0, 107, 9, Finished, Available)