# BTC-Sentiment.py
import numpy as np
import pandas as pd
import os
import csv
from tqdm import tqdm
# For preprocessing
import re    # regular expressions, for removing non-letter characters
import nltk  # natural language processing toolkit
nltk.download("stopwords")
nltk.download('vader_lexicon')
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from nltk.sentiment.vader import SentimentIntensityAnalyzer
# For building the model
from sklearn.model_selection import train_test_split
import tensorflow as tf
# For data visualization
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
# %matplotlib inline
pd.options.plotting.backend = "plotly"
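# Load the tweet dataset; assumes the Kaggle "Bitcoin Tweets" CSV is in the working directory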
df0 = pd.read_csv('Bitcoin_tweets.csv')
df0
df = df0[['text']][0:20000]  # keep only the tweet text, first 20,000 rows
df
def tweet_to_words(tweet):
    ''' Convert tweet text into a sequence of words '''
    # convert to lowercase
    text = tweet.lower()
    # remove non-letters
    text = re.sub(r"[^a-zA-Z0-9]", " ", text)
    # tokenize
    words = text.split()
    # remove stopwords
    words = [w for w in words if w not in stopwords.words("english")]
    # apply stemming
    words = [PorterStemmer().stem(w) for w in words]
    # return list
    return words
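# Illustrative example: tweet_to_words("Bitcoin is going UP! #BTC")
# -> ['bitcoin', 'go', 'btc']   ("is" and "up" are NLTK stopwords; "going" stems to "go")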
cleantext = []
for item in tqdm(df['text']):
    words = tweet_to_words(item)
    cleantext += [words]
df['cleantext'] = cleantext
# df
def unlist(word_list):
    ''' Join a list of words back into one space-separated string '''
    words = ''
    for item in word_list:
        words += item + ' '
    return words
def compute_vader_scores(df, label):
    ''' Add VADER sentiment scores for the given text column '''
    sid = SentimentIntensityAnalyzer()
    df["vader_neg"] = df[label].apply(lambda x: sid.polarity_scores(unlist(x))["neg"])
    df["vader_neu"] = df[label].apply(lambda x: sid.polarity_scores(unlist(x))["neu"])
    df["vader_pos"] = df[label].apply(lambda x: sid.polarity_scores(unlist(x))["pos"])
    df["vader_comp"] = df[label].apply(lambda x: sid.polarity_scores(unlist(x))["compound"])
    df['cleantext2'] = df[label].apply(lambda x: unlist(x))
    return df
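# VADER returns neg/neu/pos proportions plus a compound score in [-1, 1].
# Note: each row is scored four times above; a single-pass variant could be:
#   scores = df[label].apply(lambda x: sid.polarity_scores(unlist(x))).apply(pd.Series)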
df2 = compute_vader_scores(df,'cleantext')
df2
sns.jointplot(data=df2, x='vader_pos', y='vader_neg', kind="kde")
sns.jointplot(data=df2, x='vader_pos', y='vader_neu', kind="kde")
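# Derive a 3-way label from the VADER scores: 0 = negative (any negative signal),
# 2 = positive (positive signal, no negative), 1 = neutral otherwise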
class0 = []
for i in range(len(df2)):
    if df2.loc[i, 'vader_neg'] > 0:
        class0 += [0]
    elif df2.loc[i, 'vader_pos'] > 0:
        class0 += [2]
    else:
        class0 += [1]
df['class'] = class0
df['class'].value_counts()
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
max_words = 5000
max_len=50
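# Keep only the 5,000 most frequent tokens and pad/truncate every tweet to 50 tokens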
def tokenize_pad_sequences(text):
    '''
    Tokenize the input text into sequences of integers, then
    pad each sequence to the same length
    '''
    # Text tokenization
    tokenizer = Tokenizer(num_words=max_words, lower=True, split=' ')
    tokenizer.fit_on_texts(text)
    # Transform text to a sequence of integers
    X = tokenizer.texts_to_sequences(text)
    # Pad sequences to the same length
    X = pad_sequences(X, padding='post', maxlen=max_len)
    # Return sequences and the fitted tokenizer
    return X, tokenizer
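# Note: the tokenizer is fitted here; keep it (e.g. pickle it) if the model will be reused at inference time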
print('Before Tokenization & Padding \n', df['cleantext2'][0])
X, tokenizer = tokenize_pad_sequences(df['cleantext2'])
print('After Tokenization & Padding \n', X[0])
print(X.shape)
y = pd.get_dummies(df['class'])
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=1)
print('Train Set: ', X_train.shape, y_train.shape)
print('Validation Set: ', X_val.shape, y_val.shape)
print('Test Set: ', X_test.shape, y_test.shape)
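# Split arithmetic: 20% held out for test, then 25% of the remaining 80% (= 20% of the total)
# for validation, leaving a 60/20/20 train/val/test split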
import tensorflow.keras.backend as K
def f1_score(precision, recall):
    ''' Compute the F1 score from precision and recall '''
    f1_val = 2 * (precision * recall) / (precision + recall + K.epsilon())
    return f1_val
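# tf.keras supplies Precision and Recall metrics but no built-in F1 here, so it is
# derived from the two; K.epsilon() guards against division by zero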
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Conv1D, MaxPooling1D, Bidirectional, LSTM, Dense, Dropout
from tensorflow.keras.metrics import Precision, Recall
vocab_size = 5000
embedding_size = 32
epochs = 10
learning_rate = 0.1
decay_rate = learning_rate / epochs
momentum = 0.8
sgd = tf.keras.optimizers.legacy.SGD(learning_rate=learning_rate, momentum=momentum, decay=decay_rate, nesterov=False)
# Build model
model = Sequential()
model.add(Embedding(vocab_size, embedding_size, input_length=max_len))
model.add(Conv1D(filters=32, kernel_size=1, padding='same', activation='relu'))
model.add(MaxPooling1D(pool_size=2))
model.add(Bidirectional(LSTM(32)))
model.add(Dropout(0.4))
model.add(Dense(3, activation='softmax'))
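# Shape flow (batch dim omitted): Embedding (50, 32) -> Conv1D (50, 32) ->
# MaxPooling1D (25, 32) -> Bidirectional LSTM (64,) -> Dense softmax (3,)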
tf.keras.utils.plot_model(model, show_shapes=True)
model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy', Precision(), Recall()])
history = model.fit(X_train, y_train, validation_data=(X_val, y_val), batch_size=64, epochs=epochs, verbose=1)
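# Optional: pass callbacks=[tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=2)]
# to model.fit to stop before all 10 epochs if validation loss stops improving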
# Evaluate model on the test set
loss, accuracy, precision, recall = model.evaluate(X_test, y_test, verbose=0)
# Print metrics
print('')
print('Accuracy : {:.4f}'.format(accuracy))
print('Precision : {:.4f}'.format(precision))
print('Recall : {:.4f}'.format(recall))
print('F1 Score : {:.4f}'.format(f1_score(precision, recall)))
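# Note: Precision()/Recall() aggregate over all one-hot outputs at a 0.5 threshold,
# so the derived F1 is a global aggregate rather than a per-class (macro) score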
def plot_training_hist(history):
    ''' Plot training history for accuracy and loss '''
    fig, ax = plt.subplots(1, 2, figsize=(10, 4))
    # first plot: accuracy
    ax[0].plot(history.history['accuracy'])
    ax[0].plot(history.history['val_accuracy'])
    ax[0].set_title('Model Accuracy')
    ax[0].set_xlabel('epoch')
    ax[0].set_ylabel('accuracy')
    ax[0].legend(['train', 'validation'], loc='best')
    # second plot: loss
    ax[1].plot(history.history['loss'])
    ax[1].plot(history.history['val_loss'])
    ax[1].set_title('Model Loss')
    ax[1].set_xlabel('epoch')
    ax[1].set_ylabel('loss')
    ax[1].legend(['train', 'validation'], loc='best')
plot_training_hist(history)
from sklearn.metrics import confusion_matrix
def plot_confusion_matrix(model, X_test, y_test):
    ''' Plot the confusion matrix for the given model and test data '''
    sentiment_classes = ['Negative', 'Neutral', 'Positive']
    # use the model to predict class probabilities
    y_pred = model.predict(X_test)
    # compute the confusion matrix (rows = actual, columns = predicted)
    cm = confusion_matrix(np.argmax(np.array(y_test), axis=1), np.argmax(y_pred, axis=1))
    print(pd.Series(np.argmax(np.array(y_test), axis=1)).value_counts())
    print(pd.Series(np.argmax(y_pred, axis=1)).value_counts())
    # plot the confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, cmap=plt.cm.Blues, annot=True, fmt='d',
                xticklabels=sentiment_classes,
                yticklabels=sentiment_classes)
    plt.title('Confusion matrix', fontsize=16)
    plt.xlabel('Predicted label', fontsize=12)
    plt.ylabel('Actual label', fontsize=12)
plot_confusion_matrix(model, X_test, y_test)
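# Illustrative inference sketch (hypothetical helper, reusing the fitted tokenizer and model):
def predict_sentiment(tweet):
    ''' Return the predicted sentiment label for one raw tweet string '''
    words = tweet_to_words(tweet)                        # same preprocessing as training
    seq = tokenizer.texts_to_sequences([unlist(words)])  # map words to integer ids
    padded = pad_sequences(seq, padding='post', maxlen=max_len)
    probs = model.predict(padded, verbose=0)[0]
    return ['Negative', 'Neutral', 'Positive'][int(np.argmax(probs))]

# e.g. predict_sentiment("BTC to the moon!")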