# Setup - Run only once per Kernel App
%conda install openjdk -y
# install PySpark
%pip install pyspark==3.4.0
# install spark-nlp
%pip install spark-nlp==5.1.3
# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /opt/conda
  added / updated specs:
    - openjdk

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjdk-11.0.13-h87a67e3_0

The following packages will be UPDATED:

  ca-certificates    conda-forge::ca-certificates-2023.7.2~ --> pkgs/main::ca-certificates-2023.08.22-h06a4308_0
  certifi            conda-forge/noarch::certifi-2023.7.22~ --> pkgs/main/linux-64::certifi-2023.11.17-py310h06a4308_0

Preparing transaction: done
Verifying transaction: done
Executing transaction: done
Note: you may need to restart the kernel to use updated packages.

Successfully installed py4j-0.10.9.7 pyspark-3.4.0
Successfully installed spark-nlp-5.1.3
Note: you may need to restart the kernel to use updated packages.
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[*]")\
.config("spark.driver.memory","16G")\
.config("spark.executor.memory", "12g")\
.config("spark.executor.cores", "3")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider"
)\
.getOrCreate()
print(spark.version)
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
print(f"Spark version: {spark.version}")
print(f"sparknlp version: {sparknlp.version()}")
Spark version: 3.4.0
sparknlp version: 5.1.3
%%time
bucket = "project-group34"
session = sagemaker.Session()
output_prefix_data_comments = "project/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
print(f"reading comments from {s3_path}")
comments = spark.read.parquet(s3_path)  # note: the "header" option applies to CSV, not Parquet
reading comments from s3a://project-group34/project/comments/yyyy=*
CPU times: user 191 ms, sys: 16.4 ms, total: 207 ms
Wall time: 6.64 s
# comments = comments.cache()
comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality").show()
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|
|    Animesuggest|      [deleted]|           [deleted]| t3_m3vnjl|gqscjse|2021-03-13 10:18:25|    1|               0|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|
|    Animesuggest|    Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|
|MovieSuggestions|   alienstabler|[Holes (2003)](ht...| t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|
|    Animesuggest|       Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|
|    Animesuggest|crash-scientist|I didn’t mean to ...|t1_gqs4syw|gqscp6i|2021-03-13 10:21:10|    2|               0|
|    Animesuggest|        dorting|    Watch Evangelion| t3_m3vnjl|gqscp9a|2021-03-13 10:21:13|    2|               0|
|MovieSuggestions| ThalesHedonist|V for Vendetta\n\...| t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|
|    Animesuggest|        Athenza|{Vampire Hunter D...| t3_m3wy06|gqscspa|2021-03-13 10:22:51|    2|               0|
|    Animesuggest|       Roboragi|**Vampire Hunter ...|t1_gqscspa|gqsct5l|2021-03-13 10:23:05|    2|               0|
|MovieSuggestions|   jaymewheeler|Synchronic was co...| t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|
|    Animesuggest|        Arvidex|- {Wonder Egg Pei...| t3_m3vnjl|gqscuva|2021-03-13 10:23:57|    2|               0|
|    Animesuggest|       Roboragi|**Wonder Egg Prio...|t1_gqscuva|gqscvqx|2021-03-13 10:24:25|    1|               0|
|    Animesuggest|        mgd5800|{Nejimaki Seirei ...| t3_m43dco|gqscxbt|2021-03-13 10:25:12|    2|               0|
|MovieSuggestions|     SwissBliss|Climax!!!!! Sound...| t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|
|    Animesuggest|       Roboragi|**Nejimaki Seirei...|t1_gqscxbt|gqscy94|2021-03-13 10:25:40|    2|               0|
|    Animesuggest|     DyeDye1234|{is the order a r...| t3_m41ujb|gqsd2ku|2021-03-13 10:27:55|    1|               0|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
only showing top 20 rows
# Filter out rows where 'body' or 'author' is '[deleted]'
comments_filtered = comments.filter((comments.body != '[deleted]') & (comments.author != '[deleted]'))
# Show the filtered DataFrame
comments_filtered = comments_filtered.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality")
comments_filtered.show()
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|
|    Animesuggest|    Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|
|MovieSuggestions|   alienstabler|[Holes (2003)](ht...| t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|
|    Animesuggest|       Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|
|    Animesuggest|crash-scientist|I didn’t mean to ...|t1_gqs4syw|gqscp6i|2021-03-13 10:21:10|    2|               0|
|    Animesuggest|        dorting|    Watch Evangelion| t3_m3vnjl|gqscp9a|2021-03-13 10:21:13|    2|               0|
|MovieSuggestions| ThalesHedonist|V for Vendetta\n\...| t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|
|    Animesuggest|        Athenza|{Vampire Hunter D...| t3_m3wy06|gqscspa|2021-03-13 10:22:51|    2|               0|
|    Animesuggest|       Roboragi|**Vampire Hunter ...|t1_gqscspa|gqsct5l|2021-03-13 10:23:05|    2|               0|
|MovieSuggestions|   jaymewheeler|Synchronic was co...| t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|
|    Animesuggest|        Arvidex|- {Wonder Egg Pei...| t3_m3vnjl|gqscuva|2021-03-13 10:23:57|    2|               0|
|    Animesuggest|       Roboragi|**Wonder Egg Prio...|t1_gqscuva|gqscvqx|2021-03-13 10:24:25|    1|               0|
|    Animesuggest|        mgd5800|{Nejimaki Seirei ...| t3_m43dco|gqscxbt|2021-03-13 10:25:12|    2|               0|
|MovieSuggestions|     SwissBliss|Climax!!!!! Sound...| t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|
|    Animesuggest|       Roboragi|**Nejimaki Seirei...|t1_gqscxbt|gqscy94|2021-03-13 10:25:40|    2|               0|
|    Animesuggest|     DyeDye1234|{is the order a r...| t3_m41ujb|gqsd2ku|2021-03-13 10:27:55|    1|               0|
|    Animesuggest|       Roboragi|**Gochuumon wa Us...|t1_gqsd2ku|gqsd3ca|2021-03-13 10:28:18|    1|               0|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
only showing top 20 rows
comments_filtered = comments_filtered.where(col("subreddit").isin("MovieSuggestions"))
# # Define the pipeline stages
# document_assembler = DocumentAssembler() \
# .setInputCol("body") \
# .setOutputCol("document")
# sentence_detector = SentenceDetector() \
# .setInputCols(["document"]) \
# .setOutputCol("sentence")
# tokenizer = Tokenizer() \
# .setInputCols(["sentence"]) \
# .setOutputCol("token")
# # Use a pretrained embeddings model, for example, BERT
# embeddings = BertEmbeddings.pretrained("bert_base_cased", "en") \
# .setInputCols(["sentence", "token"]) \
# .setOutputCol("embeddings")
# ner_model = NerDLModel.pretrained("ner_dl_bert", "en") \
# .setInputCols(["sentence", "token", "embeddings"]) \
# .setOutputCol("ner")
# ner_converter = NerConverter() \
# .setInputCols(["sentence", "token", "ner"]) \
# .setOutputCol("ner_chunk")
# # Build the pipeline
# nlp_pipeline = Pipeline(stages=[
# document_assembler,
# sentence_detector,
# tokenizer,
# embeddings,
# ner_model,
# ner_converter
# ])
# # Apply the pipeline to your DataFrame
# model = nlp_pipeline.fit(comments_filtered_movies)
# result = model.transform(comments_filtered_movies)
bert_base_cased download started this may take some time.
Approximate size to download 384.9 MB
[OK!]
ner_dl_bert download started this may take some time.
Approximate size to download 15.4 MB
[OK!]
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import SentenceDetector, Tokenizer, WordEmbeddingsModel, NerDLModel, NerConverter
from pyspark.ml import Pipeline
# Define the pipeline stages
document_assembler = DocumentAssembler() \
.setInputCol("body") \
.setOutputCol("document")
sentence_detector = SentenceDetector() \
.setInputCols(["document"]) \
.setOutputCol("sentence")
tokenizer = Tokenizer() \
.setInputCols(["sentence"]) \
.setOutputCol("token")
# Use GloVe embeddings
embeddings = WordEmbeddingsModel.pretrained("glove_100d", "en") \
.setInputCols(["sentence", "token"]) \
.setOutputCol("embeddings")
# Use a lighter NER model
ner_model = NerDLModel.pretrained("ner_dl", "en") \
.setInputCols(["sentence", "token", "embeddings"]) \
.setOutputCol("ner")
ner_converter = NerConverter() \
.setInputCols(["sentence", "token", "ner"]) \
.setOutputCol("ner_chunk")
# Build the pipeline
nlp_pipeline = Pipeline(stages=[
document_assembler,
sentence_detector,
tokenizer,
embeddings,
ner_model,
ner_converter
])
# Apply the pipeline to your DataFrame
model = nlp_pipeline.fit(comments_filtered)
result = model.transform(comments_filtered)
result = result.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality", "ner_chunk")
result.show(5)
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+
|       subreddit|        author|                body|parent_id|     id|        created_utc|score|controversiality|           ner_chunk|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|[{chunk, 0, 12, J...|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|                  []|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|[{chunk, 6, 19, V...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|                  []|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|[{chunk, 94, 99, ...|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+
only showing top 5 rows
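As a quick sanity check, the fitted pipeline can also be run on a single ad-hoc string with Spark NLP's LightPipeline, which executes the annotators on the driver without building a DataFrame (a minimal sketch; the sample sentence is made up):
# Optional sanity check: annotate one string with the fitted pipeline
from sparknlp.base import LightPipeline

light = LightPipeline(model)
annotations = light.annotate("I loved Blade Runner and Parasite.")
print(annotations["ner_chunk"])  # chunks the NER model tagged in this sentence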
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, ArrayType
import pyspark.sql.functions as F
# Define a UDF to filter and extract candidate movie names from the NER chunks;
# the general-purpose model has no movie-title label, so titles tend to surface as PERSON or ORG
def extract_movies(chunks):
    movie_names = [chunk.result for chunk in chunks if chunk.metadata['entity'] in ['PERSON', 'ORG']]
    return movie_names
extract_movie_names_udf = udf(extract_movies, ArrayType(StringType()))
# Apply the UDF to the DataFrame
movies_df = result.withColumn("movie_names", extract_movie_names_udf(F.col("ner_chunk")))
# # Display the results
# movies_df.select("body", "movie_names").show()
movies_df.show(5)
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
|       subreddit|        author|                body|parent_id|     id|        created_utc|score|controversiality|           ner_chunk|       movie_names|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|[{chunk, 0, 12, J...|                []|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|                  []|                []|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|[{chunk, 6, 19, V...|[Vendetta\n\nPulp]|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|                  []|                []|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|[{chunk, 94, 99, ...|                []|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
only showing top 5 rows
movies_df.write.parquet("s3a://project-group34/project/suggestions/all_movies/ner/", mode="overwrite")
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[*]")\
.config("spark.driver.memory","16G")\
.config("spark.executor.memory", "12g")\
.config("spark.executor.cores", "3")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider"
)\
.getOrCreate()
print(spark.version)
3.4.0
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
movies_df = spark.read.parquet("s3a://project-group34/project/suggestions/all_movies/ner/")
movies_df.show(5)
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
|       subreddit|        author|                body|parent_id|     id|        created_utc|score|controversiality|           ner_chunk|       movie_names|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|[{chunk, 0, 12, J...|                []|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|                  []|                []|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|[{chunk, 6, 19, V...|[Vendetta\n\nPulp]|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|                  []|                []|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|[{chunk, 94, 99, ...|                []|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
only showing top 5 rows
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField
# import spacy
import re
# # Load spaCy model
# nlp = spacy.load("en_core_web_sm")
# # Define schema for the UDF output
# movie_schema = StructType([
# StructField("movie_positions", ArrayType(ArrayType(StringType()))),
# StructField("movie_names", ArrayType(StringType()))
# ])
# # UDF to extract movie names
# @udf(movie_schema)
# def extract_movie_names_udf(text):
# doc = nlp(text)
# movie_positions = []
# movie_names = []
# for ent in doc.ents:
# if ent.label_ == "ORG" or ent.label_ == "PERSON":
# movie_positions.append([ent.start_char, ent.end_char])
# movie_names.append(ent.text)
# return (movie_positions, movie_names)
# UDF to remove movie names
@udf(StringType())
def remove_movie_names_udf(text, movie_names):
if movie_names:
for name in movie_names:
text = text.replace(name, ' ')
return ' '.join(text.split())
else:
return text
# UDF to extract movie names using regex: quoted titles or capitalized phrases,
# optionally followed by a release year in parentheses
@udf(ArrayType(StringType()))
def extract_movie_names_regex_udf(text, movie_names):
    movie_name_pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'
    movie_matches = re.findall(movie_name_pattern, text)
    # the pattern has exactly two capture groups, so each match is a 2-tuple
    movies = [match[0] or match[1] for match in movie_matches]
    return movie_names + movies
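The regex is easiest to understand on a small example. It has exactly two capture groups (quoted title, capitalized phrase), so each re.findall match is a 2-tuple, and ordinary capitalized words like "Try" match as well, which is why the later stop-word and non-movie-term filters are needed (plain-Python demo; the sample text is made up):
import re

pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'
text = 'Try "The Thing" or Blade Runner (1982), maybe Children of Men too.'
# group 1 captures quoted titles, group 2 capitalized phrases (lowercase
# connectors such as "of" are allowed inside a phrase)
print([quoted or phrase for quoted, phrase in re.findall(pattern, text)])
# ['Try', 'The Thing', 'Blade Runner', 'Children of Men']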
# Remove movie names from the 'body' text
df_removed_movie_names = movies_df.withColumn("body_no_movies", remove_movie_names_udf(movies_df["body"], movies_df["movie_names"]))
# If you still want to use the regex method to supplement the NER extraction
df_final = df_removed_movie_names.withColumn("additional_movie_names", extract_movie_names_regex_udf(df_removed_movie_names["body_no_movies"], df_removed_movie_names["movie_names"]))
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "additional_movie_names")
df_final.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+----------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|additional_movie_names|
+----------------+--------------+--------------------+-------------------+-----+----------------+----------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|       [Jacobs Ladder]|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|               [Holes]|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|  [Vendetta\n\nPulp...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|          [Synchronic]|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|2021-03-13 10:25:19|    2|               0|  [Climax, Sounds, ...|
+----------------+--------------+--------------------+-------------------+-----+----------------+----------------------+
only showing top 5 rows
import nltk
#nltk.download('stopwords')
eng_stopwords = nltk.corpus.stopwords.words('english')
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
def remove_stop_word_from_movie_names(movies):
# Filter out single-word movie names that are in the stop words list
return [movie for movie in movies if not (len(movie.split()) == 1 and movie.lower() in eng_stopwords)]
remove_stop_word_udf = udf(remove_stop_word_from_movie_names, ArrayType(StringType()))
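The filter can be sanity-checked in plain Python before applying the UDF. Note that nltk's English stop-word list includes words such as "the", "it", and "up", so legitimate one-word titles like It or Up are dropped too, a known trade-off of this heuristic (sample titles below are made up):
print(remove_stop_word_from_movie_names(["The", "The Thing", "It", "Up", "Coherence"]))
# ['The Thing', 'Coherence']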
df_final = df_final.withColumn("movie_names_final", remove_stop_word_udf(df_final["additional_movie_names"]))
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "movie_names_final")
# Define a UDF to filter out single-letter movie names
def remove_single_letter_movies(movie_names):
return [name for name in movie_names if len(name.strip()) > 1]
remove_single_letter_movies_udf = udf(remove_single_letter_movies, ArrayType(StringType()))
# Apply the UDF to the DataFrame
df_final = df_final.withColumn("movie_names_final_cleaned", remove_single_letter_movies_udf("movie_names_final"))
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "movie_names_final_cleaned")
df_final.show()
+----------------+-------------------+--------------------+-------------------+-----+----------------+-------------------------+
|       subreddit|             author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|
+----------------+-------------------+--------------------+-------------------+-----+----------------+-------------------------+
|MovieSuggestions|          katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|
|MovieSuggestions|       alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|
|MovieSuggestions|     ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|
|MovieSuggestions|       jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|
|MovieSuggestions|         SwissBliss|Climax!!!!! Sound...|2021-03-13 10:25:19|    2|               0|     [Climax, Sounds, ...|
|MovieSuggestions|        frankocozzo|• Sorry To Bother...|2021-03-13 10:29:35|    2|               0|      [Bacurau \n• Coh...|
|MovieSuggestions|Organized-Konfusion|I would also sugg...|2021-03-13 10:31:39|    2|               0|     [I would also sug...|
|MovieSuggestions|      jonny_designs|Midsommar. Was no...|2021-03-13 10:37:18|    2|               0|     [Midsommar, Was n...|
|MovieSuggestions|     LaughingGor108|Reservoir Dogs\n\...|2021-03-13 10:39:56|    1|               0|     [Reservoir Dogs\n...|
|MovieSuggestions|           jonnyzat|             Hoodlum|2021-03-13 10:43:46|    1|               0|                [Hoodlum]|
|MovieSuggestions|     LaughingGor108|Crime/Thriller:\n...|2021-03-13 10:46:55|    1|               0|     [No Mercy, Asura,...|
|MovieSuggestions|     LaughingGor108|Don't Breathe\n\n...|2021-03-13 10:50:09|    1|               0|     [Don't Breathe, O...|
|MovieSuggestions|           ALT236-1|This is a new one...|2021-01-18 23:56:21|    1|               0|                  [Thank]|
|MovieSuggestions|   ChickenInAPotpie|Just realized the...|2021-01-18 23:58:12|    2|               0|                       []|
|MovieSuggestions|            maaximo|     Children of Men|2021-01-18 23:59:41|    5|               0|        [Children of Men]|
|MovieSuggestions|           ALT236-1|So many, thank yo...|2021-01-18 23:59:50|    1|               0|     [Grand Budapest H...|
|MovieSuggestions|         Tevesh_CKP|        Pacific Rim.|2021-01-19 00:01:32|    1|               0|            [Pacific Rim]|
|MovieSuggestions|         Snoo_93607|Be careful with i...|2021-01-19 00:03:10|    2|               0|                       []|
|MovieSuggestions|         Tevesh_CKP|Wolf Warrior inst...|2021-01-19 00:03:28|    1|               0|           [Wolf Warrior]|
|MovieSuggestions|           ALT236-1|The anime? Wow, h...|2021-01-19 00:04:15|    1|               0|             [Wow, Thank]|
+----------------+-------------------+--------------------+-------------------+-----+----------------+-------------------------+
only showing top 20 rows
df_final.write.parquet("s3a://project-group34/project/suggestions/all_movies/cleaned_with_regex/", mode="overwrite")
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[*]")\
.config("spark.driver.memory","16G")\
.config("spark.executor.memory", "12g")\
.config("spark.executor.cores", "3")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider"
)\
.getOrCreate()
print(spark.version)
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
df_final = spark.read.parquet("s3a://project-group34/project/suggestions/all_movies/cleaned_with_regex/")
df_final.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|2021-03-13 10:25:19|    2|               0|     [Climax, Sounds, ...|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+
only showing top 5 rows
from pyspark.sql.functions import explode, col, count, split
# Flatten the movie_names column
df_flattened = df_final.withColumn("movie_name", explode(col("movie_names_final_cleaned")))
# List of non-movie names to be removed
non_movie_terms = [
"Movie", "Suggestions", "Please", "Directv", "Thanks", "Editing", "Thank",
"Netflix", "Amazon Prime Video", "Google Play Movies", "Amazon Video", "Vudu",
"Tubi Tv", "Hoopla", "Apple Itunes", "Fandangonow", "Hulu", "Rotten Tomatoes",
"Dailymotion", "Fubotv", "Dplay", "Showtime", "Hbo Now", "Amc On Demand",
"Notes", "Great", "Also", "Note", "Tube", "Zz", "Db", "One", "Make", "Yes",
"Good", "Oh", "Yeah", "Love", "P.S", "Man", "Replying", "In the comments on this post I",
"Piracy", "Title", "Characters Minimum", "Maybe", "Fv", "OP", "Definitely",
"Edit", "Came", "Mr", "American", "Really", "Well", "Life", "Sorry", "French",
"But I", "Japanese", "Never", "Lol", "Watch", "House", "Link", "Like", "Dead",
"!<", "Absolutely", "Mother", "Girl", "Nobody",
"Redbox", "M the Creator", "Metacritic", "Korean", "Dprogram",
"Microsoft Store", "Showtime Amazon Channel", "Watched", "Hbo Now Amazon Channel",
"IMO", "Matched", "Loved", "Still", "Seen", "Go", "List", "Let", "Best", "Day",
"Men", "Time", "World", "Even", "Identity", "Manchester", "And I", "Devil", "Super",
"Cbs", "Disney", "Amazing", "English", "Baby", "Blue", "Check", "Dr", "Nice",
"Character Minimum", "Excellent", "Night", "True", "I'm", "Pretty", "Hope", "Evil",
"Two", "Spider", "Hollywood", "Murder", "Probably", "Sling Tv", "Reddit",
]
# Filter out the non-movie names and streaming services
df_flattened = df_flattened.filter(~col("movie_name").isin(non_movie_terms))
df_flattened.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|          movie_name|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|       Jacobs Ladder|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|               Holes|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|    Vendetta\n\nPulp|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|V for fiction Dan...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|          Synchronic|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
only showing top 5 rows
df_flattened.write.parquet("s3a://project-group34/project/suggestions/all_movies/flattened/", mode="overwrite")
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[*]")\
.config("spark.driver.memory","16G")\
.config("spark.executor.memory", "12g")\
.config("spark.executor.cores", "3")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider"
)\
.getOrCreate()
print(spark.version)
3.4.0
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
df_flattened = spark.read.parquet("s3a://project-group34/project/suggestions/all_movies/flattened/")
df_flattened.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|          movie_name|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|       Jacobs Ladder|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|               Holes|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|    Vendetta\n\nPulp|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|V for fiction Dan...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|          Synchronic|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
only showing top 5 rows
df_movie_frequency = df_flattened.groupBy("movie_name").count()
df_movie_frequency.show(5)
+----------+-----+
|movie_name|count|
+----------+-----+
|       Uoc|   20|
|     Sword|   44|
|        Gb|   19|
|       Kin|   43|
|    Heaven|  425|
+----------+-----+
only showing top 5 rows
from pyspark.sql.functions import desc
# Sort by count in descending order
df_top_20_movies = df_movie_frequency.sort(desc('count')).limit(20)
df_top_20_movies.show()
+--------------+-----+
|    movie_name|count|
+--------------+-----+
|    Hereditary| 2733|
|      Parasite| 2578|
|  Interstellar| 2557|
|         Alien| 2489|
|     Prisoners| 2478|
|     The Thing| 2468|
|     Midsommar| 2446|
|       Arrival| 2375|
|    Fight Club| 2338|
|        Oldboy| 2337|
|  Blade Runner| 2317|
|  Annihilation| 2171|
|       Memento| 2159|
|     Coherence| 2011|
|     Inception| 1954|
|Shutter Island| 1928|
|           Saw| 1899|
|  Nightcrawler| 1890|
|The Lighthouse| 1829|
|           Big| 1753|
+--------------+-----+
df_top_20_movies_pd = df_top_20_movies.toPandas()
df_top_20_movies_pd.to_csv("../../data/csv/top20_movies_suggested.csv", index=False)
df_sentiment = spark.read.parquet("s3a://project-group34/project/sentiment_analysis/average_scores_per_movie/")
df_sentiment.show(5)
+--------------------+----------------------+----------------------+-----------+
|               title|average_positive_score|average_negative_score|num_reviews|
+--------------------+----------------------+----------------------+-----------+
|            The Menu|    0.8769120351379525|   0.12308796544559403|        315|
|Jim Gaffigan: Obs...|                   1.0|          7.713538E-10|          5|
|   Winter in Wartime|    0.7367684066794117|    0.2632315926923039|         67|
|                7500|    0.7694758345625764|   0.23052416603253476|        157|
|10 Things I Hate ...|     0.678140454627421|    0.3218595443279501|         77|
+--------------------+----------------------+----------------------+-----------+
only showing top 5 rows
from pyspark.sql.functions import col
# Prior constants derived from the sentiment dataset (used for Bayesian shrinkage below)
C_positive = 0.7525733910082344   # mean of the average positive scores
C_negative = 0.24742646776123378  # mean of the average negative scores
m = 4  # median number of reviews per movie
# Adding a new column for weighted rating
positive_weighted_rating_df = df_sentiment.withColumn("weighted_rating",
(col("num_reviews") / (col("num_reviews") + m) * col("average_positive_score")) +
(m / (col("num_reviews") + m) * C_positive))
# Adding a new column for weighted rating
negative_weighted_rating_df = df_sentiment.withColumn("weighted_rating",
(col("num_reviews") / (col("num_reviews") + m) * col("average_negative_score")) +
(m / (col("num_reviews") + m) * C_negative))
# Get top 20 movies with highest average positive scores
top_20_positive = positive_weighted_rating_df.orderBy(desc("weighted_rating")).limit(20)
top_20_positive.show()
+--------------------+----------------------+----------------------+-----------+------------------+
|               title|average_positive_score|average_negative_score|num_reviews|   weighted_rating|
+--------------------+----------------------+----------------------+-----------+------------------+
|Jim Allison: Brea...|                   1.0|  8.53145166666667E-22|         48|0.9809671839237104|
|Hava Nagila (The ...|    0.9867567089655173|  0.013243287766424139|        116|0.9789505983669412|
|      Trifling Women|    0.9992484054545456|  7.515981201820878E-4|         44|0.9786921542506863|
|Blinky Bill the M...|    0.9980725209090909|  0.001927482015539...|         44|0.9776142600840194|
|Molly's Theory of...|           0.999998057|  1.942951721449868E-6|         40|0.9775049055462031|
|Blind Willow, Sle...|                   1.0|  8.522315000009178...|         32|0.9725081545564704|
|California Typewr...|          0.9936297735|  0.006370226588242396|         40|0.9717155569098395|
|          Half Magic|    0.9950310882352942|  0.004968912791776545|         34| 0.969509225369288|
|   Things Never Said|    0.9922669266666667|  0.007733078447161946|         36|0.9682975731008235|
|     I'm Leaving Now|    0.9990313485714285|  9.686501978153861E-4|         28|0.9682241038760293|
|        The Farthest|    0.9999902925925925|   9.70705925925926E-6|         27|0.9680655310978367|
|Bill Cunningham N...|    0.9795781553424656|  0.020421847234691988|         73|0.9677857000523757|
|Richard Linklater...|    0.9953919132258064|  0.004608088904645...|         31|0.9676412249723696|
|       Breaking Fast|    0.9993891259259259|  6.108749145002055E-4|         27|0.9675419343236431|
|    Moonage Daydream|    0.9717051444774194|  0.028294855095545027|        155|0.9661923959624714|
|              Suzi Q|    0.9946260866666667|  0.005373910904266718|         30|0.9661492989421452|
|Salvatore: Shoema...|        0.992385378125|  0.007614620893583769|         32|0.9657396017786927|
|Is the Man Who Is...|    0.9876368760526316|  0.012363124857894738|         38|0.9652498774769747|
|The Eyes of Orson...|    0.9827869632608698|  0.017213035551343958|         46| 0.964369877480659|
|       Gentleman Jim|    0.9999992286956522|  7.678868595652176E-7|         23| 0.963343549038257|
+--------------------+----------------------+----------------------+-----------+------------------+
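The weighted_rating column follows the standard IMDb-style Bayesian average, WR = v/(v+m)*R + m/(v+m)*C, which shrinks titles with few reviews toward the global mean C. A quick check against the "Hava Nagila (The Movie)" row above reproduces its weighted_rating exactly:
# Verify the weighted-rating formula on one row of the table above
v = 116                   # num_reviews for "Hava Nagila (The Movie)"
R = 0.9867567089655173    # its average_positive_score
print((v / (v + m)) * R + (m / (v + m)) * C_positive)  # 0.9789505983669412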
df_flattened.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|          movie_name|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|       Jacobs Ladder|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|               Holes|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|    Vendetta\n\nPulp|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|V for fiction Dan...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|          Synchronic|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
only showing top 5 rows
from pyspark.sql.functions import udf, col, levenshtein
from pyspark.sql.types import FloatType
movie_titles = [row['movie_name'] for row in df_top_20_movies.collect()]
movie_titles
['Hereditary', 'Parasite', 'Interstellar', 'Alien', 'Prisoners', 'The Thing', 'Midsommar', 'Arrival', 'Fight Club', 'Oldboy', 'Blade Runner', 'Annihilation', 'Memento', 'Coherence', 'Inception', 'Shutter Island', 'Saw', 'Nightcrawler', 'The Lighthouse', 'Big']
# Filter the sentiment data DataFrame using the list
filtered_sentiment_df = positive_weighted_rating_df.filter(positive_weighted_rating_df['title'].isin(movie_titles))
filtered_sentiment_df.show()
+--------------+----------------------+----------------------+-----------+------------------+ | title|average_positive_score|average_negative_score|num_reviews| weighted_rating| +--------------+----------------------+----------------------+-----------+------------------+ | Big| 0.8549865664698026| 0.14501343196273178| 57| 0.848270948406749| |Shutter Island| 0.7935697030268212| 0.20643029873821836| 259|0.7929461849733066| |The Lighthouse| 0.8567953555230218| 0.1432046450670607| 403| 0.855771061031476| | Hereditary| 0.6772520104153956| 0.3227479907131384| 383|0.6780305259770788| | Arrival| 0.9132867864014926| 0.08671321548572934| 440|0.9118389179745263| | Alien| 0.6957981419639608| 0.30420185732468513| 124|0.6975723684965943| | Parasite| 0.825085770061366| 0.1749142296239439| 1921|0.8249350949880088| | Inception| 0.8771357714563982| 0.12286422944972214| 365|0.8757855017496431| | The Thing| 0.7633445176656004| 0.23665548025341143| 128|0.7630181198881044| | Prisoners| 0.7677609682564671| 0.2322390359840972| 254| 0.767525501942541| | Fight Club| 0.817899975018882| 0.1821000252636096| 174|0.8164319618950472| | Nightcrawler| 0.8417514086150262| 0.1582485925737861| 280|0.8404953801980292| | Interstellar| 0.89228317674853| 0.10771682494792269| 375|0.8908086671364952| | Midsommar| 0.7541998671040593| 0.2458001330190753| 401|0.7541838031426191| | Blade Runner| 0.8798318843731172| 0.12016811486495839| 112|0.8754436604639834| | Annihilation| 0.867995822337821| 0.1320041761747021| 329|0.8666093667062345| | Oldboy| 0.7110966693962014| 0.28890333126306006| 300|0.7116424157332019| | Saw| 0.6824277930089815| 0.3175722098283076| 188|0.6838891596339659| | Coherence| 0.8787566895442769| 0.12124330903455703| 94| 0.87360635082852| | Memento| 0.9059979402771241| 0.09400205504105454| 176|0.9025885058489265| +--------------+----------------------+----------------------+-----------+------------------+
filtered_sentiment_df_pd = filtered_sentiment_df.toPandas()
df_top_20_movies_pd.columns = ["title", "count"]
df_top_20_movies_pd
|    | title          | count |
|----|----------------|-------|
| 0  | Hereditary     | 2733  |
| 1  | Parasite       | 2578  |
| 2  | Interstellar   | 2557  |
| 3  | Alien          | 2489  |
| 4  | Prisoners      | 2478  |
| 5  | The Thing      | 2468  |
| 6  | Midsommar      | 2446  |
| 7  | Arrival        | 2375  |
| 8  | Fight Club     | 2338  |
| 9  | Oldboy         | 2337  |
| 10 | Blade Runner   | 2317  |
| 11 | Annihilation   | 2171  |
| 12 | Memento        | 2159  |
| 13 | Coherence      | 2011  |
| 14 | Inception      | 1954  |
| 15 | Shutter Island | 1928  |
| 16 | Saw            | 1899  |
| 17 | Nightcrawler   | 1890  |
| 18 | The Lighthouse | 1829  |
| 19 | Big            | 1753  |
filtered_sentiment_df_pd
|    | title          | average_positive_score | average_negative_score | num_reviews | weighted_rating |
|----|----------------|------------------------|------------------------|-------------|-----------------|
| 0  | Big            | 0.854987               | 0.145013               | 57          | 0.848271        |
| 1  | Shutter Island | 0.793570               | 0.206430               | 259         | 0.792946        |
| 2  | The Lighthouse | 0.856795               | 0.143205               | 403         | 0.855771        |
| 3  | Hereditary     | 0.677252               | 0.322748               | 383         | 0.678031        |
| 4  | Arrival        | 0.913287               | 0.086713               | 440         | 0.911839        |
| 5  | Alien          | 0.695798               | 0.304202               | 124         | 0.697572        |
| 6  | Parasite       | 0.825086               | 0.174914               | 1921        | 0.824935        |
| 7  | Inception      | 0.877136               | 0.122864               | 365         | 0.875786        |
| 8  | The Thing      | 0.763345               | 0.236655               | 128         | 0.763018        |
| 9  | Prisoners      | 0.767761               | 0.232239               | 254         | 0.767526        |
| 10 | Fight Club     | 0.817900               | 0.182100               | 174         | 0.816432        |
| 11 | Nightcrawler   | 0.841751               | 0.158249               | 280         | 0.840495        |
| 12 | Interstellar   | 0.892283               | 0.107717               | 375         | 0.890809        |
| 13 | Midsommar      | 0.754200               | 0.245800               | 401         | 0.754184        |
| 14 | Blade Runner   | 0.879832               | 0.120168               | 112         | 0.875444        |
| 15 | Annihilation   | 0.867996               | 0.132004               | 329         | 0.866609        |
| 16 | Oldboy         | 0.711097               | 0.288903               | 300         | 0.711642        |
| 17 | Saw            | 0.682428               | 0.317572               | 188         | 0.683889        |
| 18 | Coherence      | 0.878757               | 0.121243               | 94          | 0.873606        |
| 19 | Memento        | 0.905998               | 0.094002               | 176         | 0.902589        |
import pandas as pd
df_sentiment_of_suggestions = pd.merge(filtered_sentiment_df_pd, df_top_20_movies_pd, on="title")[["title", "weighted_rating", "count"]].sort_values(by="count", ascending=False)
df_sentiment_of_suggestions
|    | title          | weighted_rating | count |
|----|----------------|-----------------|-------|
| 3  | Hereditary     | 0.678031        | 2733  |
| 6  | Parasite       | 0.824935        | 2578  |
| 12 | Interstellar   | 0.890809        | 2557  |
| 5  | Alien          | 0.697572        | 2489  |
| 9  | Prisoners      | 0.767526        | 2478  |
| 8  | The Thing      | 0.763018        | 2468  |
| 13 | Midsommar      | 0.754184        | 2446  |
| 4  | Arrival        | 0.911839        | 2375  |
| 10 | Fight Club     | 0.816432        | 2338  |
| 16 | Oldboy         | 0.711642        | 2337  |
| 14 | Blade Runner   | 0.875444        | 2317  |
| 15 | Annihilation   | 0.866609        | 2171  |
| 19 | Memento        | 0.902589        | 2159  |
| 18 | Coherence      | 0.873606        | 2011  |
| 7  | Inception      | 0.875786        | 1954  |
| 1  | Shutter Island | 0.792946        | 1928  |
| 17 | Saw            | 0.683889        | 1899  |
| 11 | Nightcrawler   | 0.840495        | 1890  |
| 2  | The Lighthouse | 0.855771        | 1829  |
| 0  | Big            | 0.848271        | 1753  |
df_sentiment_of_suggestions.to_csv("../../data/csv/sentiment_of_suggestions.csv", index = False)
# Define a UDF for a normalized Levenshtein similarity score; pyspark.sql.functions.levenshtein
# works on Columns, not Python strings, so the distance is computed in pure Python here
def levenshtein_distance(s1, s2):
    prev = list(range(len(s2) + 1))  # DP row for the empty prefix of s1
    for i, c1 in enumerate(s1, 1):
        curr = [i]
        for j, c2 in enumerate(s2, 1):
            curr.append(min(prev[j] + 1, curr[j - 1] + 1, prev[j - 1] + (c1 != c2)))
        prev = curr
    return prev[-1]

def similarity_score(str1, str2):
    max_len = max(len(str1), len(str2))
    if max_len == 0:
        return 1.0
    return float(max_len - levenshtein_distance(str1, str2)) / max_len

similarity_udf = udf(similarity_score, FloatType())
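The similarity UDF is defined but not applied in this notebook. A hypothetical use is fuzzy-matching the extracted titles against the sentiment table when exact isin matching misses variant spellings; the cross join below is an illustrative sketch and only sensible for a short top-N list:
# Hypothetical usage: score every (suggested title, sentiment title) pair
# and keep near-matches; crossJoin is quadratic, so keep the inputs small
pairs = (
    df_top_20_movies.select("movie_name")
    .crossJoin(df_sentiment.select("title"))
    .withColumn("similarity", similarity_udf(col("movie_name"), col("title")))
)
pairs.filter(col("similarity") > 0.8).show(5)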
# NOTE: df_top_500_movies is built elsewhere in the workflow and is not defined in this notebook
df_top_500_movies.write.parquet(f"s3a://{bucket}/projects/comments/extracted_movies")
# import sagemaker
# session = sagemaker.Session()
# bucket = "project-group34"
# !wget -qO- https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.1.3.jar | aws s3 cp - s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar
# !aws s3 ls s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar
!mkdir -p ./code
%%writefile ./code/suggestion_extract_process.py
import os
import sys
import logging
import argparse
# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
DoubleType,
IntegerType,
StringType,
StructField,
StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType
import re
from pyspark.sql.functions import explode, count
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import SentenceDetector, Tokenizer, WordEmbeddingsModel, NerDLModel, NerConverter
from pyspark.ml import Pipeline
from pyspark.sql.functions import desc
import nltk
nltk.download('stopwords')
eng_stopwords = nltk.corpus.stopwords.words('english')
logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
def main():
parser = argparse.ArgumentParser(description="app inputs and outputs")
parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")
parser.add_argument("--col_name_for_filtering", type=str, help="Name of the column to filter")
args = parser.parse_args()
spark = SparkSession.builder \
.appName("Spark NLP")\
.config("spark.driver.memory","16G")\
.config("spark.driver.maxResultSize", "0") \
.config("spark.kryoserializer.buffer.max", "2000M")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3")\
.getOrCreate()
logger.info(f"Spark version: {spark.version}")
logger.info(f"sparknlp version: {sparknlp.version()}")
# This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set(
"mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
)
# Downloading the data from S3 into a Dataframe
logger.info(f"going to read {args.s3_dataset_path} for r/{args.col_name_for_filtering}")
df = spark.read.parquet(args.s3_dataset_path)  # "header" is a CSV option; not needed for Parquet
vals = [args.col_name_for_filtering]
df_filtered = df.where(col("subreddit").isin(vals))
logger.info(f"finished reading files...")
logger.info(f"Number of rows in data: {df_filtered.count()}")
# DATA CLEANING
df_filtered = df_filtered.filter((df.body != '[deleted]') & (df.author != '[deleted]'))
# Define the pipeline stages
document_assembler = DocumentAssembler() \
.setInputCol("body") \
.setOutputCol("document")
sentence_detector = SentenceDetector() \
.setInputCols(["document"]) \
.setOutputCol("sentence")
tokenizer = Tokenizer() \
.setInputCols(["sentence"]) \
.setOutputCol("token")
# Use GloVe embeddings
embeddings = WordEmbeddingsModel.pretrained("glove_100d", "en") \
.setInputCols(["sentence", "token"]) \
.setOutputCol("embeddings")
# Use a lighter NER model
ner_model = NerDLModel.pretrained("ner_dl", "en") \
.setInputCols(["sentence", "token", "embeddings"]) \
.setOutputCol("ner")
ner_converter = NerConverter() \
.setInputCols(["sentence", "token", "ner"]) \
.setOutputCol("ner_chunk")
# Build the pipeline
nlp_pipeline = Pipeline(stages=[
document_assembler,
sentence_detector,
tokenizer,
embeddings,
ner_model,
ner_converter
])
# Apply the pipeline to your DataFrame
model = nlp_pipeline.fit(df_filtered)
result = model.transform(df_filtered)
print("NLP Pipeline Ran Succesfully!")
result = result.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality", "ner_chunk")
# Define a UDF to filter and extract anime names
def extract_anime(chunks):
anime_names = [chunk.result for chunk in chunks if chunk.metadata['entity'] in ['PERSON', 'ORG']]
return anime_names
extract_anime_names_udf = udf(extract_anime, ArrayType(StringType()))
# Apply the UDF to the DataFrame
anime_df = result.withColumn("movie_names", extract_anime_names_udf(F.col("ner_chunk")))
anime_df.write.parquet("s3a://project-group34/project/suggestions/all_anime/ner/", mode="overwrite")
logger.info(f"all done...")
if __name__ == "__main__":
main()
Overwriting ./code/suggestion_extract_process.py
%%time
import boto3
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
account_id = boto3.client('sts').get_caller_identity()['Account']
CPU times: user 14.8 ms, sys: 61 µs, total: 14.9 ms
Wall time: 34.9 ms
account_id
'655678691473'
%%time
import time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor
# Set up the PySpark processor to run the job. Note the instance type and instance count
# parameters: SageMaker will create that many instances of this type for the Spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
base_job_name="sm-spark-project",
image_uri=f"{account_id}.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark:latest",
framework_version="3.3",
role=role,
instance_count=8,
instance_type="ml.m5.xlarge",
max_runtime_in_seconds=7200,
)
# # S3 URI of the initialization script
# s3_uri_init_script = f's3://{bucket}/{script_key}'
# s3 paths
session = sagemaker.Session()
output_prefix_logs = f"spark_logs"
configuration = [
{
"Classification": "spark-defaults",
"Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
}
]
CPU times: user 76.8 ms, sys: 0 ns, total: 76.8 ms
Wall time: 131 ms
%%time
print(f"going to extract suggestions data for subreddit=Animesuggest")
bucket = "project-group34"
output_prefix_data_comments = "project/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
col_name_for_filtering = "Animesuggest"
# run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
spark_processor.run(
submit_app="./code/suggestion_extract_process.py",
submit_jars=[f"s3://{bucket}/spark-nlp-assembly-5.1.3.jar"],
arguments=[
"--s3_dataset_path",
s3_path,
"--col_name_for_filtering",
col_name_for_filtering,
],
spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
logs=False,
configuration=configuration
)
# give some time for resources from this iteration to get cleaned up;
# if we start the next job immediately we could get an insufficient-resources error
time.sleep(60)
going to extract suggestions data for subreddit=Animesuggest
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-11-26-18-21-00-232
CPU times: user 5.03 s, sys: 438 ms, total: 5.47 s
Wall time: 1h 58min 36s
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
print(f"Spark version: {spark.version}")
print(f"sparknlp version: {sparknlp.version()}")
Spark version: 3.4.0
sparknlp version: 5.1.3
%%time
bucket = "project-group34"
session = sagemaker.Session()
output_prefix_data_comments = "project/comments/yyyy=2021"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
print(f"reading comments from {s3_path}")
comments = spark.read.parquet(s3_path)  # note: the "header" option applies to CSV, not Parquet
reading comments from s3a://project-group34/project/comments/yyyy=2021
CPU times: user 205 ms, sys: 8.31 ms, total: 214 ms
Wall time: 6.22 s
# comments = comments.cache()
comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality").show()
| subreddit | author | body | parent_id | id | created_utc | score | controversiality |
|---|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | t3_m3ygv3 | gqscelh | 2021-03-13 10:15:52 | 2 | 0 |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | t1_gqscelh | gqscf1z | 2021-03-13 10:16:05 | 1 | 0 |
| Animesuggest | [deleted] | [deleted] | t3_m3vnjl | gqscjse | 2021-03-13 10:18:25 | 1 | 0 |
| MovieSuggestions | katnip_fl | Jacobs Ladder | t3_m3rw47 | gqscl5i | 2021-03-13 10:19:07 | 2 | 0 |
| Animesuggest | Athenza | {Kino no Tabi: Th... | t3_m3xpu6 | gqscnqz | 2021-03-13 10:20:26 | 1 | 0 |
| Animesuggest | Dropsoftime | Try Mahouka kouko... | t3_m43dco | gqscnr8 | 2021-03-13 10:20:26 | 3 | 0 |
| MovieSuggestions | alienstabler | [Holes (2003)](ht... | t3_l20wrm | gqscozr | 2021-03-13 10:21:04 | 1 | 0 |
| Animesuggest | Roboragi | **Kino no Tabi: T... | t1_gqscnqz | gqscp63 | 2021-03-13 10:21:10 | 1 | 0 |
| Animesuggest | crash-scientist | I didn’t mean to ... | t1_gqs4syw | gqscp6i | 2021-03-13 10:21:10 | 2 | 0 |
| Animesuggest | dorting | Watch Evangelion | t3_m3vnjl | gqscp9a | 2021-03-13 10:21:13 | 2 | 0 |
| MovieSuggestions | ThalesHedonist | V for Vendetta\n\... | t3_m3rw47 | gqscpsz | 2021-03-13 10:21:29 | 3 | 0 |
| Animesuggest | Athenza | {Vampire Hunter D... | t3_m3wy06 | gqscspa | 2021-03-13 10:22:51 | 2 | 0 |
| Animesuggest | Roboragi | **Vampire Hunter ... | t1_gqscspa | gqsct5l | 2021-03-13 10:23:05 | 2 | 0 |
| MovieSuggestions | jaymewheeler | Synchronic was co... | t3_m3rw47 | gqscu76 | 2021-03-13 10:23:37 | 2 | 0 |
| Animesuggest | Arvidex | - {Wonder Egg Pei... | t3_m3vnjl | gqscuva | 2021-03-13 10:23:57 | 2 | 0 |
| Animesuggest | Roboragi | **Wonder Egg Prio... | t1_gqscuva | gqscvqx | 2021-03-13 10:24:25 | 1 | 0 |
| Animesuggest | mgd5800 | {Nejimaki Seirei ... | t3_m43dco | gqscxbt | 2021-03-13 10:25:12 | 2 | 0 |
| MovieSuggestions | SwissBliss | Climax!!!!! Sound... | t3_m3rw47 | gqscxjk | 2021-03-13 10:25:19 | 2 | 0 |
| Animesuggest | Roboragi | **Nejimaki Seirei... | t1_gqscxbt | gqscy94 | 2021-03-13 10:25:40 | 2 | 0 |
| Animesuggest | DyeDye1234 | {is the order a r... | t3_m41ujb | gqsd2ku | 2021-03-13 10:27:55 | 1 | 0 |

only showing top 20 rows
# Filter out rows where 'body' or 'author' is '[deleted]'
comments_filtered = comments.filter((comments.body != '[deleted]') & (comments.author != '[deleted]'))
# Show the filtered DataFrame
comments_filtered = comments_filtered.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality")
comments_filtered.show()
| subreddit | author | body | parent_id | id | created_utc | score | controversiality |
|---|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | t3_m3ygv3 | gqscelh | 2021-03-13 10:15:52 | 2 | 0 |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | t1_gqscelh | gqscf1z | 2021-03-13 10:16:05 | 1 | 0 |
| MovieSuggestions | katnip_fl | Jacobs Ladder | t3_m3rw47 | gqscl5i | 2021-03-13 10:19:07 | 2 | 0 |
| Animesuggest | Athenza | {Kino no Tabi: Th... | t3_m3xpu6 | gqscnqz | 2021-03-13 10:20:26 | 1 | 0 |
| Animesuggest | Dropsoftime | Try Mahouka kouko... | t3_m43dco | gqscnr8 | 2021-03-13 10:20:26 | 3 | 0 |
| MovieSuggestions | alienstabler | [Holes (2003)](ht... | t3_l20wrm | gqscozr | 2021-03-13 10:21:04 | 1 | 0 |
| Animesuggest | Roboragi | **Kino no Tabi: T... | t1_gqscnqz | gqscp63 | 2021-03-13 10:21:10 | 1 | 0 |
| Animesuggest | crash-scientist | I didn’t mean to ... | t1_gqs4syw | gqscp6i | 2021-03-13 10:21:10 | 2 | 0 |
| Animesuggest | dorting | Watch Evangelion | t3_m3vnjl | gqscp9a | 2021-03-13 10:21:13 | 2 | 0 |
| MovieSuggestions | ThalesHedonist | V for Vendetta\n\... | t3_m3rw47 | gqscpsz | 2021-03-13 10:21:29 | 3 | 0 |
| Animesuggest | Athenza | {Vampire Hunter D... | t3_m3wy06 | gqscspa | 2021-03-13 10:22:51 | 2 | 0 |
| Animesuggest | Roboragi | **Vampire Hunter ... | t1_gqscspa | gqsct5l | 2021-03-13 10:23:05 | 2 | 0 |
| MovieSuggestions | jaymewheeler | Synchronic was co... | t3_m3rw47 | gqscu76 | 2021-03-13 10:23:37 | 2 | 0 |
| Animesuggest | Arvidex | - {Wonder Egg Pei... | t3_m3vnjl | gqscuva | 2021-03-13 10:23:57 | 2 | 0 |
| Animesuggest | Roboragi | **Wonder Egg Prio... | t1_gqscuva | gqscvqx | 2021-03-13 10:24:25 | 1 | 0 |
| Animesuggest | mgd5800 | {Nejimaki Seirei ... | t3_m43dco | gqscxbt | 2021-03-13 10:25:12 | 2 | 0 |
| MovieSuggestions | SwissBliss | Climax!!!!! Sound... | t3_m3rw47 | gqscxjk | 2021-03-13 10:25:19 | 2 | 0 |
| Animesuggest | Roboragi | **Nejimaki Seirei... | t1_gqscxbt | gqscy94 | 2021-03-13 10:25:40 | 2 | 0 |
| Animesuggest | DyeDye1234 | {is the order a r... | t3_m41ujb | gqsd2ku | 2021-03-13 10:27:55 | 1 | 0 |
| Animesuggest | Roboragi | **Gochuumon wa Us... | t1_gqsd2ku | gqsd3ca | 2021-03-13 10:28:18 | 1 | 0 |

only showing top 20 rows
comments_filtered = comments_filtered.where(col("subreddit").isin("Animesuggest"))
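As a quick sanity check (not part of the original run), the remaining subreddit distribution can be confirmed before the expensive NER pass:
# Hypothetical sanity check: after the two filters, only Animesuggest rows should remain
comments_filtered.groupBy("subreddit").count().show()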
# First attempt (kept for reference): a BERT-based NER pipeline,
# later replaced by the lighter GloVe + ner_dl pipeline below.
# # Define the pipeline stages
# document_assembler = DocumentAssembler() \
# .setInputCol("body") \
# .setOutputCol("document")
# sentence_detector = SentenceDetector() \
# .setInputCols(["document"]) \
# .setOutputCol("sentence")
# tokenizer = Tokenizer() \
# .setInputCols(["sentence"]) \
# .setOutputCol("token")
# # Use a pretrained embeddings model, for example, BERT
# embeddings = BertEmbeddings.pretrained("bert_base_cased", "en") \
# .setInputCols(["sentence", "token"]) \
# .setOutputCol("embeddings")
# ner_model = NerDLModel.pretrained("ner_dl_bert", "en") \
# .setInputCols(["sentence", "token", "embeddings"]) \
# .setOutputCol("ner")
# ner_converter = NerConverter() \
# .setInputCols(["sentence", "token", "ner"]) \
# .setOutputCol("ner_chunk")
# # Build the pipeline
# nlp_pipeline = Pipeline(stages=[
# document_assembler,
# sentence_detector,
# tokenizer,
# embeddings,
# ner_model,
# ner_converter
# ])
# # Apply the pipeline to your DataFrame
# model = nlp_pipeline.fit(comments_filtered_movies)
# result = model.transform(comments_filtered_movies)
bert_base_cased download started this may take some time.
Approximate size to download 384.9 MB
[OK!]
ner_dl_bert download started this may take some time.
Approximate size to download 15.4 MB
[OK!]
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import SentenceDetector, Tokenizer, WordEmbeddingsModel, NerDLModel, NerConverter
from pyspark.ml import Pipeline
# Define the pipeline stages
document_assembler = DocumentAssembler() \
.setInputCol("body") \
.setOutputCol("document")
sentence_detector = SentenceDetector() \
.setInputCols(["document"]) \
.setOutputCol("sentence")
tokenizer = Tokenizer() \
.setInputCols(["sentence"]) \
.setOutputCol("token")
# Use GloVe embeddings
embeddings = WordEmbeddingsModel.pretrained("glove_100d", "en") \
.setInputCols(["sentence", "token"]) \
.setOutputCol("embeddings")
# Use a lighter NER model
ner_model = NerDLModel.pretrained("ner_dl", "en") \
.setInputCols(["sentence", "token", "embeddings"]) \
.setOutputCol("ner")
ner_converter = NerConverter() \
.setInputCols(["sentence", "token", "ner"]) \
.setOutputCol("ner_chunk")
# Build the pipeline
nlp_pipeline = Pipeline(stages=[
document_assembler,
sentence_detector,
tokenizer,
embeddings,
ner_model,
ner_converter
])
# Apply the pipeline to your DataFrame
model = nlp_pipeline.fit(comments_filtered)
result = model.transform(comments_filtered)
result = result.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality", "ner_chunk")
result.show(5)
| subreddit | author | body | parent_id | id | created_utc | score | controversiality | ner_chunk |
|---|---|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | t3_m3ygv3 | gqscelh | 2021-03-13 10:15:52 | 2 | 0 | [{chunk, 24, 29, ... |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | t1_gqscelh | gqscf1z | 2021-03-13 10:16:05 | 1 | 0 | [{chunk, 2, 4, Im... |
| Animesuggest | Athenza | {Kino no Tabi: Th... | t3_m3xpu6 | gqscnqz | 2021-03-13 10:20:26 | 1 | 0 | [{chunk, 9, 12, T... |
| Animesuggest | Dropsoftime | Try Mahouka kouko... | t3_m43dco | gqscnr8 | 2021-03-13 10:20:26 | 3 | 0 | [{chunk, 4, 10, M... |
| Animesuggest | Roboragi | **Kino no Tabi: T... | t1_gqscnqz | gqscp63 | 2021-03-13 10:21:10 | 1 | 0 | [{chunk, 2, 5, Ki... |

only showing top 5 rows
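For iterating on the pipeline, Spark NLP's LightPipeline can annotate a single string locally without launching a distributed job. A minimal sketch (the sample sentence is made up):
from sparknlp.base import LightPipeline
# Wrap the fitted PipelineModel for fast single-document inference
light = LightPipeline(model)
annotations = light.annotate("Try Vinland Saga or Attack on Titan.")
print(annotations["ner_chunk"])  # entity chunks detected in the sample sentence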
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, ArrayType
import pyspark.sql.functions as F
# Define a UDF that keeps only PERSON/ORG entities, which serve as a proxy for anime titles
def extract_anime(chunks):
    anime_names = [chunk.result for chunk in chunks if chunk.metadata['entity'] in ['PERSON', 'ORG']]
    return anime_names
extract_anime_names_udf = udf(extract_anime, ArrayType(StringType()))
# Apply the UDF to the DataFrame
anime_df = result.withColumn("movie_names", extract_anime_names_udf(F.col("ner_chunk")))
# # Display the results
# anime_df.select("body", "movie_names").show()
anime_df.show(5)
| subreddit | author | body | parent_id | id | created_utc | score | controversiality | ner_chunk | movie_names |
|---|---|---|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | t3_m3ygv3 | gqscelh | 2021-03-13 10:15:52 | 2 | 0 | [{chunk, 24, 29, ... | [] |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | t1_gqscelh | gqscf1z | 2021-03-13 10:16:05 | 1 | 0 | [{chunk, 2, 4, Im... | [Ima, Soko ni Iru... |
| Animesuggest | Athenza | {Kino no Tabi: Th... | t3_m3xpu6 | gqscnqz | 2021-03-13 10:20:26 | 1 | 0 | [{chunk, 9, 12, T... | [The Beautiful Wo... |
| Animesuggest | Dropsoftime | Try Mahouka kouko... | t3_m43dco | gqscnr8 | 2021-03-13 10:20:26 | 3 | 0 | [{chunk, 4, 10, M... | [] |
| Animesuggest | Roboragi | **Kino no Tabi: T... | t1_gqscnqz | gqscp63 | 2021-03-13 10:21:10 | 1 | 0 | [{chunk, 2, 5, Ki... | [Mobile Suit Gund... |

only showing top 5 rows
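The same extraction could also stay entirely in the JVM with Spark's higher-order SQL functions instead of a Python UDF, avoiding serialization overhead. A sketch, mirroring the 'PERSON'/'ORG' labels used in the UDF above:
from pyspark.sql import functions as F
# Filter the annotation structs by entity label, then project out the chunk text
anime_df_alt = result.withColumn(
    "movie_names",
    F.expr("transform(filter(ner_chunk, c -> c.metadata['entity'] IN ('PERSON', 'ORG')), c -> c.result)")
)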
anime_df.write.parquet("s3a://project-group34/project/suggestions/all_anime/ner/2021", mode="overwrite")
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
# Import pyspark and build Spark session
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[*]")\
.config("spark.driver.memory","16G")\
.config("spark.executor.memory", "12g")\
.config("spark.executor.cores", "3")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider"
)\
.getOrCreate()
print(spark.version)
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
anime_df = spark.read.parquet("s3a://project-group34/project/suggestions/all_anime/ner/")
anime_df.show(5)
| subreddit | author | body | parent_id | id | created_utc | score | controversiality | ner_chunk | movie_names |
|---|---|---|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | t3_m3ygv3 | gqscelh | 2021-03-13 10:15:52 | 2 | 0 | [{chunk, 24, 29, ... | [] |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | t1_gqscelh | gqscf1z | 2021-03-13 10:16:05 | 1 | 0 | [{chunk, 2, 4, Im... | [Ima, Soko ni Iru... |
| Animesuggest | Athenza | {Kino no Tabi: Th... | t3_m3xpu6 | gqscnqz | 2021-03-13 10:20:26 | 1 | 0 | [{chunk, 9, 12, T... | [The Beautiful Wo... |
| Animesuggest | Dropsoftime | Try Mahouka kouko... | t3_m43dco | gqscnr8 | 2021-03-13 10:20:26 | 3 | 0 | [{chunk, 4, 10, M... | [] |
| Animesuggest | Roboragi | **Kino no Tabi: T... | t1_gqscnqz | gqscp63 | 2021-03-13 10:21:10 | 1 | 0 | [{chunk, 2, 5, Ki... | [Mobile Suit Gund... |

only showing top 5 rows
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField
# import spacy
import re
# # Load spaCy model
# nlp = spacy.load("en_core_web_sm")
# # Define schema for the UDF output
# anime_schema = StructType([
# StructField("anime_positions", ArrayType(ArrayType(StringType()))),
# StructField("anime_names", ArrayType(StringType()))
# ])
# # UDF to extract anime names
# @udf(anime_schema)
# def extract_anime_names_udf(text):
# doc = nlp(text)
# anime_positions = []
# anime_names = []
# for ent in doc.ents:
# if ent.label_ == "ORG" or ent.label_ == "PERSON":
# anime_positions.append([ent.start_char, ent.end_char])
# anime_names.append(ent.text)
# return (anime_positions, anime_names)
# UDF to remove anime names
@udf(StringType())
def remove_anime_names_udf(text, anime_names):
if anime_names:
for name in anime_names:
text = text.replace(name, ' ')
return ' '.join(text.split())
else:
return text
# UDF to extract anime names using regex, supplementing the NER output
@udf(ArrayType(StringType()))
def extract_anime_names_regex_udf(text, anime_names):
    # Match either a double-quoted title or a capitalized phrase, optionally followed by a year
    anime_name_pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'
    anime_matches = re.findall(anime_name_pattern, text)
    # The pattern has exactly two capture groups, so each match is a 2-tuple
    anime = [match[0] or match[1] for match in anime_matches]
    return anime_names + anime
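To see what the pattern picks up, and why the stop-word and single-letter filters further below are needed, here is an illustrative run on a made-up string:
# Illustrative only: the pattern also captures stray capitalized words such as "I"
pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'
sample = 'I loved "Cowboy Bebop" and Vinland Saga (2019).'
print([g1 or g2 for g1, g2 in re.findall(pattern, sample)])
# ['I', 'Cowboy Bebop', 'Vinland Saga']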
# Remove anime names from the 'body' text
df_removed_anime_names = anime_df.withColumn("body_no_anime", remove_anime_names_udf(anime_df["body"], anime_df["movie_names"]))
# Regex method to supplement the NER extraction
df_final = df_removed_anime_names.withColumn("additional_anime_names", extract_anime_names_regex_udf(df_removed_anime_names["body_no_anime"], df_removed_anime_names["movie_names"]))
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "additional_anime_names")
df_final.show(5)
| subreddit | author | body | created_utc | score | controversiality | additional_anime_names |
|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | 2021-03-13 10:15:52 | 2 | 0 | [Now and Then, He... |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | 2021-03-13 10:16:05 | 1 | 0 | [Ima, Soko ni Iru... |
| Animesuggest | Athenza | {Kino no Tabi: Th... | 2021-03-13 10:20:26 | 1 | 0 | [The Beautiful Wo... |
| Animesuggest | Dropsoftime | Try Mahouka kouko... | 2021-03-13 10:20:26 | 3 | 0 | [Try Mahouka] |
| Animesuggest | Roboragi | **Kino no Tabi: T... | 2021-03-13 10:21:10 | 1 | 0 | [Mobile Suit Gund... |

only showing top 5 rows
import nltk
# nltk.download('stopwords')  # uncomment on the first run to fetch the stop-word corpus
eng_stopwords = nltk.corpus.stopwords.words('english')
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
def remove_stop_word_from_anime_names(animes):
# Filter out single-word anime names that are in the stop words list
return [anime for anime in animes if not (len(anime.split()) == 1 and anime.lower() in eng_stopwords)]
remove_stop_word_udf = udf(remove_stop_word_from_anime_names, ArrayType(StringType()))
df_final = df_final.withColumn("anime_names_final", remove_stop_word_udf(df_final["additional_anime_names"]))
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "anime_names_final")
# Define a UDF to filter out single-letter anime names
def remove_single_letter_animes(anime_names):
return [name for name in anime_names if len(name.strip()) > 1]
remove_single_letter_animes_udf = udf(remove_single_letter_animes, ArrayType(StringType()))
# Apply the UDF to the DataFrame
df_final = df_final.withColumn("anime_names_final_cleaned", remove_single_letter_animes_udf("anime_names_final"))
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "anime_names_final_cleaned")
df_final.show()
| subreddit | author | body | created_utc | score | controversiality | anime_names_final_cleaned |
|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | 2021-03-13 10:15:52 | 2 | 0 | [Now and Then, He... |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | 2021-03-13 10:16:05 | 1 | 0 | [Ima, Soko ni Iru... |
| Animesuggest | Athenza | {Kino no Tabi: Th... | 2021-03-13 10:20:26 | 1 | 0 | [The Beautiful Wo... |
| Animesuggest | Dropsoftime | Try Mahouka kouko... | 2021-03-13 10:20:26 | 3 | 0 | [Try Mahouka] |
| Animesuggest | Roboragi | **Kino no Tabi: T... | 2021-03-13 10:21:10 | 1 | 0 | [Mobile Suit Gund... |
| Animesuggest | crash-scientist | I didn’t mean to ... | 2021-03-13 10:21:10 | 2 | 0 | [IDK, But you lit... |
| Animesuggest | dorting | Watch Evangelion | 2021-03-13 10:21:13 | 2 | 0 | [Watch Evangelion] |
| Animesuggest | Athenza | {Vampire Hunter D... | 2021-03-13 10:22:51 | 2 | 0 | [{Vampire Hunter ... |
| Animesuggest | Roboragi | **Vampire Hunter ... | 2021-03-13 10:23:05 | 2 | 0 | [Bloodlust, Vampi... |
| Animesuggest | Arvidex | - {Wonder Egg Pei... | 2021-03-13 10:23:57 | 2 | 0 | [{Wonder Egg Peio... |
| Animesuggest | Roboragi | **Wonder Egg Prio... | 2021-03-13 10:24:25 | 1 | 0 | [Wonder Egg, Wond... |
| Animesuggest | mgd5800 | {Nejimaki Seirei ... | 2021-03-13 10:25:12 | 2 | 0 | [The Labyrinth of... |
| Animesuggest | Roboragi | **Nejimaki Seirei... | 2021-03-13 10:25:40 | 2 | 0 | [Sky, Alderamin, ... |
| Animesuggest | DyeDye1234 | {is the order a r... | 2021-03-13 10:27:55 | 1 | 0 | [Tanaka] |
| Animesuggest | Roboragi | **Gochuumon wa Us... | 2021-03-13 10:28:18 | 1 | 0 | [KINMOZA!"),, Yur... |
| Animesuggest | _-Sandwitch-_ | Third Mushishi an... | 2021-03-13 10:30:02 | 10 | 0 | [Third Mushishi a... |
| Animesuggest | DyeDye1234 | Not a manga but {... | 2021-03-13 10:31:48 | 1 | 0 | [] |
| Animesuggest | Roboragi | **Katanagatari** ... | 2021-03-13 10:32:15 | 1 | 0 | [Katanagatari, En... |
| Animesuggest | MaCl97 | "Monster" | 2021-03-13 10:34:28 | 1 | 0 | [Monster] |
| Animesuggest | kyriosgreek | >Mahouka kouko... | 2021-03-13 10:35:44 | 1 | 0 | [Mahouka] |

only showing top 20 rows
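The stop-word and single-letter passes above could equally be collapsed into one UDF. A sketch, not applied in the original run:
# Sketch: combine both clean-up rules in a single pass
@udf(ArrayType(StringType()))
def clean_anime_names(names):
    return [n for n in names
            if len(n.strip()) > 1
            and not (len(n.split()) == 1 and n.lower() in eng_stopwords)]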
df_final.write.parquet("s3a://project-group34/project/suggestions/all_animes/cleaned_with_regex/", mode="overwrite")
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
# Import pyspark and build Spark session
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[*]")\
.config("spark.driver.memory","16G")\
.config("spark.executor.memory", "12g")\
.config("spark.executor.cores", "3")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider"
)\
.getOrCreate()
print(spark.version)
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
df_final = spark.read.parquet("s3a://project-group34/project/suggestions/all_animes/cleaned_with_regex/")
23/11/26 20:54:25 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
df_final.show(5)
| subreddit | author | body | created_utc | score | controversiality | anime_names_final_cleaned |
|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | 2021-03-13 10:15:52 | 2 | 0 | [Now and Then, He... |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | 2021-03-13 10:16:05 | 1 | 0 | [Ima, Soko ni Iru... |
| Animesuggest | Athenza | {Kino no Tabi: Th... | 2021-03-13 10:20:26 | 1 | 0 | [The Beautiful Wo... |
| Animesuggest | Dropsoftime | Try Mahouka kouko... | 2021-03-13 10:20:26 | 3 | 0 | [Try Mahouka] |
| Animesuggest | Roboragi | **Kino no Tabi: T... | 2021-03-13 10:21:10 | 1 | 0 | [Mobile Suit Gund... |

only showing top 5 rows
from pyspark.sql.functions import explode, col, count, split
# Flatten the anime_names column
df_flattened = df_final.withColumn("anime_name", explode(col("anime_names_final_cleaned")))
from pyspark.sql.functions import when
# Rename specified anime titles
df_flattened = df_flattened.withColumn("anime_name",
when(col("anime_name") == "Steins", "Steins Gate")
.when(col("anime_name") == "Gate", "Steins Gate")
.when(col("anime_name") == "One", "One Piece")
.when(col("anime_name") == "Attack", "Attack on Titan")
.when(col("anime_name") == "Titan", "Attack on Titan")
.when(col("anime_name") == "Kaguya", "Kaguya-sama: Love Is War")
.when(col("anime_name") == "English: Made in Abyss", "Made in Abyss")
.when(col("anime_name") == "English: Violet Evergarden", "Violet Evergarden")
.when(col("anime_name") == "Steins; ", "Steins Gate")
.when(col("anime_name") == "AOT", "Attack on Titan")
.otherwise(col("anime_name")))
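Since the alias list tends to grow, the when-chain can also be generated from a dictionary rather than written by hand. A sketch with an abbreviated mapping:
from functools import reduce
from pyspark.sql import functions as F
# Sketch: build the same CASE WHEN expression from a mapping (abbreviated here)
alias_map = {"Steins": "Steins Gate", "Gate": "Steins Gate", "AOT": "Attack on Titan"}
rename_expr = reduce(
    lambda acc, kv: acc.when(F.col("anime_name") == kv[0], kv[1]),
    alias_map.items(),
    F.when(F.lit(False), None),  # neutral seed; every real case is chained on
).otherwise(F.col("anime_name"))
# df_flattened = df_flattened.withColumn("anime_name", rename_expr)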
from pyspark.sql.functions import regexp_replace, col
# Strip a leading "English:" label (with or without the trailing space) from the names
df_flattened = df_flattened.withColumn("anime_name", regexp_replace(col("anime_name"), "English: ?", ""))
from pyspark.sql.functions import trim, col
df_flattened = df_flattened.withColumn("anime_name", trim(col("anime_name")))
non_anime_terms = [
"Roboragi", "Status", "Genres", "Finished", "Episodes", "English: ",
"Animesuggest", "Edit", "Source", "Nihilate", "Synonyms", "Mistake",
"Drama", "Comedy", "Action", "Romance", "Fantasy", "Slice of Life",
"Supernatural", "Adventure", "Psychological", "Sci", "Fi", "Mystery",
"Thriller", "Horror", "Thanks", "Movie", "Ecchi", "English", "Thank",
"Manga", "Also", "Mecha", "Please", "Sports", "Releasing", "Music", "Yeah",
"English: The ", "Oh", "Fate", "Chapters", "OP", "Volumes", "Yes", "Episode",
"Maybe", "English: :", "Well", "Anime", "Season", "V Short", "Juj", "English: The",
"Jo", "English: A", "Netflix", "Great", "", "A", ":", "on", "Really", "Like", "But I",
"The"
]
# Filter out the non-anime names and streaming services
df_flattened = df_flattened.filter(~col("anime_name").isin(non_anime_terms))
df_flattened.show(5)
| subreddit | author | body | created_utc | score | controversiality | anime_names_final_cleaned | anime_name |
|---|---|---|---|---|---|---|---|
| Animesuggest | Athenza | {Now and Then, He... | 2021-03-13 10:15:52 | 2 | 0 | [Now and Then, He... | Now and Then |
| Animesuggest | Athenza | {Now and Then, He... | 2021-03-13 10:15:52 | 2 | 0 | [Now and Then, He... | Here and There |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | 2021-03-13 10:16:05 | 1 | 0 | [Ima, Soko ni Iru... | Ima |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | 2021-03-13 10:16:05 | 1 | 0 | [Ima, Soko ni Iru... | Soko ni Iru Boku |
| Animesuggest | Roboragi | **Ima, Soko ni Ir... | 2021-03-13 10:16:05 | 1 | 0 | [Ima, Soko ni Iru... | Now and Then, Her... |

only showing top 5 rows
df_flattened.write.parquet("s3a://project-group34/project/suggestions/all_animes/flattened/", mode="overwrite")
# Import pyspark and build Spark session
from pyspark.sql import SparkSession
# Import pyspark and build Spark session
spark = SparkSession.builder \
.appName("Spark NLP")\
.master("local[*]")\
.config("spark.driver.memory","16G")\
.config("spark.executor.memory", "12g")\
.config("spark.executor.cores", "3")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
.config(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.ContainerCredentialsProvider"
)\
.getOrCreate()
print(spark.version)
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
df_flattened = spark.read.parquet("s3a://project-group34/project/suggestions/all_animes/flattened/")
df_flattened.show(5)
| subreddit | author | body | created_utc | score | controversiality | anime_names_final_cleaned | anime_name |
|---|---|---|---|---|---|---|---|
| Animesuggest | ElegantTea122 | Happy Sugar Life\... | 2021-01-28 17:24:45 | 1 | 0 | [Happy Sugar Life... | Happy Sugar Life\... |
| Animesuggest | ElegantTea122 | Happy Sugar Life\... | 2021-01-28 17:24:45 | 1 | 0 | [Happy Sugar Life... | All have M |
| Animesuggest | fearlesslalready | {God of highschool} | 2021-01-28 17:27:15 | 1 | 1 | [God] | God |
| Animesuggest | DuckFantic | I’ve never heard ... | 2021-01-28 17:28:46 | 3 | 0 | [Black Clover, I’ll] | Black Clover |
| Animesuggest | DuckFantic | I’ve never heard ... | 2021-01-28 17:28:46 | 3 | 0 | [Black Clover, I’ll] | I’ll |

only showing top 5 rows
df_anime_frequency = df_flattened.groupBy("anime_name").count()
df_anime_frequency.show(5)
| anime_name | count |
|---|---|
| Sword | 565 |
| Apollon (Kids on ... | 3 |
| Vivy | 2813 |
| Keiyakusha | 250 |
| S G | 117 |

only showing top 5 rows
from pyspark.sql.functions import desc
# Sort by count in descending order
df_top_100_animes = df_anime_frequency.sort(desc('count')).limit(100)
df_top_100_animes.show(100, truncate=False)
| anime_name | count |
|---|---|
| Steins Gate | 19606 |
| Attack on Titan | 16722 |
| One Piece | 10936 |
| Violet Evergarden | 8780 |
| Made in Abyss | 8558 |
| Monster | 8543 |
| Clannad | 7461 |
| Zero | 6724 |
| Death Note | 6559 |
| Code Geass | 6169 |
| Cowboy Bebop | 5758 |
| Horimiya | 5710 |
| Gintama | 5654 |
| Berserk | 5485 |
| Dororo | 5053 |
| Vinland Saga | 5043 |
| Overlord | 4887 |
| Naruto | 4778 |
| Death Parade | 4663 |
| Mob Psycho | 4621 |
| Hunter x Hunter | 4505 |
| Fruits Basket | 4500 |
| Toradora | 4356 |
| Brotherhood | 4338 |
| Fullmetal Alchemist | 4324 |
| Kaguya-sama: Love Is War | 4114 |
| Another | 3886 |
| Mushishi | 3799 |
| Parasyte | 3754 |
| Erased | 3694 |
| Akudama Drive | 3595 |
| Hyouka | 3587 |
| Dorohedoro | 3571 |
| Mahou Shoujo | 3527 |
| Black Lagoon | 3428 |
| Samurai Champloo | 3367 |
| Jujutsu Kaisen | 3301 |
| Neon Genesis Evangelion | 3275 |
| One Punch Man | 3184 |
| Your Lie in April | 3152 |
| Silent Voice | 3106 |
| Odd Taxi | 3101 |
| Nichijou | 3080 |
| Gurren Lagann | 3044 |
| Bleach | 3044 |
| Definitely | 2987 |
| Konosuba | 2929 |
| Akame | 2923 |
| Love | 2920 |
| Tokyo Ghoul | 2902 |
| Ergo Proxy | 2858 |
| Trigun | 2835 |
| Charlotte | 2818 |
| Vivy | 2813 |
| Black Clover | 2783 |
| Talentless Nana | 2732 |
| Shiki | 2729 |
| Barakamon | 2696 |
| From the New World | 2676 |
| Kaiji | 2673 |
| Try | 2645 |
| Madoka Magica | 2641 |
| Perfect Blue | 2622 |
| Psycho | 2620 |
| Good | 2613 |
| Haikyuu | 2610 |
| A Place Further Than the Universe | 2609 |
| Death | 2596 |
| Land of the Lustrous | 2563 |
| Dr | 2528 |
| ERASED | 2460 |
| Watch | 2449 |
| Demon Slayer | 2405 |
| Grand Blue | 2405 |
| Puella Magi Madoka Magica | 2400 |
| Akira | 2361 |
| Angel Beats | 2346 |
| Baccano | 2338 |
| Golden Time | 2337 |
| Serial Experiments Lain | 2334 |
| Oregairu | 2327 |
| Galactic Heroes | 2308 |
| March comes in like a lion | 2290 |
| Sword Art Online | 2283 |
| Claymore | 2276 |
| Promised Neverland | 2254 |
| Evangelion | 2250 |
| Lion | 2248 |
| Mononoke | 2219 |
| Wotakoi | 2205 |
| 100 | 2190 |
| Assassination Classroom | 2146 |
| Terror in Resonance | 2130 |
| Bot | 2106 |
| Black | 2101 |
| Jojo | 2092 |
| Asobi Asobase | 2091 |
| Noragami | 2077 |
| Orange | 2070 |
| Elfen Lied | 2055 |
df_top_100_animes_pd = df_top_100_animes.toPandas()
df_top_100_animes_pd
|     | anime_name | count |
|-----|------------|-------|
| 0   | Steins Gate | 19606 |
| 1   | Attack on Titan | 16722 |
| 2   | One Piece | 10936 |
| 3   | Violet Evergarden | 8780 |
| 4   | Made in Abyss | 8558 |
| ... | ... | ... |
| 95  | Jojo | 2092 |
| 96  | Asobi Asobase | 2091 |
| 97  | Noragami | 2077 |
| 98  | Orange | 2070 |
| 99  | Elfen Lied | 2055 |

100 rows × 2 columns
df_top_100_animes_pd.to_csv("../../data/csv/top100_animes_suggested.csv", index=False)
import pandas as pd
# Note: the saved CSV was apparently augmented outside the notebook with a japanese_title column before re-reading
df_reddit = pd.read_csv("../../data/csv/top100_animes_suggested.csv")
df_reddit.head()
|   | anime_name | japanese_title | count |
|---|------------|----------------|-------|
| 0 | Steins Gate | Steins;Gate | 19606 |
| 1 | Attack on Titan | Shingeki no Kyojin | 16722 |
| 2 | One Piece | Wan Pīsu | 10936 |
| 3 | Violet Evergarden | Vaioretto Evāgāden | 8780 |
| 4 | Made in Abyss | Meido in Abisu | 8558 |
df_external = pd.read_csv("../../data/csv/anime_data.csv")
df_external.head()
|   | Unnamed: 0 | title | genre | episodes | popularity | score |
|---|------------|-------|-------|----------|------------|-------|
| 0 | 0 | "0" | ['Music'] | 1.0 | 7345.0 | 4.77 |
| 1 | 1 | "Aesop" no Ohanashi yori: Ushi to Kaeru, Yokub... | ['Kids'] | 1.0 | 12413.0 | 5.61 |
| 2 | 2 | "Bungaku Shoujo" Kyou no Oyatsu: Hatsukoi | ['Comedy', 'Fantasy', 'School'] | 1.0 | 3466.0 | 6.96 |
| 3 | 3 | "Bungaku Shoujo" Memoire | ['Drama', 'Romance', 'School'] | 3.0 | 2943.0 | 7.40 |
| 4 | 4 | "Bungaku Shoujo" Movie | ['Mystery', 'Drama', 'Romance', 'School'] | 1.0 | 1799.0 | 7.48 |
from fuzzywuzzy import process, fuzz
def get_best_match(name, choices, scorer=fuzz.WRatio):
    # Return the single best (match, score) pair from choices under the given scorer
    return process.extractOne(name, choices, scorer=scorer)
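For intuition, process.extractOne returns the best (choice, score) pair on a 0-100 scale; an illustrative call:
# WRatio tolerates partial and token-reordered matches; exact matches score 100
print(process.extractOne("Shingeki no Kyojin", ["Shingeki no Kyojin", "K-On!"], scorer=fuzz.WRatio))
# ('Shingeki no Kyojin', 100)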
titles = df_external['title'].tolist()
# Apply fuzzy matching
df_reddit['matched_title'] = df_reddit['japanese_title'].apply(lambda x: get_best_match(x, titles, scorer=fuzz.WRatio)[0])
df_reddit['matched_score'] = df_reddit['japanese_title'].apply(lambda x: df_external.loc[df_external['title'] == get_best_match(x, titles, scorer=fuzz.WRatio)[0], 'score'].iloc[0])
# Apply fuzzy matching
df_reddit['matched_title_english'] = df_reddit['anime_name'].apply(lambda x: get_best_match(x, titles, scorer=fuzz.WRatio)[0])
df_reddit['matched_score_english'] = df_reddit['anime_name'].apply(lambda x: df_external.loc[df_external['title'] == get_best_match(x, titles, scorer=fuzz.WRatio)[0], 'score'].iloc[0])
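Each apply above runs the fuzzy match twice per row, once for the title and once for the score; a single pass per row would halve the work. A sketch (left commented so the original results stand):
def match_with_score(name):
    # One fuzzy lookup per row; reuse the matched title to fetch its score
    title = get_best_match(name, titles)[0]
    score = df_external.loc[df_external['title'] == title, 'score'].iloc[0]
    return title, score
# matched = df_reddit['japanese_title'].apply(match_with_score)
# df_reddit['matched_title'] = matched.str[0]
# df_reddit['matched_score'] = matched.str[1]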
df_reddit.head(20)
|    | anime_name | japanese_title | count | matched_title | matched_score | matched_title_english | matched_score_english |
|----|------------|----------------|-------|---------------|---------------|-----------------------|-----------------------|
| 0 | Steins Gate | Steins;Gate | 19606 | Steins;Gate | 9.11 | Steins;Gate | 9.11 |
| 1 | Attack on Titan | Shingeki no Kyojin | 16722 | Shingeki no Kyojin | 8.47 | K-On! | 7.86 |
| 2 | One Piece | Wan Pīsu | 10936 | Doraemon Movie 25: Nobita no Wan Nyan Jikuuden | 7.42 | One Piece | 8.53 |
| 3 | Violet Evergarden | Vaioretto Evāgāden | 8780 | gdMen | 5.95 | Violet Evergarden | 8.62 |
| 4 | Made in Abyss | Meido in Abisu | 8558 | Isu | 4.56 | Made in Abyss | 8.83 |
| 5 | Monster | Monsutā | 8543 | Monsuto Anime | 6.44 | Monster | 8.69 |
| 6 | Clannad | Kuranado | 7461 | K | 7.62 | Clannad | 8.16 |
| 7 | Zero | Zero | 6724 | AIKa Zero | 5.98 | AIKa Zero | 5.98 |
| 8 | Death Note | Desu Nōto | 6559 | TO | 6.43 | Death Note | 8.65 |
| 9 | Code Geass | Code Geass | 6169 | Code Geass: Boukoku no Akito 1 - Yokuryuu wa M... | 7.49 | Code Geass: Boukoku no Akito 1 - Yokuryuu wa M... | 7.49 |
| 10 | Cowboy Bebop | Cowboy Bebop | 5758 | Cowboy Bebop | 8.81 | Cowboy Bebop | 8.81 |
| 11 | Horimiya | Horimiya | 5710 | Grim | 5.30 | Grim | 5.30 |
| 12 | Gintama | Gintama | 5654 | Gintama | 8.97 | Gintama | 8.97 |
| 13 | Berserk | Berserk | 5485 | Berserk | 6.60 | Berserk | 6.60 |
| 14 | Dororo | Dororo | 5053 | Dororo | 8.23 | Dororo | 8.23 |
| 15 | Vinland Saga | Vinland Saga | 5043 | Vinland Saga | 8.78 | Vinland Saga | 8.78 |
| 16 | Overlord | Overlord | 4887 | Overlord | 8.05 | Overlord | 8.05 |
| 17 | Naruto | Naruto | 4778 | Naruto | 7.93 | Naruto | 7.93 |
| 18 | Death Parade | Death Parade | 4663 | Death Parade | 8.22 | Death Parade | 8.22 |
| 19 | Mob Psycho | Mob Psycho | 4621 | Mob Psycho 100 | 8.51 | Mob Psycho 100 | 8.51 |
# Manually override rows where matching on the Japanese title picked the wrong entry
anime = ["One Piece", "Violet Evergarden", "Made in Abyss", "Monster", "Clannad", "Death Note"]
for anime_name in anime:
    for i in range(len(df_reddit)):
        if df_reddit.loc[i, "anime_name"] == anime_name:
            df_reddit.loc[i, "matched_title"] = anime_name
            df_reddit.loc[i, "matched_score"] = df_reddit.loc[i, "matched_score_english"]
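The same override can be written without the nested loop using boolean indexing; a sketch:
# Vectorized equivalent of the loop above
mask = df_reddit['anime_name'].isin(anime)
df_reddit.loc[mask, 'matched_title'] = df_reddit.loc[mask, 'anime_name']
df_reddit.loc[mask, 'matched_score'] = df_reddit.loc[mask, 'matched_score_english']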
df_reddit = df_reddit[["anime_name", "japanese_title", "count", "matched_score"]].head(20)
df_reddit.to_csv("../../data/csv/top20_animes_with_scores.csv", index=False)