Setup - Azure¶
In [ ]:
spark
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 37, 6, Finished, Available)
Out[ ]:
SparkSession - hive
In [ ]:
sc
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 37, 7, Finished, Available)
Out[ ]:
In [ ]:
%%configure -f \
{"conf": {"spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.2"}}
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, -1, Finished, Available)
Unrecognized options:
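The %%configure cell above appears to fail because of the trailing backslash after -f; the Synapse %%configure magic expects the JSON body on the lines that follow. A minimal sketch of the intended cell, assuming the same Spark NLP package coordinates:

%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.2"
    }
}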
In [ ]:
!pip install spark-nlp
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 6, Finished, Available)
Collecting spark-nlp
  Downloading spark_nlp-5.1.4-py2.py3-none-any.whl (540 kB)
Installing collected packages: spark-nlp
Successfully installed spark-nlp-5.1.4
In [ ]:
!pip install azureml.fsspec
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 7, Finished, Available)
Collecting azureml.fsspec
  Downloading azureml_fsspec-1.2.0-py3-none-any.whl (10 kB)
Collecting azureml-dataprep<4.14.0a,>=4.12.0a
  Downloading azureml_dataprep-4.12.7-py3-none-any.whl (38.2 MB)
Collecting azureml-dataprep-rslex~=2.19.6dev0
  Downloading azureml_dataprep_rslex-2.19.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (23.5 MB)
Collecting msal-extensions~=0.3.0
  Downloading msal_extensions-0.3.1-py2.py3-none-any.whl (18 kB)
Installing collected packages: azureml-dataprep-rslex, msal-extensions, azureml-dataprep, azureml.fsspec
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
azureml-dataset-runtime 1.47.0 requires azureml-dataprep<4.6.0a,>=4.5.0a, but you have azureml-dataprep 4.12.7 which is incompatible.
Successfully installed azureml-dataprep-4.12.7 azureml-dataprep-rslex-2.19.6 azureml.fsspec-1.2.0 msal-extensions-0.3.1
In [ ]:
import pandas as pd
import numpy as np
import json
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from azure.storage.blob import BlobServiceClient
import io
from azureml.fsspec import AzureMachineLearningFileSystem
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 8, Finished, Available)
Read in Data¶
Comments¶
In [ ]:
# Azure Storage Account details
connection_string = "DefaultEndpointsProtocol=https;AccountName=group01astorage63e260f45;AccountKey=iGcY4Un0hlKMMqSs6BlLhmqNU0D7m8uJyVz2din6CTAp3AvM3QPH8/Tk8k+xN77D5R3KXvJZYBwX+AStLsNR5Q==;EndpointSuffix=core.windows.net"
container_name = "azureml-blobstore-a1e50e78-9796-4cfe-a8bb-88f7de188a74"
# Initialize the BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
# create the filesystem
fs = AzureMachineLearningFileSystem("azureml://subscriptions/71060237-912c-4e5c-849b-1a23626fc284/resourcegroups/project-rg/workspaces/group-01-aml/datastores/workspaceblobstore/paths/")
# Get the list of files
files = fs.glob('comments/*.parquet')
# Initialize an empty list to hold DataFrames
dfs = []
for file in files:
    # Get the blob client for the file
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=file)
    # Download the blob to a stream
    with io.BytesIO() as input_blob:
        blob_client.download_blob().readinto(input_blob)
        input_blob.seek(0)  # Go to the start of the stream
        # Read the parquet file
        df = pd.read_parquet(input_blob, engine='pyarrow')
        dfs.append(df)
# Concatenate all DataFrames in the list
comments = pd.concat(dfs, ignore_index=True)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 32, 9, Finished, Available)
Failure while loading azureml_run_type_providers. Failed to load entrypoint azureml.scriptrun = azureml.core.script_run:ScriptRun._from_run_dto with exception [Errno 2] No such file or directory: '/home/trusted-service-user/cluster-env/env/lib/python3.10/site-packages/msal_extensions-1.0.0.dist-info/METADATA'.
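As an aside, the AzureMachineLearningFileSystem handle created above is an fsspec filesystem, so the per-blob BlobServiceClient round trip could likely be skipped. A sketch under that assumption (same globbed paths; fs.open is the standard fsspec call):

# Hypothetical shortcut: stream each parquet file straight through the fsspec filesystem.
dfs = []
for file in fs.glob('comments/*.parquet'):
    with fs.open(file, 'rb') as f:
        dfs.append(pd.read_parquet(f, engine='pyarrow'))
comments = pd.concat(dfs, ignore_index=True)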
Submissions¶
In [ ]:
# Azure Storage Account details
connection_string = "DefaultEndpointsProtocol=https;AccountName=group01astorage63e260f45;AccountKey=iGcY4Un0hlKMMqSs6BlLhmqNU0D7m8uJyVz2din6CTAp3AvM3QPH8/Tk8k+xN77D5R3KXvJZYBwX+AStLsNR5Q==;EndpointSuffix=core.windows.net"
container_name = "azureml-blobstore-a1e50e78-9796-4cfe-a8bb-88f7de188a74"
# Initialize the BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
# create the filesystem
fs = AzureMachineLearningFileSystem("azureml://subscriptions/71060237-912c-4e5c-849b-1a23626fc284/resourcegroups/project-rg/workspaces/group-01-aml/datastores/workspaceblobstore/paths/")
# Get the list of files
files = fs.glob('submissions/*.parquet')
# Initialize an empty list to hold DataFrames
dfs = []
for file in files:
    # Get the blob client for the file
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=file)
    # Download the blob to a stream
    with io.BytesIO() as input_blob:
        blob_client.download_blob().readinto(input_blob)
        input_blob.seek(0)  # Go to the start of the stream
        # Read the parquet file
        df = pd.read_parquet(input_blob, engine='pyarrow')
        dfs.append(df)
# Concatenate all DataFrames in the list
submissions = pd.concat(dfs, ignore_index=True)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 9, Finished, Available)
Failure while loading azureml_run_type_providers. Failed to load entrypoint azureml.scriptrun = azureml.core.script_run:ScriptRun._from_run_dto with exception [Errno 2] No such file or directory: '/home/trusted-service-user/cluster-env/env/lib/python3.10/site-packages/msal_extensions-1.0.0.dist-info/METADATA'.
Conduct Basic NLP and Text Checks¶
Create pipeline for text cleaning¶
In [ ]:
def get_pipeline(input_):
    # Assemble document
    document_assembler = DocumentAssembler() \
        .setInputCol(input_) \
        .setOutputCol("document")
    # Tokenize text
    tokenizer = Tokenizer() \
        .setInputCols(["document"]) \
        .setOutputCol("token")
    # Normalize tokens
    normalizer = Normalizer() \
        .setInputCols(["token"]) \
        .setOutputCol("normalized") \
        .setLowercase(True) \
        .setCleanupPatterns(["[^A-Za-z]+"])
    # Spell checking
    spell_checker = NorvigSweetingModel.pretrained() \
        .setInputCols(["normalized"]) \
        .setOutputCol("corrected")
    # Remove stop words
    stop_words_cleaner = StopWordsCleaner() \
        .setInputCols(["corrected"]) \
        .setOutputCol("cleaned") \
        .setCaseSensitive(False) \
        .setStopWords(StopWordsCleaner().getStopWords() + ['im', 'youre', 'k', 'dont', 'wont', 'couldnt'])
    # Lemmatize tokens
    lemmatizer = LemmatizerModel.pretrained("lemma_antbnc") \
        .setInputCols(["cleaned"]) \
        .setOutputCol("lemmatized")
    # Add Finisher
    finisher = Finisher() \
        .setInputCols(["lemmatized"]) \
        .setOutputCols(["cleaned_body"]) \
        .setOutputAsArray(True) \
        .setCleanAnnotations(True)
    # Build the pipeline
    pipeline = Pipeline(stages=[
        document_assembler,
        tokenizer,
        normalizer,
        spell_checker,
        stop_words_cleaner,
        lemmatizer,
        finisher
    ])
    return pipeline
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 10, Finished, Available)
Clean submission body data through the pipeline¶
In [ ]:
from pyspark.sql.types import *
# remove crosspost_parent and crosspost_parent_list due to NAs
submissions = submissions.drop(columns=['crosspost_parent', 'crosspost_parent_list'])
# specify schema
schema = StructType([
StructField("author", StringType(), True),
StructField("title", StringType(), True),
StructField("selftext", StringType(),True),
StructField("subreddit", StringType(),True),
StructField("score", IntegerType(),True),
StructField("num_comments", StringType(),True),
StructField("permalink", StringType(),True),
StructField("created_utc", DateType(),True),
StructField("url", StringType(),True),
StructField("domain", StringType(),True),
StructField("is_video", BooleanType(),True),
StructField("is_self", BooleanType(),True),
StructField("is_reddit_media_domain", BooleanType(),True),
StructField("spoiler", BooleanType(),True),
StructField("over_18", BooleanType(),True),
StructField("stickied", BooleanType(),True),
StructField("thumbnail", StringType(),True),
StructField("media", StringType(),True),
StructField("secure_media", StringType(),True),
StructField("gilded", IntegerType(),True),
StructField("archived", BooleanType(),True),
StructField("distinguished", StringType(),True)])
# convert submissions pandas df to spark df
submissions_spark=spark.createDataFrame(submissions, schema=schema)
# Run model
pipeline = get_pipeline(input_='selftext')
model = pipeline.fit(submissions_spark)
cleaned_df_subs = model.transform(submissions_spark)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 11, Finished, Available)
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:604: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:425: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  Expected a string or bytes dtype, got int64
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
spellcheck_norvig download started this may take some time. Approximate size to download 4.2 MB [OK!]
lemma_antbnc download started this may take some time. Approximate size to download 907.6 KB [OK!]
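A quick sanity check of the pipeline output, comparing the raw text with the finished token arrays (column names as produced above):

# Peek at a few rows of raw selftext next to the cleaned token arrays.
cleaned_df_subs.select("selftext", "cleaned_body").show(5, truncate=80)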
Clean submission title data through the pipeline¶
In [ ]:
def get_pipeline(input_):
    # Assemble document
    document_assembler = DocumentAssembler() \
        .setInputCol(input_) \
        .setOutputCol("document")
    # Tokenize text
    tokenizer = Tokenizer() \
        .setInputCols(["document"]) \
        .setOutputCol("token")
    # Normalize tokens
    normalizer = Normalizer() \
        .setInputCols(["token"]) \
        .setOutputCol("normalized") \
        .setLowercase(True) \
        .setCleanupPatterns(["[^A-Za-z]+"])
    # Spell checking
    spell_checker = NorvigSweetingModel.pretrained() \
        .setInputCols(["normalized"]) \
        .setOutputCol("corrected")
    # Remove stop words
    stop_words_cleaner = StopWordsCleaner() \
        .setInputCols(["corrected"]) \
        .setOutputCol("cleaned") \
        .setCaseSensitive(False) \
        .setStopWords(StopWordsCleaner().getStopWords() + ['im', 'youre', 'k', 'dont', 'wont', 'couldnt'])
    # Lemmatize tokens
    lemmatizer = LemmatizerModel.pretrained("lemma_antbnc") \
        .setInputCols(["cleaned"]) \
        .setOutputCol("lemmatized")
    # Add Finisher
    finisher = Finisher() \
        .setInputCols(["lemmatized"]) \
        .setOutputCols(["cleaned_title"]) \
        .setOutputAsArray(True) \
        .setCleanAnnotations(True)
    # Build the pipeline
    pipeline = Pipeline(stages=[
        document_assembler,
        tokenizer,
        normalizer,
        spell_checker,
        stop_words_cleaner,
        lemmatizer,
        finisher
    ])
    return pipeline
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 12, Finished, Available)
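The get_pipeline definitions in this notebook differ only in the input column and the Finisher output column. A parameterized sketch (not a cell that was run; the two-argument signature is our own) that would cover body, title, and comment cleaning with a single definition:

def get_pipeline(input_col, output_col):
    # Same stages as the cells above; only the input column and Finisher output column vary.
    document_assembler = DocumentAssembler().setInputCol(input_col).setOutputCol("document")
    tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
    normalizer = (Normalizer().setInputCols(["token"]).setOutputCol("normalized")
                  .setLowercase(True).setCleanupPatterns(["[^A-Za-z]+"]))
    spell_checker = NorvigSweetingModel.pretrained().setInputCols(["normalized"]).setOutputCol("corrected")
    stop_words_cleaner = (StopWordsCleaner().setInputCols(["corrected"]).setOutputCol("cleaned")
                          .setCaseSensitive(False)
                          .setStopWords(StopWordsCleaner().getStopWords() + ['im', 'youre', 'k', 'dont', 'wont', 'couldnt']))
    lemmatizer = LemmatizerModel.pretrained("lemma_antbnc").setInputCols(["cleaned"]).setOutputCol("lemmatized")
    finisher = (Finisher().setInputCols(["lemmatized"]).setOutputCols([output_col])
                .setOutputAsArray(True).setCleanAnnotations(True))
    return Pipeline(stages=[document_assembler, tokenizer, normalizer, spell_checker,
                            stop_words_cleaner, lemmatizer, finisher])

# e.g. get_pipeline("title", "cleaned_title") or get_pipeline("body", "cleaned_body")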
In [ ]:
# Run model
pipeline = get_pipeline(input_='title')
model = pipeline.fit(cleaned_df_subs)
submissions_cleaned = model.transform(cleaned_df_subs)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 13, Finished, Available)
spellcheck_norvig download started this may take some time. Approximate size to download 4.2 MB [OK!]
lemma_antbnc download started this may take some time. Approximate size to download 907.6 KB [OK!]
Clean comment body data through the pipeline¶
In [ ]:
def get_pipeline(input_):
    # Assemble document
    document_assembler = DocumentAssembler() \
        .setInputCol(input_) \
        .setOutputCol("document")
    # Tokenize text
    tokenizer = Tokenizer() \
        .setInputCols(["document"]) \
        .setOutputCol("token")
    # Normalize tokens
    normalizer = Normalizer() \
        .setInputCols(["token"]) \
        .setOutputCol("normalized") \
        .setLowercase(True) \
        .setCleanupPatterns(["[^A-Za-z]+"])
    # Spell checking
    spell_checker = NorvigSweetingModel.pretrained() \
        .setInputCols(["normalized"]) \
        .setOutputCol("corrected")
    # Remove stop words
    stop_words_cleaner = StopWordsCleaner() \
        .setInputCols(["corrected"]) \
        .setOutputCol("cleaned") \
        .setCaseSensitive(False) \
        .setStopWords(StopWordsCleaner().getStopWords() + ['im', 'youre', 'k', 'dont', 'wont', 'couldnt'])
    # Lemmatize tokens
    lemmatizer = LemmatizerModel.pretrained("lemma_antbnc") \
        .setInputCols(["cleaned"]) \
        .setOutputCol("lemmatized")
    # Add Finisher
    finisher = Finisher() \
        .setInputCols(["lemmatized"]) \
        .setOutputCols(["cleaned_body"]) \
        .setOutputAsArray(True) \
        .setCleanAnnotations(True)
    # Build the pipeline
    pipeline = Pipeline(stages=[
        document_assembler,
        tokenizer,
        normalizer,
        spell_checker,
        stop_words_cleaner,
        lemmatizer,
        finisher
    ])
    return pipeline
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 32, 10, Finished, Available)
In [ ]:
from pyspark.sql.types import *
# remove author_flair_css_class and author_cakeday due to NAs
comments = comments.drop(columns=['author_flair_css_class', 'author_cakeday'])
# specify schema
schema = StructType([
StructField("author", StringType(), True),
StructField("body", StringType(),True),
StructField("can_gild", StringType(),True),
StructField("controversiality", IntegerType(),True),
StructField("created_utc", DateType(),True),
StructField("distinguished", StringType(),True),
StructField("edited", StringType(),True),
StructField("gilded", IntegerType(),True),
StructField("id", StringType(),True),
StructField("is_submitter", BooleanType(),True),
StructField("link_id", StringType(),True),
StructField("parent_id", StringType(),True),
StructField("permalink", StringType(),True),
StructField("retrieved_on", DateType(),True),
StructField("score", IntegerType(),True),
StructField("stickied", BooleanType(),True),
StructField("subreddit", StringType(),True),
StructField("subreddit_id", StringType(),True)])
# convert comments pandas df to spark df
comments_spark = spark.createDataFrame(comments, schema=schema)
# Run model
pipeline = get_pipeline(input_='body')
model = pipeline.fit(comments_spark)
comments_cleaned = model.transform(comments_spark)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 32, 11, Submitted, Running)
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:604: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:425: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  Expected a string or bytes dtype, got bool
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py:486: FutureWarning: iteritems is deprecated and will be removed in a future version. Use .items instead.
InvalidHttpRequestToLivy: Submission failed due to error content =["requirement failed: Session isn't active."] HTTP status code: 400. Trace ID: 693095dc-4b95-4b78-8b23-0e122d26f05e.
Export cleaned submissions and comments as parquet¶
In [ ]:
workspace_default_storage_account = "group01astorage63e260f45"
workspace_default_container = "azureml-blobstore-a1e50e78-9796-4cfe-a8bb-88f7de188a74"
workspace_wasbs_base_url = (
f"wasbs://{workspace_default_container}@{workspace_default_storage_account}.blob.core.windows.net/")
# save submissions
submissions_cleaned.repartition(64).write.mode("overwrite").parquet(f"{workspace_wasbs_base_url}nlp_cleaned_teg/submissions")
StatementMeta(7bcc356e-2304-45b0-a972-3b83d052d5ce, 8, 17, Finished, Available)
In [ ]:
# save comments
workspace_default_storage_account = "group01astorage63e260f45"
workspace_default_container = "azureml-blobstore-a1e50e78-9796-4cfe-a8bb-88f7de188a74"
workspace_wasbs_base_url = (
f"wasbs://{workspace_default_container}@{workspace_default_storage_account}.blob.core.windows.net/")
comments_cleaned.repartition(64).write.mode("overwrite").parquet(f"{workspace_wasbs_base_url}nlp_cleaned_teg/comments")
StatementMeta(7bcc356e-2304-45b0-a972-3b83d052d5ce, 8, 18, Submitted, Running)
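The cells further down re-read the cleaned comments by downloading every blob into pandas and rebuilding a schema. Since the cleaned data was just written as parquet under the same wasbs base URL, it could likely be loaded back directly as a Spark DataFrame; a sketch assuming the same storage account and container:

# Read the cleaned parquet back into Spark without going through pandas.
submissions_cleaned = spark.read.parquet(f"{workspace_wasbs_base_url}nlp_cleaned_teg/submissions")
comments_cleaned = spark.read.parquet(f"{workspace_wasbs_base_url}nlp_cleaned_teg/comments")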
TF-IDF, Common Words, and Text Length¶
Determine the most important words in politics subreddits' submission bodies and titles using TF-IDF¶
In [ ]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF, HashingTF, StopWordsRemover
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, MapType, StringType
import string
from sparknlp.pretrained import PretrainedPipeline
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, Normalizer, StopWordsCleaner, LemmatizerModel
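# Note: this Tokenizer import shadows the pyspark.ml.feature.Tokenizer imported above; only the Spark NLP
# Tokenizer is visible below, which is harmless here because neither is used directly in the cells that follow.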
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 14, Finished, Available)
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Concat body, title and filter for political subreddits
pol_subs_body_title_words = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits)) \
    .select(F.concat_ws(" ", submissions_cleaned.cleaned_title, submissions_cleaned.cleaned_body).alias("nlpbodytext"))
# Split the nlpbodytext into an array of words
pol_subs_body_title_words = pol_subs_body_title_words.withColumn("array_words", F.split(pol_subs_body_title_words["nlpbodytext"], " "))
# Define HashingTF
hashingTF = HashingTF(inputCol="array_words", outputCol="rawFeatures",numFeatures=40000000)
# Define IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
words = pol_subs_body_title_words.select('array_words')
# Hashing frequency
tf = hashingTF.transform(words)
# IDF
idf_model = idf.fit(tf)
# TFIDF
tfidf = idf_model.transform(tf)
ndf = tfidf.select(F.explode('array_words').name('expwords')).withColumn('words',F.array('expwords'))
# Define HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",numFeatures=40000000)
hashudf = F.udf(lambda vector : vector.indices.tolist()[0],F.StringType())
wordtf = hashingTF.transform(ndf).withColumn('wordhash',hashudf(F.col('rawFeatures')))
wordtf = wordtf.dropDuplicates(["expwords"])
udf1 = F.udf(lambda vec : dict(zip(vec.indices.tolist(),vec.values.tolist())),MapType(StringType(),StringType()))
valuedf = tfidf.select(F.explode(udf1(F.col('features'))).name('wordhash','value'))
valuedf = valuedf.dropDuplicates(["wordhash"])
join_importance = wordtf.join(valuedf, wordtf.wordhash == valuedf.wordhash, 'inner').select(wordtf.expwords, valuedf.value)
top100_pol_subs = join_importance.sort(F.col("value").desc()).limit(100).toPandas()
top100_pol_subs = top100_pol_subs.reset_index(drop=True)
top100_pol_subs = top100_pol_subs.rename(columns={"expwords":"word", "value":"tfidf_value"})
StatementMeta(7bcc356e-2304-45b0-a972-3b83d052d5ce, 5, 28, Finished, Available)
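The hash-and-rejoin steps above recover each word from its hashed index by hashing every distinct token a second time and joining on the hash. A CountVectorizer keeps an explicit vocabulary, so the same kind of ranking can be sketched without the join. This is an alternative, not the method used above, and the summed TF-IDF weight per word is our own aggregation choice:

from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.functions import vector_to_array

# Vocabulary-based TF-IDF over the same array_words column (vocabSize/minDF are illustrative).
cv_model = CountVectorizer(inputCol="array_words", outputCol="rawFeatures",
                           vocabSize=50000, minDF=5).fit(pol_subs_body_title_words)
tf = cv_model.transform(pol_subs_body_title_words)
tfidf = IDF(inputCol="rawFeatures", outputCol="features").fit(tf).transform(tf)

# Sum the TF-IDF weight of each vocabulary index across documents, then map indices back to words.
weights = (tfidf.select(F.posexplode(vector_to_array("features")).alias("idx", "weight"))
                .groupBy("idx").agg(F.sum("weight").alias("tfidf_value")))
vocab = cv_model.vocabulary
top100 = weights.orderBy(F.col("tfidf_value").desc()).limit(100).toPandas()
top100["word"] = top100["idx"].map(lambda i: vocab[int(i)])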
In [ ]:
top100_pol_subs.to_csv("Users/tg693/fall-2023-reddit-project-team-01/data/csv/tfidf_pol_subs.csv",index=False)
StatementMeta(7bcc356e-2304-45b0-a972-3b83d052d5ce, 5, 38, Finished, Available)
Determine the most important words in economics subreddits' submission bodies and titles using TF-IDF¶
In [ ]:
# Concat title and body and filter for economics subreddits
econ_subs_body_title_words = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits)) \
    .select(F.concat_ws(" ", submissions_cleaned.cleaned_title, submissions_cleaned.cleaned_body).alias("nlpbodytext"))
# Split the nlpbodytext into an array of words
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("array_words", F.split(econ_subs_body_title_words["nlpbodytext"], " "))
# Define HashingTF
hashingTF = HashingTF(inputCol="array_words", outputCol="rawFeatures",numFeatures=40000000)
# Define IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
words = econ_subs_body_title_words.select('array_words')
# Hashing frequency
tf = hashingTF.transform(econ_subs_body_title_words)
# IDF
idf_model = idf.fit(tf)
# TFIDF
tfidf = idf_model.transform(tf)
ndf = tfidf.select(F.explode('array_words').name('expwords')).withColumn('words',F.array('expwords'))
# Define HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",numFeatures=40000000)
hashudf = F.udf(lambda vector : vector.indices.tolist()[0],F.StringType())
wordtf = hashingTF.transform(ndf).withColumn('wordhash',hashudf(F.col('rawFeatures')))
wordtf = wordtf.dropDuplicates(["expwords"])
udf1 = F.udf(lambda vec : dict(zip(vec.indices.tolist(),vec.values.tolist())),MapType(StringType(),StringType()))
valuedf = tfidf.select(F.explode(udf1(F.col('features'))).name('wordhash','value'))
valuedf = valuedf.dropDuplicates(["wordhash"])
join_importance = wordtf.join(valuedf, wordtf.wordhash == valuedf.wordhash, 'inner').select(wordtf.expwords, valuedf.value)
top100_econ_subs = join_importance.sort(F.col("value").desc()).limit(100).toPandas()
top100_econ_subs = top100_econ_subs.reset_index(drop=True)
top100_econ_subs = top100_econ_subs.rename(columns={"expwords":"word", "value":"tfidf_value"})
StatementMeta(7bcc356e-2304-45b0-a972-3b83d052d5ce, 5, 39, Finished, Available)
In [ ]:
top100_econ_subs.to_csv("Users/tg693/fall-2023-reddit-project-team-01/data/csv/tfidf_econ_subs.csv",index=False)
StatementMeta(7bcc356e-2304-45b0-a972-3b83d052d5ce, 5, 41, Finished, Available)
Determine the most important words in politics subreddits' comment bodies using TF-IDF¶
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Select the cleaned comment body and filter for political subreddits
pol_comms_body_words = comments_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits)) \
    .select(F.col("cleaned_body"))
# Define HashingTF
hashingTF = HashingTF(inputCol="cleaned_body", outputCol="rawFeatures",numFeatures=40000000)
# Define IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
words = pol_comms_body_words.select('cleaned_body')
# Hashing frequency
tf = hashingTF.transform(words)
# IDF
idf_model = idf.fit(tf)
# TFIDF
tfidf = idf_model.transform(tf)
ndf = tfidf.select(F.explode('cleaned_body').name('expwords')).withColumn('words',F.array('expwords'))
# Define HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",numFeatures=40000000)
hashudf = F.udf(lambda vector : vector.indices.tolist()[0],F.StringType())
wordtf = hashingTF.transform(ndf).withColumn('wordhash',hashudf(F.col('rawFeatures')))
wordtf = wordtf.dropDuplicates(["expwords"])
udf1 = F.udf(lambda vec : dict(zip(vec.indices.tolist(),vec.values.tolist())),MapType(StringType(),StringType()))
valuedf = tfidf.select(F.explode(udf1(F.col('features'))).name('wordhash','value'))
valuedf = valuedf.dropDuplicates(["wordhash"])
join_importance = wordtf.join(valuedf, wordtf.wordhash == valuedf.wordhash, 'inner').select(wordtf.expwords, valuedf.value)
top100_pol_comms = join_importance.sort(F.col("value").desc()).limit(100).toPandas()
top100_pol_comms = top100_pol_comms.reset_index(drop=True)
top100_pol_comms = top100_pol_comms.rename(columns={"expwords":"word", "value":"tfidf_value"})
StatementMeta(7bcc356e-2304-45b0-a972-3b83d052d5ce, 5, 42, Finished, Available)
In [ ]:
top100_pol_comms.to_csv("Users/tg693/fall-2023-reddit-project-team-01/data/csv/tfidf_pol_comms.csv",index=False)
StatementMeta(7bcc356e-2304-45b0-a972-3b83d052d5ce, 5, 43, Finished, Available)
Determine the most important words in economics subreddits' comment bodies using TF-IDF¶
In [ ]:
from pyspark.sql.types import *
# Azure Storage Account details
connection_string = "DefaultEndpointsProtocol=https;AccountName=group01astorage63e260f45;AccountKey=iGcY4Un0hlKMMqSs6BlLhmqNU0D7m8uJyVz2din6CTAp3AvM3QPH8/Tk8k+xN77D5R3KXvJZYBwX+AStLsNR5Q==;EndpointSuffix=core.windows.net"
container_name = "azureml-blobstore-a1e50e78-9796-4cfe-a8bb-88f7de188a74"
# Initialize the BlobServiceClient
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
# create the filesystem
fs = AzureMachineLearningFileSystem("azureml://subscriptions/71060237-912c-4e5c-849b-1a23626fc284/resourcegroups/project-rg/workspaces/group-01-aml/datastores/workspaceblobstore/paths/")
# Get the list of files
files = fs.glob('nlp_cleaned_teg/comments/*.parquet')
# Initialize an empty list to hold DataFrames
dfs = []
for file in files:
    # Get the blob client for the file
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=file)
    # Download the blob to a stream
    with io.BytesIO() as input_blob:
        blob_client.download_blob().readinto(input_blob)
        input_blob.seek(0)  # Go to the start of the stream
        # Read the parquet file
        df = pd.read_parquet(input_blob, engine='pyarrow')
        dfs.append(df)
# Concatenate all DataFrames in the list
comments_cleaned = pd.concat(dfs, ignore_index=True)
# specify schema
schema = StructType([
StructField("author", StringType(), True),
StructField("body", StringType(),True),
StructField("can_gild", StringType(),True),
StructField("controversiality", IntegerType(),True),
StructField("created_utc", DateType(),True),
StructField("distinguished", StringType(),True),
StructField("edited", StringType(),True),
StructField("gilded", IntegerType(),True),
StructField("id", StringType(),True),
StructField("is_submitter", BooleanType(),True),
StructField("link_id", StringType(),True),
StructField("parent_id", StringType(),True),
StructField("permalink", StringType(),True),
StructField("retrieved_on", DateType(),True),
StructField("score", IntegerType(),True),
StructField("stickied", BooleanType(),True),
StructField("subreddit", StringType(),True),
StructField("subreddit_id", StringType(),True)])
# convert submissions pandas df to spark df
comments_cleaned=spark.createDataFrame(comments_cleaned, schema=schema)
StatementMeta(, , , Waiting, )
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Select the cleaned comment body and filter for economics subreddits
econ_comms_body_words = comments_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
.select(F.col("cleaned_body"))
print("econ_comms_body_words created")
# Define HashingTF
hashingTF = HashingTF(inputCol="cleaned_body", outputCol="rawFeatures",numFeatures=40000000)
# Define IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
# Hashing frequency
tf = hashingTF.transform(econ_comms_body_words)
print("hashingTF done")
# IDF
idf_model = idf.fit(tf)
# TFIDF
tfidf = idf_model.transform(tf)
print("idf_model done")
ndf = tfidf.select(F.explode('cleaned_body').name('expwords')).withColumn('words',F.array('expwords'))
# Define HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures",numFeatures=40000000)
hashudf = F.udf(lambda vector : vector.indices.tolist()[0],F.StringType())
wordtf = hashingTF.transform(ndf).withColumn('wordhash',hashudf(F.col('rawFeatures')))
wordtf = wordtf.dropDuplicates(["expwords"])
print("Duplicates dropped")
udf1 = F.udf(lambda vec : dict(zip(vec.indices.tolist(),vec.values.tolist())),MapType(StringType(),StringType()))
valuedf = tfidf.select(F.explode(udf1(F.col('features'))).name('wordhash','value'))
valuedf = valuedf.dropDuplicates(["wordhash"])
join_importance = wordtf.join(valuedf, wordtf.wordhash == valuedf.wordhash, 'inner').select(wordtf.expwords, valuedf.value)
top100_econ_comms = join_importance.sort(F.col("value").desc()).limit(100).toPandas()
print("top100_econ_comms created")
top100_econ_comms = top100_econ_comms.reset_index(drop=True)
top100_econ_comms = top100_econ_comms.rename(columns={"expwords":"word", "value":"tfidf_value"})
StatementMeta(, , , Waiting, )
In [ ]:
top100_econ_comms.to_csv("Users/tg693/fall-2023-reddit-project-team-01/data/csv/tfidf_econ_comms.csv",index=False)
StatementMeta(, , , Cancelled, )
LDA Topic Modeling for Politics and Economics Subreddits Submissions¶
In [ ]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import Tokenizer
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 17, Finished, Available)
Politics Subreddits¶
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Concat body, title and filter for political subreddits
pol_subs_body_title_words = submissions_cleaned.filter(~F.col("subreddit").isin(excluded_subreddits)) \
    .select(F.concat_ws(" ", submissions_cleaned.cleaned_title, submissions_cleaned.cleaned_body).alias("nlpbodytext"))
substr_to_remove = ["http", "https", "tinyurl", "com", "www", "jpg", "uploads", "delete", "remove"]
regex = "|".join(substr_to_remove)
pol_subs_body_title_words = pol_subs_body_title_words.withColumn("nlpbodytext", F.regexp_replace("nlpbodytext", regex, " "))
# Split the nlpbodytext into an array of words
pol_subs_body_title_words = pol_subs_body_title_words.withColumn("array_words", F.split(pol_subs_body_title_words["nlpbodytext"], " "))
# Create the CountVectorizer model
cv = CountVectorizer(inputCol="array_words", outputCol="features", vocabSize=10000, minDF=5)
cv_model = cv.fit(pol_subs_body_title_words)
vectorized = cv_model.transform(pol_subs_body_title_words)
# Set the number of topics and max iterations
num_topics = 5
max_iterations = 50
# Create and fit the LDA model
lda = LDA(k=num_topics, maxIter=max_iterations, featuresCol="features")
lda_model = lda.fit(vectorized)
# Get the topics and their top words
topics = lda_model.describeTopics(maxTermsPerTopic=10)
vocab = cv_model.vocabulary
# Define a function to map topic words indices to actual words
def indices_to_words(indices):
    return [vocab[int(index)] for index in indices]
# Map topic words indices to actual words
indices_to_words_udf = F.udf(indices_to_words, ArrayType(StringType()))
pol_topics = topics.withColumn("topic_words", indices_to_words_udf(topics["termIndices"]))
# Show the topics and their top words
pol_topics.select("topic", "topic_words").show(truncate=False)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 23, Finished, Available)
+-----+------------------------------------------------------------------------------+
|topic|topic_words                                                                   |
+-----+------------------------------------------------------------------------------+
|0    |[widen, trump, democrat, republican, vote, say, election, bill, house, state]|
|1    |[, cmv, user, conservative, people, trump, woman, ban, e, s]                 |
|2    |[, covid, school, vaccine, news, police, new, get, say, shoot]               |
|3    |[people, , like, think, make, get, one, say, e, see]                         |
|4    |[widen, we, ukraine, war, china, say, russia, border, year, desantis]        |
+-----+------------------------------------------------------------------------------+
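The empty string and stray single letters ("e", "s") among the top terms come from the substring removal above: the pattern matches inside longer words (for example "come" loses its "com"), and splitting on single spaces leaves empty tokens. A hedged sketch that anchors the removal at word boundaries and drops empty tokens before the CountVectorizer, in place of the regexp_replace/split steps above:

# Remove whole tokens only, then drop empty strings left over after splitting.
regex = r"\b(" + "|".join(substr_to_remove) + r")\b"
pol_subs_body_title_words = (pol_subs_body_title_words
    .withColumn("nlpbodytext", F.regexp_replace("nlpbodytext", regex, " "))
    .withColumn("array_words", F.array_remove(F.split("nlpbodytext", r"\s+"), "")))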
In [ ]:
pol_topics = pol_topics.toPandas()
pol_topics.to_csv("Users/tg693/fall-2023-reddit-project-team-01/data/csv/lda_pol_subs.csv",index=False)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 24, Finished, Available)
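A rough diagnostic for the choice of num_topics, assuming the default online optimizer (so the fitted model is a LocalLDAModel); lower perplexity on the same corpus generally indicates a better fit:

# Evaluate the fitted LDA model on the vectorized corpus.
print("log-likelihood:", lda_model.logLikelihood(vectorized))
print("log-perplexity:", lda_model.logPerplexity(vectorized))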
Economics Subreddits¶
In [ ]:
excluded_subreddits = ["Economics", "finance"]
# Concat title and body and filter for economics subreddits
econ_subs_body_title_words = submissions_cleaned.filter(F.col("subreddit").isin(excluded_subreddits))\
.select(F.concat_ws(" ", submissions_cleaned.cleaned_title,submissions_cleaned.cleaned_body)\
.alias("nlpbodytext"))
substr_to_remove = ["http", "https", "tinyurl", "com", "www", "jpg", "uploads", "delete", "remove"]
regex = "|".join(substr_to_remove)
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("nlpbodytext", F.regexp_replace("nlpbodytext", regex, " "))
# Split the nlpbodytext into an array of words
econ_subs_body_title_words = econ_subs_body_title_words.withColumn("array_words", F.split(econ_subs_body_title_words["nlpbodytext"], " "))
# Create the CountVectorizer model
cv = CountVectorizer(inputCol="array_words", outputCol="features", vocabSize=10000, minDF=5)
cv_model = cv.fit(econ_subs_body_title_words)
vectorized = cv_model.transform(econ_subs_body_title_words)
# Set the number of topics and max iterations
num_topics = 5
max_iterations = 50
# Create and fit the LDA model
lda = LDA(k=num_topics, maxIter=max_iterations, featuresCol="features")
lda_model = lda.fit(vectorized)
# Get the topics and their top words
topics = lda_model.describeTopics(maxTermsPerTopic=10)
vocab = cv_model.vocabulary
# Define a function to map topic words indices to actual words
def indices_to_words(indices):
    return [vocab[int(index)] for index in indices]
# Map topic words indices to actual words
indices_to_words_udf = F.udf(indices_to_words, ArrayType(StringType()))
econ_topics = topics.withColumn("topic_words", indices_to_words_udf(topics["termIndices"]))
# Show the topics and their top words
econ_topics.select("topic", "topic_words").show(truncate=False)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 25, Finished, Available)
+-----+-----------------------------------------------------------------------------+
|topic|topic_words                                                                  |
+-----+-----------------------------------------------------------------------------+
|0    |[we, inflation, rate, price, year, economy, market, high, bank, say]         |
|1    |[stock, money, make, news, get, take, say, change, life, good]               |
|2    |[know, view, one, question, team, finance, editorial, r, loan, un]           |
|3    |[, s, fintechinshorts, content, fintechnews, wp, bank, user, launch, new]    |
|4    |[currency, review, strength, e, weakness, th, war, russian, ukraine, dollar] |
+-----+-----------------------------------------------------------------------------+
In [ ]:
econ_topics = econ_topics.toPandas()
econ_topics.to_csv("Users/tg693/fall-2023-reddit-project-team-01/data/csv/lda_econ_subs.csv",index=False)
StatementMeta(17b2472e-89f2-4244-b483-22f4ee0cf696, 38, 26, Finished, Available)