NATURAL LANGUAGE PROCESSING¶

Setup¶

In [2]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.4.0

# install spark-nlp
%pip install spark-nlp==5.1.3

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")
Collecting package metadata (current_repodata.json): done
Solving environment: done


==> WARNING: A newer version of conda exists. <==
  current version: 23.3.1
  latest version: 23.10.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.10.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2023.08.22 |       h06a4308_0         123 KB
    certifi-2023.11.17         |  py310h06a4308_0         158 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       341.3 MB

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjdk-11.0.13-h87a67e3_0 

The following packages will be UPDATED:

  ca-certificates    conda-forge::ca-certificates-2023.7.2~ --> pkgs/main::ca-certificates-2023.08.22-h06a4308_0 
  certifi            conda-forge/noarch::certifi-2023.7.22~ --> pkgs/main/linux-64::certifi-2023.11.17-py310h06a4308_0 



Downloading and Extracting Packages
openjdk-11.0.13      | 341.0 MB  |                                       |   0% 
ca-certificates-2023 | 123 KB    | ##################################### | 100% 
certifi-2023.11.17   | 158 KB    |                                       |   0% 
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.
Collecting pyspark==3.4.0
  Using cached pyspark-3.4.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark==3.4.0)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.0
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Collecting spark-nlp==5.1.3
  Obtaining dependency information for spark-nlp==5.1.3 from https://files.pythonhosted.org/packages/cd/7d/bc0eca4c9ec4c9c1d9b28c42c2f07942af70980a7d912d0aceebf8db32dd/spark_nlp-5.1.3-py2.py3-none-any.whl.metadata
  Using cached spark_nlp-5.1.3-py2.py3-none-any.whl.metadata (53 kB)
Using cached spark_nlp-5.1.3-py2.py3-none-any.whl (537 kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-5.1.3
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
Out[2]:

MOVIES¶

In [ ]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

# Build the Spark session with Spark NLP and S3 (hadoop-aws) support
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.executor.memory", "12g")\
    .config("spark.executor.cores", "3")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
    .config(
            "fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.ContainerCredentialsProvider"
    )\
    .getOrCreate()

print(spark.version)
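
An optional quick check that the session actually resolved the Spark NLP and hadoop-aws coordinates (a sketch; the output depends on the environment):

In [ ]:
# Optional: confirm the package coordinates registered on this session
print(spark.sparkContext.getConf().get("spark.jars.packages"))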

Import Libraries¶

In [3]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
In [4]:
print(f"Spark version: {spark.version}")
print(f"sparknlp version: {sparknlp.version()}")
Spark version: 3.4.0
sparknlp version: 5.1.3

Import Data¶

In [5]:
%%time
bucket = "project-group34"
session = sagemaker.Session()
output_prefix_data_comments = "project/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
print(f"reading comments from {s3_path}")
comments = spark.read.parquet(s3_path)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading comments from s3a://project-group34/project/comments/yyyy=*
23/11/24 22:32:41 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                
CPU times: user 191 ms, sys: 16.4 ms, total: 207 ms
Wall time: 6.64 s
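
Before transforming the data, it can help to size up the input. An optional check (a full count scans every file, so caching first is worthwhile on large inputs):

In [ ]:
# Optional: row count and partition count of the raw comments data
print(comments.count(), "rows across", comments.rdd.getNumPartitions(), "partitions")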
In [7]:
# comments = comments.cache()
In [6]:
comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)

In [7]:
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality").show()
                                                                                
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|
|    Animesuggest|      [deleted]|           [deleted]| t3_m3vnjl|gqscjse|2021-03-13 10:18:25|    1|               0|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|
|    Animesuggest|    Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|
|MovieSuggestions|   alienstabler|[Holes (2003)](ht...| t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|
|    Animesuggest|       Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|
|    Animesuggest|crash-scientist|I didn’t mean to ...|t1_gqs4syw|gqscp6i|2021-03-13 10:21:10|    2|               0|
|    Animesuggest|        dorting|    Watch Evangelion| t3_m3vnjl|gqscp9a|2021-03-13 10:21:13|    2|               0|
|MovieSuggestions| ThalesHedonist|V for Vendetta\n\...| t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|
|    Animesuggest|        Athenza|{Vampire Hunter D...| t3_m3wy06|gqscspa|2021-03-13 10:22:51|    2|               0|
|    Animesuggest|       Roboragi|**Vampire Hunter ...|t1_gqscspa|gqsct5l|2021-03-13 10:23:05|    2|               0|
|MovieSuggestions|   jaymewheeler|Synchronic was co...| t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|
|    Animesuggest|        Arvidex|- {Wonder Egg Pei...| t3_m3vnjl|gqscuva|2021-03-13 10:23:57|    2|               0|
|    Animesuggest|       Roboragi|**Wonder Egg Prio...|t1_gqscuva|gqscvqx|2021-03-13 10:24:25|    1|               0|
|    Animesuggest|        mgd5800|{Nejimaki Seirei ...| t3_m43dco|gqscxbt|2021-03-13 10:25:12|    2|               0|
|MovieSuggestions|     SwissBliss|Climax!!!!! Sound...| t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|
|    Animesuggest|       Roboragi|**Nejimaki Seirei...|t1_gqscxbt|gqscy94|2021-03-13 10:25:40|    2|               0|
|    Animesuggest|     DyeDye1234|{is the order a r...| t3_m41ujb|gqsd2ku|2021-03-13 10:27:55|    1|               0|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
only showing top 20 rows

DATA PROCESSING¶

In [8]:
# Filter out rows where 'body' or 'author' is '[deleted]'
comments_filtered = comments.filter((comments.body != '[deleted]') & (comments.author != '[deleted]'))

# Keep a subset of columns from the filtered DataFrame
comments_filtered = comments_filtered.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality")
In [9]:
comments_filtered.show()
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|
|    Animesuggest|    Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|
|MovieSuggestions|   alienstabler|[Holes (2003)](ht...| t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|
|    Animesuggest|       Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|
|    Animesuggest|crash-scientist|I didn’t mean to ...|t1_gqs4syw|gqscp6i|2021-03-13 10:21:10|    2|               0|
|    Animesuggest|        dorting|    Watch Evangelion| t3_m3vnjl|gqscp9a|2021-03-13 10:21:13|    2|               0|
|MovieSuggestions| ThalesHedonist|V for Vendetta\n\...| t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|
|    Animesuggest|        Athenza|{Vampire Hunter D...| t3_m3wy06|gqscspa|2021-03-13 10:22:51|    2|               0|
|    Animesuggest|       Roboragi|**Vampire Hunter ...|t1_gqscspa|gqsct5l|2021-03-13 10:23:05|    2|               0|
|MovieSuggestions|   jaymewheeler|Synchronic was co...| t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|
|    Animesuggest|        Arvidex|- {Wonder Egg Pei...| t3_m3vnjl|gqscuva|2021-03-13 10:23:57|    2|               0|
|    Animesuggest|       Roboragi|**Wonder Egg Prio...|t1_gqscuva|gqscvqx|2021-03-13 10:24:25|    1|               0|
|    Animesuggest|        mgd5800|{Nejimaki Seirei ...| t3_m43dco|gqscxbt|2021-03-13 10:25:12|    2|               0|
|MovieSuggestions|     SwissBliss|Climax!!!!! Sound...| t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|
|    Animesuggest|       Roboragi|**Nejimaki Seirei...|t1_gqscxbt|gqscy94|2021-03-13 10:25:40|    2|               0|
|    Animesuggest|     DyeDye1234|{is the order a r...| t3_m41ujb|gqsd2ku|2021-03-13 10:27:55|    1|               0|
|    Animesuggest|       Roboragi|**Gochuumon wa Us...|t1_gqsd2ku|gqsd3ca|2021-03-13 10:28:18|    1|               0|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
only showing top 20 rows

In [10]:
comments_filtered = comments_filtered.where(col("subreddit").isin("MovieSuggestions"))
In [12]:
# Earlier attempt using BERT embeddings and ner_dl_bert (the download output
# below is from a previous run); replaced by the lighter GloVe pipeline in
# the next section.
# # Define the pipeline stages
# document_assembler = DocumentAssembler() \
#     .setInputCol("body") \
#     .setOutputCol("document")

# sentence_detector = SentenceDetector() \
#     .setInputCols(["document"]) \
#     .setOutputCol("sentence")

# tokenizer = Tokenizer() \
#     .setInputCols(["sentence"]) \
#     .setOutputCol("token")

# # Use a pretrained embeddings model, for example, BERT
# embeddings = BertEmbeddings.pretrained("bert_base_cased", "en") \
#     .setInputCols(["sentence", "token"]) \
#     .setOutputCol("embeddings")

# ner_model = NerDLModel.pretrained("ner_dl_bert", "en") \
#     .setInputCols(["sentence", "token", "embeddings"]) \
#     .setOutputCol("ner")

# ner_converter = NerConverter() \
#     .setInputCols(["sentence", "token", "ner"]) \
#     .setOutputCol("ner_chunk")

# # Build the pipeline
# nlp_pipeline = Pipeline(stages=[
#     document_assembler,
#     sentence_detector,
#     tokenizer,
#     embeddings,
#     ner_model,
#     ner_converter
# ])

# # Apply the pipeline to your DataFrame
# model = nlp_pipeline.fit(comments_filtered_movies)
# result = model.transform(comments_filtered_movies)
bert_base_cased download started this may take some time.
Approximate size to download 384.9 MB
[OK!]
ner_dl_bert download started this may take some time.
Approximate size to download 15.4 MB
[OK!]

MOVIE SUGGESTIONS EXTRACTION USING NER¶

In [ ]:
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import SentenceDetector, Tokenizer, WordEmbeddingsModel, NerDLModel, NerConverter
from pyspark.ml import Pipeline

# Define the pipeline stages
document_assembler = DocumentAssembler() \
    .setInputCol("body") \
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

# Use GloVe embeddings
embeddings = WordEmbeddingsModel.pretrained("glove_100d", "en") \
    .setInputCols(["sentence", "token"]) \
    .setOutputCol("embeddings")

# Use a lighter NER model
ner_model = NerDLModel.pretrained("ner_dl", "en") \
    .setInputCols(["sentence", "token", "embeddings"]) \
    .setOutputCol("ner")

ner_converter = NerConverter() \
    .setInputCols(["sentence", "token", "ner"]) \
    .setOutputCol("ner_chunk")

# Build the pipeline
nlp_pipeline = Pipeline(stages=[
    document_assembler,
    sentence_detector,
    tokenizer,
    embeddings,
    ner_model,
    ner_converter
])

# Apply the pipeline to your DataFrame
model = nlp_pipeline.fit(comments_filtered)
result = model.transform(comments_filtered)
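
Each entry in `ner_chunk` is a Spark NLP annotation struct whose `result` field holds the chunk text and whose `metadata['entity']` holds the predicted label. An optional sketch (assuming Spark 3.x higher-order SQL functions) to peek at both without a UDF:

In [ ]:
# Optional: inspect extracted chunk text and entity labels side by side
from pyspark.sql.functions import expr

result.select(
    expr("transform(ner_chunk, c -> c.result)").alias("entity_text"),
    expr("transform(ner_chunk, c -> c.metadata['entity'])").alias("entity_label"),
).show(5, truncate=False)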
In [13]:
result = result.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality", "ner_chunk")
In [14]:
result.show(5)
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+
|       subreddit|        author|                body|parent_id|     id|        created_utc|score|controversiality|           ner_chunk|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|[{chunk, 0, 12, J...|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|                  []|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|[{chunk, 6, 19, V...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|                  []|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|[{chunk, 94, 99, ...|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+
only showing top 5 rows

                                                                                
In [15]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, ArrayType
import pyspark.sql.functions as F

# Define a UDF to extract candidate movie names from the NER chunks.
# The pretrained model has no movie-title class, so PERSON and ORG entities
# serve as proxies for titles here.
def extract_movies(chunks):
    movie_names = [chunk.result for chunk in chunks if chunk.metadata['entity'] in ['PERSON', 'ORG']]
    return movie_names

extract_movie_names_udf = udf(extract_movies, ArrayType(StringType()))

# Apply the UDF to the DataFrame
movies_df = result.withColumn("movie_names", extract_movie_names_udf(F.col("ner_chunk")))

# # Display the results
# movies_df.select("body", "movie_names").show()
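
Worth noting: `ner_dl` is trained on CoNLL-2003, whose label set is PER/ORG/LOC/MISC, so if `movie_names` comes back mostly empty the 'PERSON' value may simply never occur. A quick hedged check of the labels the model actually emits:

In [ ]:
# Check which entity labels the model produces, to validate the
# 'PERSON'/'ORG' filter above (label names vary across pretrained models)
from pyspark.sql.functions import explode, expr

result.select(explode(expr("transform(ner_chunk, c -> c.metadata['entity'])")).alias("label")) \
    .groupBy("label").count().show()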
In [16]:
movies_df.show(5)
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
|       subreddit|        author|                body|parent_id|     id|        created_utc|score|controversiality|           ner_chunk|       movie_names|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|[{chunk, 0, 12, J...|                []|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|                  []|                []|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|[{chunk, 6, 19, V...|[Vendetta\n\nPulp]|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|                  []|                []|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|[{chunk, 94, 99, ...|                []|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
only showing top 5 rows

                                                                                
In [ ]:
movies_df.write.parquet("s3a://project-group34/project/suggestions/all_movies/ner/", mode="overwrite")
                                                                                

MOVIE SUGGESTIONS EXTRACTION USING REGEX¶

In [7]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

# Import pyspark and build Spark session
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.executor.memory", "12g")\
    .config("spark.executor.cores", "3")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
    .config(
            "fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.ContainerCredentialsProvider"
    )\
    .getOrCreate()

print(spark.version)
3.4.0
In [5]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
In [6]:
movies_df = spark.read.parquet("s3a://project-group34/project/suggestions/all_movies/ner/")
23/11/25 19:35:08 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                
In [8]:
movies_df.show(5)
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
|       subreddit|        author|                body|parent_id|     id|        created_utc|score|controversiality|           ner_chunk|       movie_names|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|[{chunk, 0, 12, J...|                []|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|                  []|                []|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|[{chunk, 6, 19, V...|[Vendetta\n\nPulp]|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|                  []|                []|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|[{chunk, 94, 99, ...|                []|
+----------------+--------------+--------------------+---------+-------+-------------------+-----+----------------+--------------------+------------------+
only showing top 5 rows

                                                                                
In [9]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

# import spacy
import re

# # Load spaCy model
# nlp = spacy.load("en_core_web_sm")

# # Define schema for the UDF output
# movie_schema = StructType([
#     StructField("movie_positions", ArrayType(ArrayType(StringType()))),
#     StructField("movie_names", ArrayType(StringType()))
# ])

# # UDF to extract movie names
# @udf(movie_schema)
# def extract_movie_names_udf(text):
#     doc = nlp(text)
#     movie_positions = []
#     movie_names = []

#     for ent in doc.ents:
#         if ent.label_ == "ORG" or ent.label_ == "PERSON":
#             movie_positions.append([ent.start_char, ent.end_char])
#             movie_names.append(ent.text)

#     return (movie_positions, movie_names)

# UDF to remove movie names
@udf(StringType())
def remove_movie_names_udf(text, movie_names):
    if movie_names:
        for name in movie_names:
            text = text.replace(name, ' ')
        return ' '.join(text.split())
    else:
        return text

# UDF to extract movie names using regex
@udf(ArrayType(StringType()))
def extract_movie_names_regex_udf(text, movie_names):
    movie_name_pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'

    movie_matches = re.findall(movie_name_pattern, text)
    # findall returns 2-tuples here (quoted title, capitalized phrase);
    # take whichever group matched
    movies = [match[0] or match[1] for match in movie_matches]
    return movie_names + movies

# Remove movie names from the 'body' text
df_removed_movie_names = movies_df.withColumn("body_no_movies", remove_movie_names_udf(movies_df["body"], movies_df["movie_names"]))

# Supplement the NER extraction with regex matches on the remaining text
df_final = df_removed_movie_names.withColumn("additional_movie_names", extract_movie_names_regex_udf(df_removed_movie_names["body_no_movies"], df_removed_movie_names["movie_names"]))
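
The pattern matches either a double-quoted span or a run of capitalized words (optionally linked by lowercase words), with an optional "(year)" suffix. A local sanity check of the same pattern outside Spark, on a hypothetical comment, shows both what it catches and the kind of false positives the later cleanup steps have to filter:

In [ ]:
# Local test of the title regex (pattern copied from the UDF above)
import re

pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'
sample = 'Try "The Thing" or Blade Runner (1982), also watch Oldboy'
print([m[0] or m[1] for m in re.findall(pattern, sample)])
# expected: ['Try', 'The Thing', 'Blade Runner', 'Oldboy'] -- note the false positive 'Try'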
In [10]:
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "additional_movie_names")
In [11]:
df_final.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+----------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|additional_movie_names|
+----------------+--------------+--------------------+-------------------+-----+----------------+----------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|       [Jacobs Ladder]|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|               [Holes]|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|  [Vendetta\n\nPulp...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|          [Synchronic]|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|2021-03-13 10:25:19|    2|               0|  [Climax, Sounds, ...|
+----------------+--------------+--------------------+-------------------+-----+----------------+----------------------+
only showing top 5 rows

                                                                                
In [12]:
import nltk
#nltk.download('stopwords')
eng_stopwords = nltk.corpus.stopwords.words('english')
In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def remove_stop_word_from_movie_names(movies):
    # Filter out single-word movie names that are in the stop words list
    return [movie for movie in movies if not (len(movie.split()) == 1 and movie.lower() in eng_stopwords)]

remove_stop_word_udf = udf(remove_stop_word_from_movie_names, ArrayType(StringType()))
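
This drops one-word names that are common stop words, at the cost of discarding genuine one-word titles such as "It" or "Up". A quick local check on a hypothetical list:

In [ ]:
# Local check: 'It' and 'Up' are NLTK stop words and get dropped even though
# they are real movie titles; multi-word names are kept.
print(remove_stop_word_from_movie_names(["It", "The Thing", "Up", "Oldboy"]))
# expected: ['The Thing', 'Oldboy']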
In [14]:
df_final = df_final.withColumn("movie_names_final", remove_stop_word_udf(df_final["additional_movie_names"]))
In [15]:
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "movie_names_final")
In [16]:
# Define a UDF to filter out single-letter movie names
def remove_single_letter_movies(movie_names):
    return [name for name in movie_names if len(name.strip()) > 1]

remove_single_letter_movies_udf = udf(remove_single_letter_movies, ArrayType(StringType()))

# Apply the UDF to the DataFrame
df_final = df_final.withColumn("movie_names_final_cleaned", remove_single_letter_movies_udf("movie_names_final"))
In [17]:
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "movie_names_final_cleaned")
In [18]:
df_final.show()
+----------------+-------------------+--------------------+-------------------+-----+----------------+-------------------------+
|       subreddit|             author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|
+----------------+-------------------+--------------------+-------------------+-----+----------------+-------------------------+
|MovieSuggestions|          katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|
|MovieSuggestions|       alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|
|MovieSuggestions|     ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|
|MovieSuggestions|       jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|
|MovieSuggestions|         SwissBliss|Climax!!!!! Sound...|2021-03-13 10:25:19|    2|               0|     [Climax, Sounds, ...|
|MovieSuggestions|        frankocozzo|• Sorry To Bother...|2021-03-13 10:29:35|    2|               0|     [Bacurau  \n• Coh...|
|MovieSuggestions|Organized-Konfusion|I would also sugg...|2021-03-13 10:31:39|    2|               0|     [I would also sug...|
|MovieSuggestions|      jonny_designs|Midsommar. Was no...|2021-03-13 10:37:18|    2|               0|     [Midsommar, Was n...|
|MovieSuggestions|     LaughingGor108|Reservoir Dogs\n\...|2021-03-13 10:39:56|    1|               0|     [Reservoir Dogs\n...|
|MovieSuggestions|           jonnyzat|             Hoodlum|2021-03-13 10:43:46|    1|               0|                [Hoodlum]|
|MovieSuggestions|     LaughingGor108|Crime/Thriller:\n...|2021-03-13 10:46:55|    1|               0|     [No Mercy, Asura,...|
|MovieSuggestions|     LaughingGor108|Don't Breathe\n\n...|2021-03-13 10:50:09|    1|               0|     [Don't Breathe, O...|
|MovieSuggestions|           ALT236-1|This is a new one...|2021-01-18 23:56:21|    1|               0|                  [Thank]|
|MovieSuggestions|   ChickenInAPotpie|Just realized the...|2021-01-18 23:58:12|    2|               0|                       []|
|MovieSuggestions|            maaximo|     Children of Men|2021-01-18 23:59:41|    5|               0|        [Children of Men]|
|MovieSuggestions|           ALT236-1|So many, thank yo...|2021-01-18 23:59:50|    1|               0|     [Grand Budapest H...|
|MovieSuggestions|         Tevesh_CKP|        Pacific Rim.|2021-01-19 00:01:32|    1|               0|            [Pacific Rim]|
|MovieSuggestions|         Snoo_93607|Be careful with i...|2021-01-19 00:03:10|    2|               0|                       []|
|MovieSuggestions|         Tevesh_CKP|Wolf Warrior inst...|2021-01-19 00:03:28|    1|               0|           [Wolf Warrior]|
|MovieSuggestions|           ALT236-1|The anime? Wow, h...|2021-01-19 00:04:15|    1|               0|             [Wow, Thank]|
+----------------+-------------------+--------------------+-------------------+-----+----------------+-------------------------+
only showing top 20 rows

                                                                                
In [19]:
df_final.write.parquet("s3a://project-group34/project/suggestions/all_movies/cleaned_with_regex/", mode="overwrite")
                                                                                

EXPLODING SUGGESTIONS¶

In [ ]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

# Import pyspark and build Spark session
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.executor.memory", "12g")\
    .config("spark.executor.cores", "3")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
    .config(
            "fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.ContainerCredentialsProvider"
    )\
    .getOrCreate()

print(spark.version)
In [3]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
In [4]:
df_final = spark.read.parquet("s3a://project-group34/project/suggestions/all_movies/cleaned_with_regex/")
23/11/25 20:48:39 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                
In [5]:
df_final.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|
|MovieSuggestions|    SwissBliss|Climax!!!!! Sound...|2021-03-13 10:25:19|    2|               0|     [Climax, Sounds, ...|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+
only showing top 5 rows

                                                                                
In [6]:
from pyspark.sql.functions import explode, col, count, split

# Flatten the movie_names column
df_flattened = df_final.withColumn("movie_name", explode(col("movie_names_final_cleaned")))
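
`explode` emits one output row per array element, so a single comment recommending several movies becomes several rows. An optional sanity check (two full scans, so cheap only on modest data):

In [ ]:
# Optional: compare comment rows vs. exploded movie-mention rows
print(df_final.count(), "comments ->", df_flattened.count(), "movie mentions")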
In [7]:
# List of non-movie names to be removed
non_movie_terms = [
    "Movie", "Suggestions", "Please", "Directv", "Thanks", "Editing", "Thank",
    "Netflix", "Amazon Prime Video", "Google Play Movies", "Amazon Video", "Vudu",
    "Tubi Tv", "Hoopla", "Apple Itunes", "Fandangonow", "Hulu", "Rotten Tomatoes",
    "Dailymotion", "Fubotv", "Dplay", "Showtime", "Hbo Now", "Amc On Demand",
    "Notes", "Great", "Also", "Note", "Tube", "Zz", "Db", "One", "Make", "Yes",
    "Good", "Oh", "Yeah", "Love", "P.S", "Man", "Replying", "In the comments on this post I",
    "Piracy", "Title", "Characters Minimum", "Maybe", "Fv", "OP", "Definitely",
    "Edit", "Came", "Mr", "American", "Really", "Well", "Life", "Sorry", "French",
    "But I", "Japanese", "Never", "Lol", "Watch", "House", "Link", "Like", "Dead",
    "!&lt", "Absolutely", "Mother", "Girl", "Nobody", 
    "Redbox", "M the Creator", "Metacritic", "Korean", "Dprogram",
    "Microsoft Store", "Showtime Amazon Channel", "Watched", "Hbo Now Amazon Channel",
    "IMO", "Matched", "Loved", "Still", "Seen", "Go", "List", "Let", "Best", "Day",
    "Men", "Time", "World", "Even", "Identity", "Manchester", "And I", "Devil", "Super",
    "Cbs", "Disney", "Amazing", "English", "Baby", "Blue", "Check", "Dr", "Nice",
    "Character Minimum", "Excellent", "Night", "True", "I'm", "Pretty", "Hope", "Evil",
    "Two", "Spider", "Hollywood", "Murder", "Probably", "Sling Tv", "Reddit", 
]
In [8]:
# Filter out the non-movie names and streaming services
df_flattened = df_flattened.filter(~col("movie_name").isin(non_movie_terms))
In [9]:
df_flattened.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|          movie_name|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|       Jacobs Ladder|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|               Holes|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|    Vendetta\n\nPulp|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|V for fiction Dan...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|          Synchronic|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
only showing top 5 rows

                                                                                
In [10]:
df_flattened.write.parquet("s3a://project-group34/project/suggestions/all_movies/flattened/", mode="overwrite")
                                                                                

GROUPING SUGGESTIONS¶

In [27]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

# Import pyspark and build Spark session
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.executor.memory", "12g")\
    .config("spark.executor.cores", "3")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
    .config(
            "fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.ContainerCredentialsProvider"
    )\
    .getOrCreate()

print(spark.version)
3.4.0
In [28]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
In [29]:
df_flattened = spark.read.parquet("s3a://project-group34/project/suggestions/all_movies/flattened/")
                                                                                
In [30]:
df_flattened.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|          movie_name|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|       Jacobs Ladder|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|               Holes|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|    Vendetta\n\nPulp|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|V for fiction Dan...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|          Synchronic|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
only showing top 5 rows

                                                                                
In [31]:
df_movie_frequency = df_flattened.groupBy("movie_name").count()
In [32]:
df_movie_frequency.show(5)
+----------+-----+
|movie_name|count|
+----------+-----+
|       Uoc|   20|
|     Sword|   44|
|        Gb|   19|
|       Kin|   43|
|    Heaven|  425|
+----------+-----+
only showing top 5 rows

                                                                                
In [34]:
from pyspark.sql.functions import desc

# Sort by count in descending order
df_top_20_movies = df_movie_frequency.sort(desc('count')).limit(20)
In [36]:
df_top_20_movies.show()
+--------------+-----+
|    movie_name|count|
+--------------+-----+
|    Hereditary| 2733|
|      Parasite| 2578|
|  Interstellar| 2557|
|         Alien| 2489|
|     Prisoners| 2478|
|     The Thing| 2468|
|     Midsommar| 2446|
|       Arrival| 2375|
|    Fight Club| 2338|
|        Oldboy| 2337|
|  Blade Runner| 2317|
|  Annihilation| 2171|
|       Memento| 2159|
|     Coherence| 2011|
|     Inception| 1954|
|Shutter Island| 1928|
|           Saw| 1899|
|  Nightcrawler| 1890|
|The Lighthouse| 1829|
|           Big| 1753|
+--------------+-----+

                                                                                
In [37]:
df_top_20_movies_pd = df_top_20_movies.toPandas()
                                                                                
In [39]:
df_top_20_movies_pd.to_csv("../../data/csv/top20_movies_suggested.csv", index=False)

MERGE WITH EXTERNAL¶

In [7]:
df_sentiment = spark.read.parquet("s3a://project-group34/project/sentiment_analysis/average_scores_per_movie/")
In [8]:
df_sentiment.show(5)
+--------------------+----------------------+----------------------+-----------+
|               title|average_positive_score|average_negative_score|num_reviews|
+--------------------+----------------------+----------------------+-----------+
|            The Menu|    0.8769120351379525|   0.12308796544559403|        315|
|Jim Gaffigan: Obs...|                   1.0|          7.713538E-10|          5|
|   Winter in Wartime|    0.7367684066794117|    0.2632315926923039|         67|
|                7500|    0.7694758345625764|   0.23052416603253476|        157|
|10 Things I Hate ...|     0.678140454627421|    0.3218595443279501|         77|
+--------------------+----------------------+----------------------+-----------+
only showing top 5 rows

In [9]:
from pyspark.sql.functions import col

# Bayesian-average constants derived from the sentiment dataset
C_positive = 0.7525733910082344  # mean of the average positive scores
C_negative = 0.24742646776123378 # mean of the average negative scores
m = 4  # median number of reviews per title

# Weighted rating based on average positive scores
positive_weighted_rating_df = df_sentiment.withColumn("weighted_rating", 
                                   (col("num_reviews") / (col("num_reviews") + m) * col("average_positive_score")) + 
                                   (m / (col("num_reviews") + m) * C_positive))

# Weighted rating based on average negative scores
negative_weighted_rating_df = df_sentiment.withColumn("weighted_rating", 
                                   (col("num_reviews") / (col("num_reviews") + m) * col("average_negative_score")) + 
                                   (m / (col("num_reviews") + m) * C_negative))
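
This is the IMDb-style weighted rating WR = v/(v+m)·R + m/(v+m)·C, which shrinks titles with few reviews toward the global mean C. A quick worked check on a hypothetical title (not from the data):

In [ ]:
# Worked example: a title with v = m = 4 reviews and raw positive score 0.9
# is pulled exactly halfway toward the prior C_positive.
v, R = 4, 0.9
print(v / (v + m) * R + m / (v + m) * C_positive)  # 0.5*0.9 + 0.5*0.75257... ≈ 0.8263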
In [41]:
# Get top 20 movies with highest average positive scores
top_20_positive = positive_weighted_rating_df.orderBy(desc("weighted_rating")).limit(20)
In [42]:
top_20_positive.show()
+--------------------+----------------------+----------------------+-----------+------------------+
|               title|average_positive_score|average_negative_score|num_reviews|   weighted_rating|
+--------------------+----------------------+----------------------+-----------+------------------+
|Jim Allison: Brea...|                   1.0|  8.53145166666667E-22|         48|0.9809671839237104|
|Hava Nagila (The ...|    0.9867567089655173|  0.013243287766424139|        116|0.9789505983669412|
|      Trifling Women|    0.9992484054545456|  7.515981201820878E-4|         44|0.9786921542506863|
|Blinky Bill the M...|    0.9980725209090909|  0.001927482015539...|         44|0.9776142600840194|
|Molly's Theory of...|           0.999998057|  1.942951721449868E-6|         40|0.9775049055462031|
|Blind Willow, Sle...|                   1.0|  8.522315000009178...|         32|0.9725081545564704|
|California Typewr...|          0.9936297735|  0.006370226588242396|         40|0.9717155569098395|
|          Half Magic|    0.9950310882352942|  0.004968912791776545|         34| 0.969509225369288|
|   Things Never Said|    0.9922669266666667|  0.007733078447161946|         36|0.9682975731008235|
|     I'm Leaving Now|    0.9990313485714285|  9.686501978153861E-4|         28|0.9682241038760293|
|        The Farthest|    0.9999902925925925|   9.70705925925926E-6|         27|0.9680655310978367|
|Bill Cunningham N...|    0.9795781553424656|  0.020421847234691988|         73|0.9677857000523757|
|Richard Linklater...|    0.9953919132258064|  0.004608088904645...|         31|0.9676412249723696|
|       Breaking Fast|    0.9993891259259259|  6.108749145002055E-4|         27|0.9675419343236431|
|    Moonage Daydream|    0.9717051444774194|  0.028294855095545027|        155|0.9661923959624714|
|              Suzi Q|    0.9946260866666667|  0.005373910904266718|         30|0.9661492989421452|
|Salvatore: Shoema...|        0.992385378125|  0.007614620893583769|         32|0.9657396017786927|
|Is the Man Who Is...|    0.9876368760526316|  0.012363124857894738|         38|0.9652498774769747|
|The Eyes of Orson...|    0.9827869632608698|  0.017213035551343958|         46| 0.964369877480659|
|       Gentleman Jim|    0.9999992286956522|  7.678868595652176E-7|         23| 0.963343549038257|
+--------------------+----------------------+----------------------+-----------+------------------+

In [44]:
df_flattened.show(5)
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|       subreddit|        author|                body|        created_utc|score|controversiality|movie_names_final_cleaned|          movie_name|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|MovieSuggestions|     katnip_fl|       Jacobs Ladder|2021-03-13 10:19:07|    2|               0|          [Jacobs Ladder]|       Jacobs Ladder|
|MovieSuggestions|  alienstabler|[Holes (2003)](ht...|2021-03-13 10:21:04|    1|               0|                  [Holes]|               Holes|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|    Vendetta\n\nPulp|
|MovieSuggestions|ThalesHedonist|V for Vendetta\n\...|2021-03-13 10:21:29|    3|               0|     [Vendetta\n\nPulp...|V for fiction Dan...|
|MovieSuggestions|  jaymewheeler|Synchronic was co...|2021-03-13 10:23:37|    2|               0|             [Synchronic]|          Synchronic|
+----------------+--------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
only showing top 5 rows

                                                                                
In [22]:
from pyspark.sql.functions import udf, col, levenshtein
from pyspark.sql.types import FloatType
In [45]:
movie_titles = [row['movie_name'] for row in df_top_20_movies.collect()]
In [46]:
movie_titles
Out[46]:
['Hereditary',
 'Parasite',
 'Interstellar',
 'Alien',
 'Prisoners',
 'The Thing',
 'Midsommar',
 'Arrival',
 'Fight Club',
 'Oldboy',
 'Blade Runner',
 'Annihilation',
 'Memento',
 'Coherence',
 'Inception',
 'Shutter Island',
 'Saw',
 'Nightcrawler',
 'The Lighthouse',
 'Big']
In [47]:
# Filter the sentiment data DataFrame using the list
filtered_sentiment_df = positive_weighted_rating_df.filter(positive_weighted_rating_df['title'].isin(movie_titles))
In [49]:
filtered_sentiment_df.show()
+--------------+----------------------+----------------------+-----------+------------------+
|         title|average_positive_score|average_negative_score|num_reviews|   weighted_rating|
+--------------+----------------------+----------------------+-----------+------------------+
|           Big|    0.8549865664698026|   0.14501343196273178|         57| 0.848270948406749|
|Shutter Island|    0.7935697030268212|   0.20643029873821836|        259|0.7929461849733066|
|The Lighthouse|    0.8567953555230218|    0.1432046450670607|        403| 0.855771061031476|
|    Hereditary|    0.6772520104153956|    0.3227479907131384|        383|0.6780305259770788|
|       Arrival|    0.9132867864014926|   0.08671321548572934|        440|0.9118389179745263|
|         Alien|    0.6957981419639608|   0.30420185732468513|        124|0.6975723684965943|
|      Parasite|     0.825085770061366|    0.1749142296239439|       1921|0.8249350949880088|
|     Inception|    0.8771357714563982|   0.12286422944972214|        365|0.8757855017496431|
|     The Thing|    0.7633445176656004|   0.23665548025341143|        128|0.7630181198881044|
|     Prisoners|    0.7677609682564671|    0.2322390359840972|        254| 0.767525501942541|
|    Fight Club|     0.817899975018882|    0.1821000252636096|        174|0.8164319618950472|
|  Nightcrawler|    0.8417514086150262|    0.1582485925737861|        280|0.8404953801980292|
|  Interstellar|      0.89228317674853|   0.10771682494792269|        375|0.8908086671364952|
|     Midsommar|    0.7541998671040593|    0.2458001330190753|        401|0.7541838031426191|
|  Blade Runner|    0.8798318843731172|   0.12016811486495839|        112|0.8754436604639834|
|  Annihilation|     0.867995822337821|    0.1320041761747021|        329|0.8666093667062345|
|        Oldboy|    0.7110966693962014|   0.28890333126306006|        300|0.7116424157332019|
|           Saw|    0.6824277930089815|    0.3175722098283076|        188|0.6838891596339659|
|     Coherence|    0.8787566895442769|   0.12124330903455703|         94|  0.87360635082852|
|       Memento|    0.9059979402771241|   0.09400205504105454|        176|0.9025885058489265|
+--------------+----------------------+----------------------+-----------+------------------+

In [50]:
filtered_sentiment_df_pd = filtered_sentiment_df.toPandas()
In [54]:
df_top_20_movies_pd.columns = ["title", "count"]
In [55]:
df_top_20_movies_pd
Out[55]:
             title  count
0       Hereditary   2733
1         Parasite   2578
2     Interstellar   2557
3            Alien   2489
4        Prisoners   2478
5        The Thing   2468
6        Midsommar   2446
7          Arrival   2375
8       Fight Club   2338
9           Oldboy   2337
10    Blade Runner   2317
11    Annihilation   2171
12         Memento   2159
13       Coherence   2011
14       Inception   1954
15  Shutter Island   1928
16             Saw   1899
17    Nightcrawler   1890
18  The Lighthouse   1829
19             Big   1753
In [51]:
filtered_sentiment_df_pd
Out[51]:
             title  average_positive_score  average_negative_score  num_reviews  weighted_rating
0              Big                0.854987                0.145013           57         0.848271
1   Shutter Island                0.793570                0.206430          259         0.792946
2   The Lighthouse                0.856795                0.143205          403         0.855771
3       Hereditary                0.677252                0.322748          383         0.678031
4          Arrival                0.913287                0.086713          440         0.911839
5            Alien                0.695798                0.304202          124         0.697572
6         Parasite                0.825086                0.174914         1921         0.824935
7        Inception                0.877136                0.122864          365         0.875786
8        The Thing                0.763345                0.236655          128         0.763018
9        Prisoners                0.767761                0.232239          254         0.767526
10      Fight Club                0.817900                0.182100          174         0.816432
11    Nightcrawler                0.841751                0.158249          280         0.840495
12    Interstellar                0.892283                0.107717          375         0.890809
13       Midsommar                0.754200                0.245800          401         0.754184
14    Blade Runner                0.879832                0.120168          112         0.875444
15    Annihilation                0.867996                0.132004          329         0.866609
16          Oldboy                0.711097                0.288903          300         0.711642
17             Saw                0.682428                0.317572          188         0.683889
18       Coherence                0.878757                0.121243           94         0.873606
19         Memento                0.905998                0.094002          176         0.902589
In [56]:
import pandas as pd
In [61]:
df_sentiment_of_suggestions = pd.merge(filtered_sentiment_df_pd, df_top_20_movies_pd, on="title")[["title", "weighted_rating", "count"]].sort_values(by="count", ascending=False)
In [62]:
df_sentiment_of_suggestions
Out[62]:
             title  weighted_rating  count
3       Hereditary         0.678031   2733
6         Parasite         0.824935   2578
12    Interstellar         0.890809   2557
5            Alien         0.697572   2489
9        Prisoners         0.767526   2478
8        The Thing         0.763018   2468
13       Midsommar         0.754184   2446
4          Arrival         0.911839   2375
10      Fight Club         0.816432   2338
16          Oldboy         0.711642   2337
14    Blade Runner         0.875444   2317
15    Annihilation         0.866609   2171
19         Memento         0.902589   2159
18       Coherence         0.873606   2011
7        Inception         0.875786   1954
1   Shutter Island         0.792946   1928
17             Saw         0.683889   1899
11    Nightcrawler         0.840495   1890
2   The Lighthouse         0.855771   1829
0              Big         0.848271   1753
In [63]:
df_sentiment_of_suggestions.to_csv("../../data/csv/sentiment_of_suggestions.csv", index = False)
In [23]:
# Define a UDF for a normalized Levenshtein similarity score. Note that the
# Column-based levenshtein() from pyspark.sql.functions cannot run on plain
# Python strings, so the edit distance is computed in pure Python here.
def levenshtein_distance(s1, s2):
    # two-row dynamic-programming edit distance
    prev = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        curr = [i + 1]
        for j, c2 in enumerate(s2):
            curr.append(min(prev[j + 1] + 1, curr[j] + 1, prev[j] + (c1 != c2)))
        prev = curr
    return prev[-1]

def similarity_score(str1, str2):
    max_len = max(len(str1), len(str2))
    if max_len == 0:
        return 1.0
    return float(max_len - levenshtein_distance(str1, str2)) / max_len

similarity_udf = udf(similarity_score, FloatType())
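
A possible use for this UDF (a sketch, not executed here): fuzzy-matching extracted names against canonical titles when exact `isin()` matching is too strict. The cross join and the 0.85 threshold are assumptions, and the join is expensive on large data:

In [ ]:
# Hypothetical fuzzy join of extracted names to sentiment titles
candidate_pairs = (
    df_flattened.select("movie_name").distinct()
    .crossJoin(df_sentiment.select("title"))
    .withColumn("sim", similarity_udf(col("movie_name"), col("title")))
    .filter(col("sim") > 0.85)
)
candidate_pairs.show(5)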
In [3]:
# df_top_500_movies is assembled elsewhere in the project (not shown in this notebook)
df_top_500_movies.write.parquet(f"s3a://{bucket}/projects/comments/extracted_movies")

ANIME¶

SPARK JOB FOR NER¶

In [ ]:
# import sagemaker
# session = sagemaker.Session()
# bucket = "project-group34"
# !wget -qO- https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.1.3.jar | aws s3 cp - s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar
# !aws s3 ls s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar
In [2]:
!mkdir -p ./code
In [8]:
%%writefile ./code/suggestion_extract_process.py

import os
import sys
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType
import re
from pyspark.sql.functions import explode, count
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import SentenceDetector, Tokenizer, WordEmbeddingsModel, NerDLModel, NerConverter
from pyspark.ml import Pipeline

from pyspark.sql.functions import desc

import nltk
nltk.download('stopwords')
eng_stopwords = nltk.corpus.stopwords.words('english')

logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

def main():
    
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path", type=str, help="Path of dataset in S3")    
    parser.add_argument("--col_name_for_filtering", type=str, help="Name of the column to filter")
    args = parser.parse_args()

    spark = SparkSession.builder \
    .appName("Spark NLP")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3")\
    .getOrCreate()
    
    logger.info(f"Spark version: {spark.version}")
    logger.info(f"sparknlp version: {sparknlp.version()}")
    
    # Needed to save RDDs, which is the only way to write nested DataFrames to CSV format
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Downloading the data from S3 into a Dataframe
    logger.info(f"going to read {args.s3_dataset_path} for r/{args.col_name_for_filtering}")
    df = spark.read.parquet(args.s3_dataset_path)
    vals = [args.col_name_for_filtering]
    df_filtered = df.where(col("subreddit").isin(vals))
    logger.info(f"finished reading files...")
    logger.info(f"Number of rows in data: {df_filtered.count()}")
    
    # DATA CLEANING
    df_filtered = df_filtered.filter((df.body != '[deleted]') & (df.author != '[deleted]'))

    # Define the pipeline stages
    document_assembler = DocumentAssembler() \
        .setInputCol("body") \
        .setOutputCol("document")

    sentence_detector = SentenceDetector() \
        .setInputCols(["document"]) \
        .setOutputCol("sentence")

    tokenizer = Tokenizer() \
        .setInputCols(["sentence"]) \
        .setOutputCol("token")

    # Use GloVe embeddings
    embeddings = WordEmbeddingsModel.pretrained("glove_100d", "en") \
        .setInputCols(["sentence", "token"]) \
        .setOutputCol("embeddings")

    # Use a lighter NER model
    ner_model = NerDLModel.pretrained("ner_dl", "en") \
        .setInputCols(["sentence", "token", "embeddings"]) \
        .setOutputCol("ner")

    ner_converter = NerConverter() \
        .setInputCols(["sentence", "token", "ner"]) \
        .setOutputCol("ner_chunk")

    # Build the pipeline
    nlp_pipeline = Pipeline(stages=[
        document_assembler,
        sentence_detector,
        tokenizer,
        embeddings,
        ner_model,
        ner_converter
    ])

    # Apply the pipeline to your DataFrame
    model = nlp_pipeline.fit(df_filtered)
    result = model.transform(df_filtered)

    print("NLP Pipeline Ran Succesfully!")
                
    result = result.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality", "ner_chunk")

    # Define a UDF to filter and extract anime names from the NER chunks.
    # Note: ner_dl is trained on CoNLL-2003, whose labels are PER/ORG/LOC/MISC,
    # so of the two labels checked below only 'ORG' can actually occur
    # ('PER' may be the intended person label).
    def extract_anime(chunks):
        anime_names = [chunk.result for chunk in chunks if chunk.metadata['entity'] in ['PERSON', 'ORG']]
        return anime_names

    extract_anime_names_udf = udf(extract_anime, ArrayType(StringType()))

    # Apply the UDF to the DataFrame
    anime_df = result.withColumn("movie_names", extract_anime_names_udf(F.col("ner_chunk")))
    
    anime_df.write.parquet("s3a://project-group34/project/suggestions/all_anime/ner/", mode="overwrite")
    
    logger.info(f"all done...")
    
if __name__ == "__main__":
    main()
Overwriting ./code/suggestion_extract_process.py
In [9]:
%%time
import boto3
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

account_id = boto3.client('sts').get_caller_identity()['Account']
CPU times: user 14.8 ms, sys: 61 µs, total: 14.9 ms
Wall time: 34.9 ms
In [10]:
account_id
Out[10]:
'655678691473'
In [11]:
%%time
import time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Set up the PySpark processor to run the job. Note the instance type and instance count parameters: SageMaker will create this many instances of this type for the Spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    image_uri=f"{account_id}.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark:latest",
    framework_version="3.3",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=7200,
)

# # S3 URI of the initialization script
# s3_uri_init_script = f's3://{bucket}/{script_key}'

# s3 paths
session = sagemaker.Session()
output_prefix_logs = f"spark_logs"

configuration = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "12g", "spark.executor.cores": "4"},
    }
]
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
CPU times: user 76.8 ms, sys: 0 ns, total: 76.8 ms
Wall time: 131 ms
In [12]:
%%time
print(f"going to extract suggestions data for subreddit=Animesuggest")
bucket = "project-group34"
output_prefix_data_comments = "project/comments/yyyy=*"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
col_name_for_filtering = "Animesuggest"

# Run the job now; the arguments array is passed as command-line arguments to the Python script (the Spark code in this case).
spark_processor.run(
    submit_app="./code/suggestion_extract_process.py",
    submit_jars=[f"s3://{bucket}/spark-nlp-assembly-5.1.3.jar"],
    arguments=[
        "--s3_dataset_path",
        s3_path,
        "--col_name_for_filtering",
        col_name_for_filtering,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
    configuration=configuration
)
# give some time for resources from this iteration to get cleaned up;
# if we start the next job immediately we could get an insufficient-resources error
time.sleep(60)
going to extract suggestions data for subreddit=Animesuggest
INFO:sagemaker:Creating processing-job with name sm-spark-project-2023-11-26-18-21-00-232
....................!CPU times: user 5.03 s, sys: 438 ms, total: 5.47 s
Wall time: 1h 58min 36s
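If the job needs to be checked after submission, the processor keeps a handle to the most recent processing job (a sketch assuming the SDK's latest_job attribute; not run in this notebook):

# Hypothetical status check for the job submitted above
status = spark_processor.latest_job.describe()["ProcessingJobStatus"]
print(status)  # e.g. 'InProgress' or 'Completed'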

Import Libraries¶

In [3]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
In [4]:
print(f"Spark version: {spark.version}")
print(f"sparknlp version: {sparknlp.version()}")
Spark version: 3.4.0
sparknlp version: 5.1.3

Import Data¶

In [5]:
%%time
bucket = "project-group34"
session = sagemaker.Session()
output_prefix_data_comments = "project/comments/yyyy=2021"
s3_path = f"s3a://{bucket}/{output_prefix_data_comments}"
print(f"reading comments from {s3_path}")
comments = spark.read.parquet(s3_path)
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
reading comments from s3a://project-group34/project/comments/yyyy=2021
23/11/26 03:18:48 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
CPU times: user 205 ms, sys: 8.31 ms, total: 214 ms
Wall time: 6.22 s
In [7]:
# comments = comments.cache()
In [6]:
comments.printSchema()
root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)

In [7]:
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality").show()
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|
|    Animesuggest|      [deleted]|           [deleted]| t3_m3vnjl|gqscjse|2021-03-13 10:18:25|    1|               0|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|
|    Animesuggest|    Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|
|MovieSuggestions|   alienstabler|[Holes (2003)](ht...| t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|
|    Animesuggest|       Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|
|    Animesuggest|crash-scientist|I didn’t mean to ...|t1_gqs4syw|gqscp6i|2021-03-13 10:21:10|    2|               0|
|    Animesuggest|        dorting|    Watch Evangelion| t3_m3vnjl|gqscp9a|2021-03-13 10:21:13|    2|               0|
|MovieSuggestions| ThalesHedonist|V for Vendetta\n\...| t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|
|    Animesuggest|        Athenza|{Vampire Hunter D...| t3_m3wy06|gqscspa|2021-03-13 10:22:51|    2|               0|
|    Animesuggest|       Roboragi|**Vampire Hunter ...|t1_gqscspa|gqsct5l|2021-03-13 10:23:05|    2|               0|
|MovieSuggestions|   jaymewheeler|Synchronic was co...| t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|
|    Animesuggest|        Arvidex|- {Wonder Egg Pei...| t3_m3vnjl|gqscuva|2021-03-13 10:23:57|    2|               0|
|    Animesuggest|       Roboragi|**Wonder Egg Prio...|t1_gqscuva|gqscvqx|2021-03-13 10:24:25|    1|               0|
|    Animesuggest|        mgd5800|{Nejimaki Seirei ...| t3_m43dco|gqscxbt|2021-03-13 10:25:12|    2|               0|
|MovieSuggestions|     SwissBliss|Climax!!!!! Sound...| t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|
|    Animesuggest|       Roboragi|**Nejimaki Seirei...|t1_gqscxbt|gqscy94|2021-03-13 10:25:40|    2|               0|
|    Animesuggest|     DyeDye1234|{is the order a r...| t3_m41ujb|gqsd2ku|2021-03-13 10:27:55|    1|               0|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
only showing top 20 rows


DATA PROCESSING¶

In [8]:
# Filter out rows where 'body' or 'author' is '[deleted]'
comments_filtered = comments.filter((comments.body != '[deleted]') & (comments.author != '[deleted]'))

# Keep only the columns needed downstream
comments_filtered = comments_filtered.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality")
In [9]:
comments_filtered.show()
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|       subreddit|         author|                body| parent_id|     id|        created_utc|score|controversiality|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
|    Animesuggest|        Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|
|    Animesuggest|       Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|
|MovieSuggestions|      katnip_fl|       Jacobs Ladder| t3_m3rw47|gqscl5i|2021-03-13 10:19:07|    2|               0|
|    Animesuggest|        Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|
|    Animesuggest|    Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|
|MovieSuggestions|   alienstabler|[Holes (2003)](ht...| t3_l20wrm|gqscozr|2021-03-13 10:21:04|    1|               0|
|    Animesuggest|       Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|
|    Animesuggest|crash-scientist|I didn’t mean to ...|t1_gqs4syw|gqscp6i|2021-03-13 10:21:10|    2|               0|
|    Animesuggest|        dorting|    Watch Evangelion| t3_m3vnjl|gqscp9a|2021-03-13 10:21:13|    2|               0|
|MovieSuggestions| ThalesHedonist|V for Vendetta\n\...| t3_m3rw47|gqscpsz|2021-03-13 10:21:29|    3|               0|
|    Animesuggest|        Athenza|{Vampire Hunter D...| t3_m3wy06|gqscspa|2021-03-13 10:22:51|    2|               0|
|    Animesuggest|       Roboragi|**Vampire Hunter ...|t1_gqscspa|gqsct5l|2021-03-13 10:23:05|    2|               0|
|MovieSuggestions|   jaymewheeler|Synchronic was co...| t3_m3rw47|gqscu76|2021-03-13 10:23:37|    2|               0|
|    Animesuggest|        Arvidex|- {Wonder Egg Pei...| t3_m3vnjl|gqscuva|2021-03-13 10:23:57|    2|               0|
|    Animesuggest|       Roboragi|**Wonder Egg Prio...|t1_gqscuva|gqscvqx|2021-03-13 10:24:25|    1|               0|
|    Animesuggest|        mgd5800|{Nejimaki Seirei ...| t3_m43dco|gqscxbt|2021-03-13 10:25:12|    2|               0|
|MovieSuggestions|     SwissBliss|Climax!!!!! Sound...| t3_m3rw47|gqscxjk|2021-03-13 10:25:19|    2|               0|
|    Animesuggest|       Roboragi|**Nejimaki Seirei...|t1_gqscxbt|gqscy94|2021-03-13 10:25:40|    2|               0|
|    Animesuggest|     DyeDye1234|{is the order a r...| t3_m41ujb|gqsd2ku|2021-03-13 10:27:55|    1|               0|
|    Animesuggest|       Roboragi|**Gochuumon wa Us...|t1_gqsd2ku|gqsd3ca|2021-03-13 10:28:18|    1|               0|
+----------------+---------------+--------------------+----------+-------+-------------------+-----+----------------+
only showing top 20 rows

In [10]:
comments_filtered = comments_filtered.where(col("subreddit").isin("Animesuggest"))
In [12]:
# # Define the pipeline stages
# document_assembler = DocumentAssembler() \
#     .setInputCol("body") \
#     .setOutputCol("document")

# sentence_detector = SentenceDetector() \
#     .setInputCols(["document"]) \
#     .setOutputCol("sentence")

# tokenizer = Tokenizer() \
#     .setInputCols(["sentence"]) \
#     .setOutputCol("token")

# # Use a pretrained embeddings model, for example, BERT
# embeddings = BertEmbeddings.pretrained("bert_base_cased", "en") \
#     .setInputCols(["sentence", "token"]) \
#     .setOutputCol("embeddings")

# ner_model = NerDLModel.pretrained("ner_dl_bert", "en") \
#     .setInputCols(["sentence", "token", "embeddings"]) \
#     .setOutputCol("ner")

# ner_converter = NerConverter() \
#     .setInputCols(["sentence", "token", "ner"]) \
#     .setOutputCol("ner_chunk")

# # Build the pipeline
# nlp_pipeline = Pipeline(stages=[
#     document_assembler,
#     sentence_detector,
#     tokenizer,
#     embeddings,
#     ner_model,
#     ner_converter
# ])

# # Apply the pipeline to your DataFrame
# model = nlp_pipeline.fit(comments_filtered_movies)
# result = model.transform(comments_filtered_movies)
bert_base_cased download started this may take some time.
Approximate size to download 384.9 MB
[OK!]
ner_dl_bert download started this may take some time.
Approximate size to download 15.4 MB
[OK!]

ANIME SUGGESTIONS EXTRACTION USING NER¶

In [ ]:
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import SentenceDetector, Tokenizer, WordEmbeddingsModel, NerDLModel, NerConverter
from pyspark.ml import Pipeline

# Define the pipeline stages
document_assembler = DocumentAssembler() \
    .setInputCol("body") \
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

# Use GloVe embeddings
embeddings = WordEmbeddingsModel.pretrained("glove_100d", "en") \
    .setInputCols(["sentence", "token"]) \
    .setOutputCol("embeddings")

# Use a lighter NER model
ner_model = NerDLModel.pretrained("ner_dl", "en") \
    .setInputCols(["sentence", "token", "embeddings"]) \
    .setOutputCol("ner")

ner_converter = NerConverter() \
    .setInputCols(["sentence", "token", "ner"]) \
    .setOutputCol("ner_chunk")

# Build the pipeline
nlp_pipeline = Pipeline(stages=[
    document_assembler,
    sentence_detector,
    tokenizer,
    embeddings,
    ner_model,
    ner_converter
])

# Apply the pipeline to your DataFrame
model = nlp_pipeline.fit(comments_filtered)
result = model.transform(comments_filtered)
In [12]:
result = result.select("subreddit", "author", "body", "parent_id", "id", "created_utc", "score", "controversiality", "ner_chunk")
In [13]:
result.show(5)
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+
|   subreddit|     author|                body| parent_id|     id|        created_utc|score|controversiality|           ner_chunk|
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+
|Animesuggest|    Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|[{chunk, 24, 29, ...|
|Animesuggest|   Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|[{chunk, 2, 4, Im...|
|Animesuggest|    Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|[{chunk, 9, 12, T...|
|Animesuggest|Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|[{chunk, 4, 10, M...|
|Animesuggest|   Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|[{chunk, 2, 5, Ki...|
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+
only showing top 5 rows

In [14]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, ArrayType
import pyspark.sql.functions as F

# Define a UDF to filter and extract anime names from the NER chunks
# (note: ner_dl uses CoNLL-2003 labels PER/ORG/LOC/MISC, so 'PERSON' never
# matches; in practice only 'ORG' chunks are extracted here)
def extract_anime(chunks):
    movie_names = [chunk.result for chunk in chunks if chunk.metadata['entity'] in ['PERSON', 'ORG']]
    return movie_names

extract_anime_names_udf = udf(extract_anime, ArrayType(StringType()))

# Apply the UDF to the DataFrame
anime_df = result.withColumn("movie_names", extract_anime_names_udf(F.col("ner_chunk")))

# # Display the results
# anime_df.select("body", "movie_names").show()
In [15]:
anime_df.show(5)
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+--------------------+
|   subreddit|     author|                body| parent_id|     id|        created_utc|score|controversiality|           ner_chunk|         movie_names|
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+--------------------+
|Animesuggest|    Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|[{chunk, 24, 29, ...|                  []|
|Animesuggest|   Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|[{chunk, 2, 4, Im...|[Ima, Soko ni Iru...|
|Animesuggest|    Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|[{chunk, 9, 12, T...|[The Beautiful Wo...|
|Animesuggest|Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|[{chunk, 4, 10, M...|                  []|
|Animesuggest|   Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|[{chunk, 2, 5, Ki...|[Mobile Suit Gund...|
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+--------------------+
only showing top 5 rows

In [ ]:
anime_df.write.parquet("s3a://project-group34/project/suggestions/all_anime/ner/2021", mode="overwrite")

ANIME SUGGESTIONS EXTRACTION USING REGEX¶

In [ ]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.executor.memory", "12g")\
    .config("spark.executor.cores", "3")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
    .config(
            "fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.ContainerCredentialsProvider"
    )\
    .getOrCreate()

print(spark.version)
In [14]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
In [17]:
anime_df = spark.read.parquet("s3a://project-group34/project/suggestions/all_anime/ner/")
In [18]:
anime_df.show(5)
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+--------------------+
|   subreddit|     author|                body| parent_id|     id|        created_utc|score|controversiality|           ner_chunk|         movie_names|
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+--------------------+
|Animesuggest|    Athenza|{Now and Then, He...| t3_m3ygv3|gqscelh|2021-03-13 10:15:52|    2|               0|[{chunk, 24, 29, ...|                  []|
|Animesuggest|   Roboragi|**Ima, Soko ni Ir...|t1_gqscelh|gqscf1z|2021-03-13 10:16:05|    1|               0|[{chunk, 2, 4, Im...|[Ima, Soko ni Iru...|
|Animesuggest|    Athenza|{Kino no Tabi: Th...| t3_m3xpu6|gqscnqz|2021-03-13 10:20:26|    1|               0|[{chunk, 9, 12, T...|[The Beautiful Wo...|
|Animesuggest|Dropsoftime|Try Mahouka kouko...| t3_m43dco|gqscnr8|2021-03-13 10:20:26|    3|               0|[{chunk, 4, 10, M...|                  []|
|Animesuggest|   Roboragi|**Kino no Tabi: T...|t1_gqscnqz|gqscp63|2021-03-13 10:21:10|    1|               0|[{chunk, 2, 5, Ki...|[Mobile Suit Gund...|
+------------+-----------+--------------------+----------+-------+-------------------+-----+----------------+--------------------+--------------------+
only showing top 5 rows

In [19]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

# import spacy
import re

# # Load spaCy model
# nlp = spacy.load("en_core_web_sm")

# # Define schema for the UDF output
# anime_schema = StructType([
#     StructField("anime_positions", ArrayType(ArrayType(StringType()))),
#     StructField("anime_names", ArrayType(StringType()))
# ])

# # UDF to extract anime names
# @udf(anime_schema)
# def extract_anime_names_udf(text):
#     doc = nlp(text)
#     anime_positions = []
#     anime_names = []

#     for ent in doc.ents:
#         if ent.label_ == "ORG" or ent.label_ == "PERSON":
#             anime_positions.append([ent.start_char, ent.end_char])
#             anime_names.append(ent.text)

#     return (anime_positions, anime_names)

# UDF to remove anime names
@udf(StringType())
def remove_anime_names_udf(text, anime_names):
    if anime_names:
        for name in anime_names:
            text = text.replace(name, ' ')
        return ' '.join(text.split())
    else:
        return text

# UDF to extract anime names using regex
@udf(ArrayType(StringType()))
def extract_anime_names_regex_udf(text, anime_names):
    anime_name_pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'

    # re.findall yields a 2-tuple per match (one entry per capturing group),
    # so only groups 0 and 1 exist
    anime_matches = re.findall(anime_name_pattern, text)
    anime = [match[0] or match[1] for match in anime_matches]
    return anime_names + anime

# Remove anime names from the 'body' text
df_removed_anime_names = anime_df.withColumn("body_no_anime", remove_anime_names_udf(anime_df["body"], anime_df["movie_names"]))

# Regex method to supplement the NER extraction
df_final = df_removed_anime_names.withColumn("additional_anime_names", extract_anime_names_regex_udf(df_removed_anime_names["body_no_anime"], df_removed_anime_names["movie_names"]))
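To make the pattern concrete, here is what it captures on an invented sample comment (a sanity check, not part of the pipeline):

import re

anime_name_pattern = r'(?:\"([^\"]+)\"|([A-Z][a-z]*(?:\s+(?:[a-z]+\s+)*[A-Z][a-z]*)*)(?: \(\d{4}\))?)'
sample = 'Try "Monster" or Vinland Saga (2019), maybe also Death Note.'
print([m[0] or m[1] for m in re.findall(anime_name_pattern, sample)])
# ['Try', 'Monster', 'Vinland Saga', 'Death Note']
# Capitalized non-title words such as "Try" slip through, which is why the
# stop-word and non-anime-term filters below are needed.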
In [21]:
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "additional_anime_names")
In [22]:
df_final.show(5)
+------------+-----------+--------------------+-------------------+-----+----------------+----------------------+
|   subreddit|     author|                body|        created_utc|score|controversiality|additional_anime_names|
+------------+-----------+--------------------+-------------------+-----+----------------+----------------------+
|Animesuggest|    Athenza|{Now and Then, He...|2021-03-13 10:15:52|    2|               0|  [Now and Then, He...|
|Animesuggest|   Roboragi|**Ima, Soko ni Ir...|2021-03-13 10:16:05|    1|               0|  [Ima, Soko ni Iru...|
|Animesuggest|    Athenza|{Kino no Tabi: Th...|2021-03-13 10:20:26|    1|               0|  [The Beautiful Wo...|
|Animesuggest|Dropsoftime|Try Mahouka kouko...|2021-03-13 10:20:26|    3|               0|         [Try Mahouka]|
|Animesuggest|   Roboragi|**Kino no Tabi: T...|2021-03-13 10:21:10|    1|               0|  [Mobile Suit Gund...|
+------------+-----------+--------------------+-------------------+-----+----------------+----------------------+
only showing top 5 rows

In [23]:
import nltk
#nltk.download('stopwords')
eng_stopwords = nltk.corpus.stopwords.words('english')
In [24]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def remove_stop_word_from_anime_names(animes):
    # Filter out single-word anime names that are in the stop words list
    return [anime for anime in animes if not (len(anime.split()) == 1 and anime.lower() in eng_stopwords)]

remove_stop_word_udf = udf(remove_stop_word_from_anime_names, ArrayType(StringType()))
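For instance (an illustrative call, not from the notebook run):

print(remove_stop_word_from_anime_names(["But", "Monster", "Odd Taxi"]))
# ['Monster', 'Odd Taxi']  ("but" is in NLTK's English stop-word list)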
In [25]:
df_final = df_final.withColumn("anime_names_final", remove_stop_word_udf(df_final["additional_anime_names"]))
In [26]:
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "anime_names_final")
In [27]:
# Define a UDF to filter out single-letter anime names
def remove_single_letter_animes(anime_names):
    return [name for name in anime_names if len(name.strip()) > 1]

remove_single_letter_animes_udf = udf(remove_single_letter_animes, ArrayType(StringType()))

# Apply the UDF to the DataFrame
df_final = df_final.withColumn("anime_names_final_cleaned", remove_single_letter_animes_udf("anime_names_final"))
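For instance (an illustrative call on the plain Python function):

print(remove_single_letter_animes(["K", "Monster", " A "]))
# ['Monster']  (single-character names such as "K" or a stray "A" are dropped)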
In [28]:
df_final = df_final.select("subreddit", "author", "body", "created_utc", "score", "controversiality", "anime_names_final_cleaned")
In [29]:
df_final.show()
+------------+---------------+--------------------+-------------------+-----+----------------+-------------------------+
|   subreddit|         author|                body|        created_utc|score|controversiality|anime_names_final_cleaned|
+------------+---------------+--------------------+-------------------+-----+----------------+-------------------------+
|Animesuggest|        Athenza|{Now and Then, He...|2021-03-13 10:15:52|    2|               0|     [Now and Then, He...|
|Animesuggest|       Roboragi|**Ima, Soko ni Ir...|2021-03-13 10:16:05|    1|               0|     [Ima, Soko ni Iru...|
|Animesuggest|        Athenza|{Kino no Tabi: Th...|2021-03-13 10:20:26|    1|               0|     [The Beautiful Wo...|
|Animesuggest|    Dropsoftime|Try Mahouka kouko...|2021-03-13 10:20:26|    3|               0|            [Try Mahouka]|
|Animesuggest|       Roboragi|**Kino no Tabi: T...|2021-03-13 10:21:10|    1|               0|     [Mobile Suit Gund...|
|Animesuggest|crash-scientist|I didn’t mean to ...|2021-03-13 10:21:10|    2|               0|     [IDK, But you lit...|
|Animesuggest|        dorting|    Watch Evangelion|2021-03-13 10:21:13|    2|               0|       [Watch Evangelion]|
|Animesuggest|        Athenza|{Vampire Hunter D...|2021-03-13 10:22:51|    2|               0|     [{Vampire Hunter ...|
|Animesuggest|       Roboragi|**Vampire Hunter ...|2021-03-13 10:23:05|    2|               0|     [Bloodlust, Vampi...|
|Animesuggest|        Arvidex|- {Wonder Egg Pei...|2021-03-13 10:23:57|    2|               0|     [{Wonder Egg Peio...|
|Animesuggest|       Roboragi|**Wonder Egg Prio...|2021-03-13 10:24:25|    1|               0|     [Wonder Egg, Wond...|
|Animesuggest|        mgd5800|{Nejimaki Seirei ...|2021-03-13 10:25:12|    2|               0|     [The Labyrinth of...|
|Animesuggest|       Roboragi|**Nejimaki Seirei...|2021-03-13 10:25:40|    2|               0|     [Sky, Alderamin, ...|
|Animesuggest|     DyeDye1234|{is the order a r...|2021-03-13 10:27:55|    1|               0|                 [Tanaka]|
|Animesuggest|       Roboragi|**Gochuumon wa Us...|2021-03-13 10:28:18|    1|               0|     [KINMOZA!"),, Yur...|
|Animesuggest|  _-Sandwitch-_|Third Mushishi an...|2021-03-13 10:30:02|   10|               0|     [Third Mushishi a...|
|Animesuggest|     DyeDye1234|Not a manga but {...|2021-03-13 10:31:48|    1|               0|                       []|
|Animesuggest|       Roboragi|**Katanagatari** ...|2021-03-13 10:32:15|    1|               0|     [Katanagatari, En...|
|Animesuggest|         MaCl97|           "Monster"|2021-03-13 10:34:28|    1|               0|                [Monster]|
|Animesuggest|    kyriosgreek|&gt;Mahouka kouko...|2021-03-13 10:35:44|    1|               0|                [Mahouka]|
+------------+---------------+--------------------+-------------------+-----+----------------+-------------------------+
only showing top 20 rows

In [30]:
df_final.write.parquet("s3a://project-group34/project/suggestions/all_animes/cleaned_with_regex/", mode="overwrite")

EXPLODING SUGGESTIONS¶

In [ ]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.executor.memory", "12g")\
    .config("spark.executor.cores", "3")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
    .config(
            "fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.ContainerCredentialsProvider"
    )\
    .getOrCreate()

print(spark.version)
In [3]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
In [4]:
df_final = spark.read.parquet("s3a://project-group34/project/suggestions/all_animes/cleaned_with_regex/")
23/11/26 20:54:25 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
In [5]:
df_final.show(5)
+------------+-----------+--------------------+-------------------+-----+----------------+-------------------------+
|   subreddit|     author|                body|        created_utc|score|controversiality|anime_names_final_cleaned|
+------------+-----------+--------------------+-------------------+-----+----------------+-------------------------+
|Animesuggest|    Athenza|{Now and Then, He...|2021-03-13 10:15:52|    2|               0|     [Now and Then, He...|
|Animesuggest|   Roboragi|**Ima, Soko ni Ir...|2021-03-13 10:16:05|    1|               0|     [Ima, Soko ni Iru...|
|Animesuggest|    Athenza|{Kino no Tabi: Th...|2021-03-13 10:20:26|    1|               0|     [The Beautiful Wo...|
|Animesuggest|Dropsoftime|Try Mahouka kouko...|2021-03-13 10:20:26|    3|               0|            [Try Mahouka]|
|Animesuggest|   Roboragi|**Kino no Tabi: T...|2021-03-13 10:21:10|    1|               0|     [Mobile Suit Gund...|
+------------+-----------+--------------------+-------------------+-----+----------------+-------------------------+
only showing top 5 rows

In [6]:
from pyspark.sql.functions import explode, col, count, split

# Flatten the anime_names column
df_flattened = df_final.withColumn("anime_name", explode(col("anime_names_final_cleaned")))
In [16]:
from pyspark.sql.functions import when
# Rename specified anime titles
df_flattened = df_flattened.withColumn("anime_name",
                   when(col("anime_name") == "Steins", "Steins Gate")
                   .when(col("anime_name") == "Gate", "Steins Gate")
                   .when(col("anime_name") == "One", "One Piece")
                   .when(col("anime_name") == "Attack", "Attack on Titan")
                   .when(col("anime_name") == "Titan", "Attack on Titan")
                   .when(col("anime_name") == "Kaguya", "Kaguya-sama: Love Is War")
                   .when(col("anime_name") == "English: Made in Abyss", "Made in Abyss")
                   .when(col("anime_name") == "English: Violet Evergarden", "Violet Evergarden")
                   .when(col("anime_name") == "Steins; ", "Steins Gate")
                   .when(col("anime_name") == "AOT", "Attack on Titan")
                   .otherwise(col("anime_name")))
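For a longer alias list, the same renaming can be driven by a dictionary instead of a hand-written when-chain (an equivalent sketch, not what the notebook ran; alias_map below holds only a sample of the aliases above):

from pyspark.sql.functions import when, col

alias_map = {"Steins": "Steins Gate", "Gate": "Steins Gate", "AOT": "Attack on Titan"}
mapped = col("anime_name")
for alias, canonical in alias_map.items():
    # each pass wraps the running expression, so any matching alias is replaced
    mapped = when(col("anime_name") == alias, canonical).otherwise(mapped)
# df_flattened = df_flattened.withColumn("anime_name", mapped)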
In [17]:
from pyspark.sql.functions import regexp_replace, col

# Replace "English: " and "English : " with an empty string
df_flattened = df_flattened.withColumn("anime_name", regexp_replace(col("anime_name"), "English: ?", ""))
In [18]:
from pyspark.sql.functions import trim, col

df_flattened = df_flattened.withColumn("anime_name", trim(col("anime_name")))
In [25]:
non_anime_terms = [
    "Roboragi", "Status", "Genres", "Finished", "Episodes", "English: ",
    "Animesuggest", "Edit", "Source", "Nihilate", "Synonyms", "Mistake",
    "Drama", "Comedy", "Action", "Romance", "Fantasy", "Slice of Life",
    "Supernatural", "Adventure", "Psychological", "Sci", "Fi", "Mystery",
    "Thriller", "Horror", "Thanks", "Movie", "Ecchi", "English", "Thank",
    "Manga", "Also", "Mecha", "Please", "Sports", "Releasing", "Music", "Yeah", 
    "English: The ", "Oh", "Fate", "Chapters", "OP", "Volumes", "Yes", "Episode", 
    "Maybe", "English: :", "Well", "Anime", "Season", "V Short", "Juj", "English: The",
    "Jo", "English: A", "Netflix", "Great", "", "A", ":", "on", "Really", "Like", "But I",
    "The"
]
In [26]:
# Filter out the non-anime names and streaming services
df_flattened = df_flattened.filter(~col("anime_name").isin(non_anime_terms))
In [27]:
df_flattened.show(5)
+------------+--------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|   subreddit|  author|                body|        created_utc|score|controversiality|anime_names_final_cleaned|          anime_name|
+------------+--------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|Animesuggest| Athenza|{Now and Then, He...|2021-03-13 10:15:52|    2|               0|     [Now and Then, He...|        Now and Then|
|Animesuggest| Athenza|{Now and Then, He...|2021-03-13 10:15:52|    2|               0|     [Now and Then, He...|      Here and There|
|Animesuggest|Roboragi|**Ima, Soko ni Ir...|2021-03-13 10:16:05|    1|               0|     [Ima, Soko ni Iru...|                 Ima|
|Animesuggest|Roboragi|**Ima, Soko ni Ir...|2021-03-13 10:16:05|    1|               0|     [Ima, Soko ni Iru...|    Soko ni Iru Boku|
|Animesuggest|Roboragi|**Ima, Soko ni Ir...|2021-03-13 10:16:05|    1|               0|     [Ima, Soko ni Iru...|Now and Then, Her...|
+------------+--------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
only showing top 5 rows

In [28]:
df_flattened.write.parquet("s3a://project-group34/project/suggestions/all_animes/flattened/", mode="overwrite")

GROUPING SUGGESTIONS¶

In [ ]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.executor.memory", "12g")\
    .config("spark.executor.cores", "3")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3,org.apache.hadoop:hadoop-aws:3.2.2")\
    .config(
            "fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.ContainerCredentialsProvider"
    )\
    .getOrCreate()

print(spark.version)
In [3]:
import sagemaker
from pyspark.sql.functions import lower, regexp_replace, col, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import Finisher, DocumentAssembler
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
In [7]:
df_flattened = spark.read.parquet("s3a://project-group34/project/suggestions/all_animes/flattened/")
In [8]:
df_flattened.show(5)
+------------+----------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|   subreddit|          author|                body|        created_utc|score|controversiality|anime_names_final_cleaned|          anime_name|
+------------+----------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
|Animesuggest|   ElegantTea122|Happy Sugar Life\...|2021-01-28 17:24:45|    1|               0|     [Happy Sugar Life...|Happy Sugar Life\...|
|Animesuggest|   ElegantTea122|Happy Sugar Life\...|2021-01-28 17:24:45|    1|               0|     [Happy Sugar Life...|          All have M|
|Animesuggest|fearlesslalready| {God of highschool}|2021-01-28 17:27:15|    1|               1|                    [God]|                 God|
|Animesuggest|      DuckFantic|I’ve never heard ...|2021-01-28 17:28:46|    3|               0|     [Black Clover, I’ll]|        Black Clover|
|Animesuggest|      DuckFantic|I’ve never heard ...|2021-01-28 17:28:46|    3|               0|     [Black Clover, I’ll]|                I’ll|
+------------+----------------+--------------------+-------------------+-----+----------------+-------------------------+--------------------+
only showing top 5 rows

In [9]:
df_anime_frequency = df_flattened.groupBy("anime_name").count()
In [10]:
df_anime_frequency.show(5)
+--------------------+-----+
|          anime_name|count|
+--------------------+-----+
|               Sword|  565|
|Apollon (Kids on ...|    3|
|                Vivy| 2813|
|          Keiyakusha|  250|
|                 S G|  117|
+--------------------+-----+
only showing top 5 rows

In [11]:
from pyspark.sql.functions import desc

# Sort by count in descending order
df_top_100_animes = df_anime_frequency.sort(desc('count')).limit(100)
In [12]:
df_top_100_animes.show(100, truncate=False)
+---------------------------------+-----+
|anime_name                       |count|
+---------------------------------+-----+
|Steins Gate                      |19606|
|Attack on Titan                  |16722|
|One Piece                        |10936|
|Violet Evergarden                |8780 |
|Made in Abyss                    |8558 |
|Monster                          |8543 |
|Clannad                          |7461 |
|Zero                             |6724 |
|Death Note                       |6559 |
|Code Geass                       |6169 |
|Cowboy Bebop                     |5758 |
|Horimiya                         |5710 |
|Gintama                          |5654 |
|Berserk                          |5485 |
|Dororo                           |5053 |
|Vinland Saga                     |5043 |
|Overlord                         |4887 |
|Naruto                           |4778 |
|Death Parade                     |4663 |
|Mob Psycho                       |4621 |
|Hunter x Hunter                  |4505 |
|Fruits Basket                    |4500 |
|Toradora                         |4356 |
|Brotherhood                      |4338 |
|Fullmetal Alchemist              |4324 |
|Kaguya-sama: Love Is War         |4114 |
|Another                          |3886 |
|Mushishi                         |3799 |
|Parasyte                         |3754 |
|Erased                           |3694 |
|Akudama Drive                    |3595 |
|Hyouka                           |3587 |
|Dorohedoro                       |3571 |
|Mahou Shoujo                     |3527 |
|Black Lagoon                     |3428 |
|Samurai Champloo                 |3367 |
|Jujutsu Kaisen                   |3301 |
|Neon Genesis Evangelion          |3275 |
|One Punch Man                    |3184 |
|Your Lie in April                |3152 |
|Silent Voice                     |3106 |
|Odd Taxi                         |3101 |
|Nichijou                         |3080 |
|Gurren Lagann                    |3044 |
|Bleach                           |3044 |
|Definitely                       |2987 |
|Konosuba                         |2929 |
|Akame                            |2923 |
|Love                             |2920 |
|Tokyo Ghoul                      |2902 |
|Ergo Proxy                       |2858 |
|Trigun                           |2835 |
|Charlotte                        |2818 |
|Vivy                             |2813 |
|Black Clover                     |2783 |
|Talentless Nana                  |2732 |
|Shiki                            |2729 |
|Barakamon                        |2696 |
|From the New World               |2676 |
|Kaiji                            |2673 |
|Try                              |2645 |
|Madoka Magica                    |2641 |
|Perfect Blue                     |2622 |
|Psycho                           |2620 |
|Good                             |2613 |
|Haikyuu                          |2610 |
|A Place Further Than the Universe|2609 |
|Death                            |2596 |
|Land of the Lustrous             |2563 |
|Dr                               |2528 |
|ERASED                           |2460 |
|Watch                            |2449 |
|Demon Slayer                     |2405 |
|Grand Blue                       |2405 |
|Puella Magi Madoka Magica        |2400 |
|Akira                            |2361 |
|Angel Beats                      |2346 |
|Baccano                          |2338 |
|Golden Time                      |2337 |
|Serial Experiments Lain          |2334 |
|Oregairu                         |2327 |
|Galactic Heroes                  |2308 |
|March comes in like a lion       |2290 |
|Sword Art Online                 |2283 |
|Claymore                         |2276 |
|Promised Neverland               |2254 |
|Evangelion                       |2250 |
|Lion                             |2248 |
|Mononoke                         |2219 |
|Wotakoi                          |2205 |
|100                              |2190 |
|Assassination Classroom          |2146 |
|Terror in Resonance              |2130 |
|Bot                              |2106 |
|Black                            |2101 |
|Jojo                             |2092 |
|Asobi Asobase                    |2091 |
|Noragami                         |2077 |
|Orange                           |2070 |
|Elfen Lied                       |2055 |
+---------------------------------+-----+

In [13]:
df_top_100_animes_pd = df_top_100_animes.toPandas()
In [14]:
df_top_100_animes_pd
Out[14]:
anime_name count
0 Steins Gate 19606
1 Attack on Titan 16722
2 One Piece 10936
3 Violet Evergarden 8780
4 Made in Abyss 8558
... ... ...
95 Jojo 2092
96 Asobi Asobase 2091
97 Noragami 2077
98 Orange 2070
99 Elfen Lied 2055

100 rows × 2 columns

In [15]:
df_top_100_animes_pd.to_csv("../../data/csv/top100_animes_suggested.csv", index=False)

MERGE WITH EXTERNAL¶

In [6]:
import pandas as pd
In [7]:
df_reddit = pd.read_csv("../../data/csv/top100_animes_suggested.csv")
df_reddit.head()
Out[7]:
anime_name japanese_title count
0 Steins Gate Steins;Gate 19606
1 Attack on Titan Shingeki no Kyojin 16722
2 One Piece Wan Pīsu 10936
3 Violet Evergarden Vaioretto Evāgāden 8780
4 Made in Abyss Meido in Abisu 8558
In [8]:
df_external = pd.read_csv("../../data/csv/anime_data.csv")
df_external.head()
Out[8]:
Unnamed: 0 title genre episodes popularity score
0 0 "0" ['Music'] 1.0 7345.0 4.77
1 1 "Aesop" no Ohanashi yori: Ushi to Kaeru, Yokub... ['Kids'] 1.0 12413.0 5.61
2 2 "Bungaku Shoujo" Kyou no Oyatsu: Hatsukoi ['Comedy', 'Fantasy', 'School'] 1.0 3466.0 6.96
3 3 "Bungaku Shoujo" Memoire ['Drama', 'Romance', 'School'] 3.0 2943.0 7.40
4 4 "Bungaku Shoujo" Movie ['Mystery', 'Drama', 'Romance', 'School'] 1.0 1799.0 7.48
In [15]:
from fuzzywuzzy import process, fuzz
In [10]:
def get_best_match(name, choices, scorer=fuzz.WRatio):
    # extractOne returns the single best (choice, score) pair
    return process.extractOne(name, choices, scorer=scorer)
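For reference, process.extractOne returns a (choice, score) tuple and fuzz.WRatio scores on a 0-100 scale; a quick illustration with invented inputs:

print(get_best_match("Shingeki no Kyojin", ["Shingeki no Kyojin", "K-On!"]))
# ('Shingeki no Kyojin', 100)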
In [11]:
titles = df_external['title'].tolist()
In [23]:
# Fuzzy-match each Japanese title against the external title list
df_reddit['matched_title'] = df_reddit['japanese_title'].apply(lambda x: get_best_match(x, titles, scorer=fuzz.WRatio)[0])
df_reddit['matched_score'] = df_reddit['japanese_title'].apply(lambda x: df_external.loc[df_external['title'] == get_best_match(x, titles, scorer=fuzz.WRatio)[0], 'score'].iloc[0])

# Fuzzy-match each English name as well, for comparison
df_reddit['matched_title_english'] = df_reddit['anime_name'].apply(lambda x: get_best_match(x, titles, scorer=fuzz.WRatio)[0])
df_reddit['matched_score_english'] = df_reddit['anime_name'].apply(lambda x: df_external.loc[df_external['title'] == get_best_match(x, titles, scorer=fuzz.WRatio)[0], 'score'].iloc[0])
In [24]:
df_reddit.head(20)
Out[24]:
anime_name japanese_title count matched_title matched_score matched_title_english matched_score_english
0 Steins Gate Steins;Gate 19606 Steins;Gate 9.11 Steins;Gate 9.11
1 Attack on Titan Shingeki no Kyojin 16722 Shingeki no Kyojin 8.47 K-On! 7.86
2 One Piece Wan Pīsu 10936 Doraemon Movie 25: Nobita no Wan Nyan Jikuuden 7.42 One Piece 8.53
3 Violet Evergarden Vaioretto Evāgāden 8780 gdMen 5.95 Violet Evergarden 8.62
4 Made in Abyss Meido in Abisu 8558 Isu 4.56 Made in Abyss 8.83
5 Monster Monsutā 8543 Monsuto Anime 6.44 Monster 8.69
6 Clannad Kuranado 7461 K 7.62 Clannad 8.16
7 Zero Zero 6724 AIKa Zero 5.98 AIKa Zero 5.98
8 Death Note Desu Nōto 6559 TO 6.43 Death Note 8.65
9 Code Geass Code Geass 6169 Code Geass: Boukoku no Akito 1 - Yokuryuu wa M... 7.49 Code Geass: Boukoku no Akito 1 - Yokuryuu wa M... 7.49
10 Cowboy Bebop Cowboy Bebop 5758 Cowboy Bebop 8.81 Cowboy Bebop 8.81
11 Horimiya Horimiya 5710 Grim 5.30 Grim 5.30
12 Gintama Gintama 5654 Gintama 8.97 Gintama 8.97
13 Berserk Berserk 5485 Berserk 6.60 Berserk 6.60
14 Dororo Dororo 5053 Dororo 8.23 Dororo 8.23
15 Vinland Saga Vinland Saga 5043 Vinland Saga 8.78 Vinland Saga 8.78
16 Overlord Overlord 4887 Overlord 8.05 Overlord 8.05
17 Naruto Naruto 4778 Naruto 7.93 Naruto 7.93
18 Death Parade Death Parade 4663 Death Parade 8.22 Death Parade 8.22
19 Mob Psycho Mob Psycho 4621 Mob Psycho 100 8.51 Mob Psycho 100 8.51
In [25]:
anime = ["One Piece", "Violet Evergarden", "Made in Abyss", "Monster", "Clannad", "Death Note"]

# For these titles the Japanese-title match picked the wrong entry, so fall
# back to the English-name match instead
for anime_name in anime:
    for i in range(len(df_reddit)):
        if df_reddit.loc[i, "anime_name"] == anime_name:
            df_reddit.loc[i, "matched_title"] = anime_name
            df_reddit.loc[i, "matched_score"] = df_reddit.loc[i, "matched_score_english"]
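The same override can be written without the nested loop, using a boolean mask (an equivalent vectorized sketch):

# Vectorized equivalent of the loop above
mask = df_reddit["anime_name"].isin(anime)
df_reddit.loc[mask, "matched_title"] = df_reddit.loc[mask, "anime_name"]
df_reddit.loc[mask, "matched_score"] = df_reddit.loc[mask, "matched_score_english"]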
In [28]:
df_reddit = df_reddit[["anime_name", "japanese_title", "count", "matched_score"]].head(20)
In [29]:
df_reddit.to_csv("../../data/csv/top20_animes_with_scores.csv", index=False)