import nltk,numpy,pandas,time,re,random
from collections import deque
from nltk.stem import WordNetLemmatizer
from nltk import pos_tag, word_tokenize
from nltk.corpus import wordnet as wn
from nltk import bigrams
Read the data in and split each record into id, label, and text.
# Read the data
file = open('twitter.csv', 'r', encoding='latin1')
raw_data_lines = file.readlines()
raw_data = []
file.close()
for line in raw_data_lines[1:]:
    data_segment = line.split(',')
    raw_data.append((int(data_segment[0]), int(data_segment[1]), ','.join(data_segment[2:]).strip()))
Tokenize each message into words using TweetTokenizer.
TweetCut = nltk.tokenize.TweetTokenizer()
raw_data = [(uid,label,TweetCut.tokenize(text)) for uid,label,text in raw_data]
Remove links that start with http:// and https:// from the text.
for i in range(len(raw_data)):
    text = [word for word in raw_data[i][2] if not word.startswith('http://') and not word.startswith('https://')]
    raw_data[i] = (raw_data[i][0], raw_data[i][1], text)
Remove user mentions so the data does not become overly specific.
TwitterUserMention = re.compile(r'^@([A-Za-z0-9_]+)')
for i in range(len(raw_data)):
    text = [word for word in raw_data[i][2] if not TwitterUserMention.match(word)]
    raw_data[i] = (raw_data[i][0], raw_data[i][1], text)
The idea behind this step is explained in the slides.
## Build a dictionary for lookups
WordDictionary = set([word for word,sound in nltk.corpus.cmudict.entries()])
## Declare helper functions for simplifying repeated characters in a string
def is_multiple_character(text):
    # True when some character repeats three or more times in a row
    pattern = re.compile(r"(.)\1{2,}")
    return len(pattern.findall(text)) > 0
def compress_list(strData):
    # Run-length encode a list of characters, e.g. ['c','o','o','l'] -> [('c', 1), ('o', 2), ('l', 1)]
    result = []
    count = 0
    prevChar = strData[0]
    while len(strData) != 0:
        if prevChar == strData[0]:
            count += 1
        else:
            result.append((prevChar, count))
            prevChar = strData[0]
            count = 1
        strData = strData[1:]
    result.append((prevChar, count))
    return result
def generate_from_compress(compress_data, state=0):
    # Generate every string obtained by shrinking each run to between 1 and its current length
    character, count = compress_data[state]
    buffer_word = []
    word = ''
    if state < len(compress_data) - 1:
        next_word = generate_from_compress(compress_data, state + 1)
        while count >= 1:
            word = character * count
            for next_word_data in next_word:
                buffer_word.append(word + next_word_data)
            count -= 1
    else:
        while count >= 1:
            buffer_word.append(character * count)
            count -= 1
    return buffer_word
def generate_all_compress(compress_data):
    # Cap every run at two repeats (English words rarely repeat a letter more than twice), then expand
    should_build = [(c, 2 if i > 2 else i) for c, i in compress_data]
    test_text = generate_from_compress(should_build, 0)
    return test_text
def eval_from_compress(strList):
    # Return the first candidate found in the dictionary, else fall back to the last (shortest) candidate
    for data in strList:
        if data.lower() in WordDictionary:
            return data
    return strList[-1]
def multiple_character_simplify(test_str):
    return eval_from_compress(generate_all_compress(compress_list(list(test_str))))
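A quick sanity check of the repeated-character pipeline on a hypothetical token; the last step only picks 'cool' if it appears in cmudict, so the result is indicative.
print(compress_list(list('coooool')))          # -> [('c', 1), ('o', 5), ('l', 1)]
print(multiple_character_simplify('coooool'))  # likely 'cool'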
## Remove repeated characters
for i in range(len(raw_data)):
    uid, label, text = raw_data[i]
    for j in range(len(text)):
        if is_multiple_character(text[j]):
            text[j] = multiple_character_simplify(text[j])
    raw_data[i] = (uid, label, text)
Replace SMS-language abbreviations with their full forms (https://en.wikipedia.org/wiki/SMS_language).
## Declare the SMS-language dictionary
sms_language_dictionary = {
# Wikipedia SMS language top-frequency words
'afaik': ['as','far','as','i','know'],
'afk': ['away','from','keyboard'],
'thnx': ['thank'],
'thx': ['thank'],
'thxs': ['thank'],
'thxx': ['thank'],
'idk': ['i','don\'t','know'],
'np': ['no','problem'],
'jsyk': ['just','so','you','know'],
'idc': ['i','don\'t','care'],
'atm': ['at','the','moment'],
'wyd': ['what','are','you','doing'],
'wya': ['where','are','you','at'],
'btw': ['by','the','way'],
'asap': ['as','soon','as','possible'],
'ftw': ['for','the','win'],
'msg': ['message'],
'plz': ['please'],
'ttyl': ['talk','to','you','later'],
'ilu': ['i','love','you'],
'ily': ['i','love','you'],
'bc': ['because'],
'fyi': ['for','your','information'],
'imo': ['in','my','opinion'],
'imho': ['in','my','honest','opinion'],
'bf': ['boyfriend'],
'gf': ['girlfriend'],
'bff': ['best','friend','forever'],
'stfu': ['shut','the','fuck','up'],
'wth': ['what','the','hell'],
'wtf': ['what','the','fuck'],
'jk': ['just','kidding'],
'gtfo': ['get','the','fuck','out'],
'otw': ['on','the','way'],
'yw': ['you\'re','welcome'],
'tou': ['thinking','of','you'],
# custom addition
'b4': ['before'],
'gr8': ['great'],
'ur': ['your'],
'u': ['you'],
'n': ['and'],
# Expressive acronyms (laughter, surprise, etc.)
'omg': ['oh','my','god'],
'omfg': ['oh','my','fucking','god'],
'lol': ['laughing','out','loud'],
'rofl': ['rolling','on','the','floor','laughing'],
'xoxo': ['hugs','and','kisses'],
'smh': ['shaking','my','head'],
'lmao': ['laughing','my','ass','off']
}
## Expand SMS language
for i in range(len(raw_data)):
    uid, label, text = raw_data[i]
    j = 0
    while j < len(text):
        if text[j].lower() in sms_language_dictionary:
            text = text[:j] + sms_language_dictionary[text[j].lower()] + text[j+1:]
        j += 1
    raw_data[i] = (uid, label, text)
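A minimal sketch of the expansion on a hypothetical token list, using the same dictionary lookup as the loop above.
sample_tokens = ['omg', 'ur', 'gr8']
expanded = []
for token in sample_tokens:
    expanded += sms_language_dictionary.get(token.lower(), [token])
print(expanded)  # -> ['oh', 'my', 'god', 'your', 'great']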
Convert hashtags into plain text so they can be used as features. The overall approach was written by Vee Satayamas and is explained further in the slides.
## Check whether a token is a hashtag
def is_hashtag(text):
    pattern = re.compile(r"\B#\w*[a-zA-Z]+\w")
    return pattern.match(text)
# Build a word graph for word segmentation
def createWordGraph(sentense):
    graph = []
    i = 0
    sentense_length = len(sentense)
    for i in range(sentense_length):
        graph.append({'index': i, 'next': []})
        j = 0
        for j in range(i, sentense_length + 1):
            # Add an edge i -> j whenever sentense[i:j] (at least two characters) is a dictionary word
            if i + 1 != j and sentense[i:j].lower() in WordDictionary:
                graph[i]['next'].append(j)
        # Fall back to a single-character step so the graph always reaches the final node
        if len(graph[i]['next']) == 0:
            graph[i]['next'].append(i + 1)
    graph.append({'index': i + 1, 'next': [], 'finish': True})
    return graph
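To inspect the graph structure, print the adjacency list for a short hypothetical string; the exact edges depend on which substrings appear in cmudict.
sample_graph = createWordGraph('goodday')
for node in sample_graph:
    print(node['index'], '->', node['next'])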
# Shortest-path algorithm using [SPFA](https://en.wikipedia.org/wiki/Shortest_Path_Faster_Algorithm)
def findShortestPath(graph):
    out = []
    queue = deque()
    visited = set()
    queue.append(0)
    graph[0]['dist'] = 0
    while len(queue) > 0:
        u = queue.popleft()
        for v in graph[u]['next']:
            # Relax the edge u -> v (every edge has weight 1)
            if 'dist' not in graph[v] or graph[u]['dist'] + 1 < graph[v]['dist']:
                graph[v]['prev'] = graph[u]['index']
                graph[v]['dist'] = graph[u]['dist'] + 1
                if v not in visited:
                    # Small-label-first: push v to the front if its distance is smaller than the queue head's
                    if len(queue) > 0 and graph[queue[0]]['dist'] > graph[v]['dist']:
                        queue.appendleft(v)
                    else:
                        queue.append(v)
                    visited.add(v)
    # Walk the prev pointers back from the final node to recover the path
    index = len(graph) - 1
    while index != 0:
        out.insert(0, index)
        index = graph[index]['prev']
    return out
## Split the string according to the computed path
def splitByPath(sentense, path):
    wordbreaker = set(['_', ' ', '-', ':', ',', ';', '#'])
    path.insert(0, 0)
    out = []
    prebuilt = ''
    for i in range(1, len(path)):
        cWord = sentense[path[i-1]:path[i]]
        if path[i] - path[i-1] == 1:
            # Single characters are accumulated until a separator or a dictionary word is reached
            if cWord not in wordbreaker:
                prebuilt += cWord
            elif prebuilt != '':
                out.append(prebuilt)
                prebuilt = ''
        else:
            if prebuilt != '':
                if len(out) == 0 or (prebuilt.isnumeric() and not out[-1].isnumeric()):
                    out.append(prebuilt)
                else:
                    out[-1] += prebuilt
                prebuilt = ''
            out.append(cWord)
    if prebuilt != '':
        out.append(prebuilt)
    return out
def hashtag_tokenize(sentense):
    wordGraph = createWordGraph(sentense)
    shortestPath = findShortestPath(wordGraph)
    return splitByPath(sentense, shortestPath)
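Hypothetical usage of the hashtag segmenter; the split depends on the contents of cmudict, so the output is only indicative.
print(hashtag_tokenize('goodmorning'))  # likely ['good', 'morning']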
## Split hashtags into plain text for later use
test_data = []
for i in range(len(raw_data)):
    uid, label, text = raw_data[i]
    j = 0
    while j < len(text):
        if is_hashtag(text[j]):
            #print(text)
            text = text[:j] + hashtag_tokenize(text[j][1:]) + text[j+1:]
            #print(text)
        j += 1
    raw_data[i] = (uid, label, text)
Shuffle the data, then split the 100,000 records into a training set of 80,000, a dev set of 10,000, and a test set of 10,000.
## Shuffle the data
random.shuffle(raw_data)
## Split the data
training_set = raw_data[:80000]
devtest_set = raw_data[80000:90000]
test_set = raw_data[90000:]
def feature_extractor_type01(text):
    # Bag-of-words: raw token counts
    features = {}
    for word in text:
        if word in features:
            features[word] += 1
        else:
            features[word] = 1
    return features
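A minimal check of the bag-of-words extractor on hypothetical tokens.
print(feature_extractor_type01(['good', 'good', 'movie']))  # -> {'good': 2, 'movie': 1}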
### Training
training_featureset = [(feature_extractor_type01(text),label) for uid,label,text in training_set]
devtest_featureset = [(feature_extractor_type01(text),label) for uid,label,text in devtest_set]
classifier = nltk.NaiveBayesClassifier.train(training_featureset)
Evaluate the accuracy of method 1.
### Accuracy test
accuracy = nltk.classify.accuracy(classifier, devtest_featureset)
print(accuracy)
### Confusion matrix
devtest_expect = [label for feature,label in devtest_featureset]
devtest_actually = [classifier.classify(feature) for feature,label in devtest_featureset]
print(nltk.ConfusionMatrix(devtest_expect, devtest_actually).pretty_format(sort_by_count=True, show_percents=True, truncate=9))
Error analysis of method 1: in the output, 'words' is treated as a different feature from 'word', so lemmatization should be applied.
## Show the first 10 misclassified examples
count_error = 0
count = 0
while count_error < 10 and count < 10000:
    if devtest_expect[count] != devtest_actually[count]:
        print(devtest_set[count])
        count_error += 1
    count += 1
Lemmatizer = nltk.stem.WordNetLemmatizer()
def feature_extractor_type02(text):
    # Bag-of-words over lowercased, WordNet-lemmatized tokens
    features = {}
    for word in text:
        word = Lemmatizer.lemmatize(word.lower())
        if word in features:
            features[word] += 1
        else:
            features[word] = 1
    return features
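A quick check that lemmatization merges the inflected forms noted in the error analysis above (WordNet data is assumed to be installed).
print(Lemmatizer.lemmatize('words'))  # -> 'word'
print(Lemmatizer.lemmatize('going'))  # stays 'going' without a POS hint, which motivates trying a stemmer next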
### Training
training_featureset = [(feature_extractor_type02(text),label) for uid,label,text in training_set]
devtest_featureset = [(feature_extractor_type02(text),label) for uid,label,text in devtest_set]
classifier = nltk.NaiveBayesClassifier.train(training_featureset)
Evaluate the accuracy of method 2.
### Accuracy test
accuracy = nltk.classify.accuracy(classifier, devtest_featureset)
print(accuracy)
### Confusion matrix
devtest_expect = [label for feature,label in devtest_featureset]
devtest_actually = [classifier.classify(feature) for feature,label in devtest_featureset]
print(nltk.ConfusionMatrix(devtest_expect, devtest_actually).pretty_format(sort_by_count=True, show_percents=True, truncate=9))
Error analysis of method 2: in the output, 'going' and 'go' are still treated as different features even though they should be the same word, so we switch from lemmatization to stemming.
## Show the first 10 misclassified examples
count_error = 0
count = 0
while count_error < 10 and count < 10000:
    if devtest_expect[count] != devtest_actually[count]:
        print(devtest_featureset[count])
        count_error += 1
    count += 1
Stemmer = nltk.stem.snowball.SnowballStemmer("english")
def feature_extractor_type03(text):
    # Bag-of-words over lowercased, Snowball-stemmed tokens
    features = {}
    for word in text:
        word = Stemmer.stem(word.lower())
        if word in features:
            features[word] += 1
        else:
            features[word] = 1
    return features
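A quick look at the Snowball stemmer that method 3 relies on; outputs are indicative, and stems are not always real words.
print(Stemmer.stem('going'))    # likely 'go'
print(Stemmer.stem('happily'))  # likely 'happili'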
### Training
training_featureset = [(feature_extractor_type03(text),label) for uid,label,text in training_set]
devtest_featureset = [(feature_extractor_type03(text),label) for uid,label,text in devtest_set]
classifier = nltk.NaiveBayesClassifier.train(training_featureset)
Evaluate the accuracy of method 3.
### Accuracy test
accuracy = nltk.classify.accuracy(classifier, devtest_featureset)
print(accuracy)
### Confusion matrix
devtest_expect = [label for feature,label in devtest_featureset]
devtest_actually = [classifier.classify(feature) for feature,label in devtest_featureset]
print(nltk.ConfusionMatrix(devtest_expect, devtest_actually).pretty_format(sort_by_count=True, show_percents=True, truncate=9))
Error analysis of method 3: the results look similar to method 2 and accuracy actually drops, so we go back to the lemmatizer.
Lemmatizer = nltk.stem.WordNetLemmatizer()
def feature_extractor_type04(text):
    # Bigram counts over lowercased, lemmatized tokens
    features = {}
    text = [Lemmatizer.lemmatize(word.lower()) for word in text]
    for word in list(nltk.bigrams(text)):
        if word in features:
            features[word] += 1
        else:
            features[word] = 1
    return features
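A minimal sketch of the bigram features on hypothetical tokens; keys are tuples of lemmatized, lowercased word pairs.
print(feature_extractor_type04(['I', 'love', 'this', 'movie']))
# likely {('i', 'love'): 1, ('love', 'this'): 1, ('this', 'movie'): 1}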
### Training
training_featureset = [(feature_extractor_type04(text),label) for uid,label,text in training_set]
devtest_featureset = [(feature_extractor_type04(text),label) for uid,label,text in devtest_set]
classifier = nltk.NaiveBayesClassifier.train(training_featureset)
Evaluate the accuracy of method 4.
### Accuracy test
accuracy = nltk.classify.accuracy(classifier, devtest_featureset)
print(accuracy)
### Confusion matrix
devtest_expect = [label for feature,label in devtest_featureset]
devtest_actually = [classifier.classify(feature) for feature,label in devtest_featureset]
print(nltk.ConfusionMatrix(devtest_expect, devtest_actually).pretty_format(sort_by_count=True, show_percents=True, truncate=9))
Error analysis of method 4: accuracy improves.
def feature_extractor_type05(text):
    # Trigram counts over lowercased, lemmatized tokens
    features = {}
    text = [Lemmatizer.lemmatize(word.lower()) for word in text]
    for word in list(nltk.trigrams(text)):
        if word in features:
            features[word] += 1
        else:
            features[word] = 1
    return features
### Training
training_featureset = [(feature_extractor_type05(text),label) for uid,label,text in training_set]
devtest_featureset = [(feature_extractor_type05(text),label) for uid,label,text in devtest_set]
classifier = nltk.NaiveBayesClassifier.train(training_featureset)
Evaluate the accuracy of method 5.
### Accuracy test
accuracy = nltk.classify.accuracy(classifier, devtest_featureset)
print(accuracy)
### Confusion matrix
devtest_expect = [label for feature,label in devtest_featureset]
devtest_actually = [classifier.classify(feature) for feature,label in devtest_featureset]
print(nltk.ConfusionMatrix(devtest_expect, devtest_actually).pretty_format(sort_by_count=True, show_percents=True, truncate=9))
Using trigrams turns out to give worse results than bigrams.
def penn_to_wn(tag):
    # Map a Penn Treebank POS tag to the corresponding WordNet POS constant
    if tag.startswith('J'):
        return wn.ADJ
    elif tag.startswith('N'):
        return wn.NOUN
    elif tag.startswith('R'):
        return wn.ADV
    elif tag.startswith('V'):
        return wn.VERB
    return None
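A small sketch of the POS mapping on hypothetical tokens; the tags come from pos_tag, which requires the averaged perceptron tagger data.
for token, tag in pos_tag(['dogs', 'are', 'running', 'quickly']):
    print(token, tag, penn_to_wn(tag))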
def feature_extractor_type06(text):
    # Count the first WordNet synset of each content word (noun/verb/adjective/adverb)
    features = {}
    synsets = []
    tagged = pos_tag(text)
    for token in tagged:
        wn_tag = penn_to_wn(token[1])
        if not wn_tag:
            continue
        lemma = Lemmatizer.lemmatize(token[0], pos=wn_tag)
        current_synset = wn.synsets(lemma, pos=wn_tag)
        if len(current_synset) == 0:
            continue
        synsets.append(current_synset[0].name())
    for word in synsets:
        if word in features:
            features[word] += 1
        else:
            features[word] = 1
    return features
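A hypothetical check of what the synset features look like; each feature is the name of the first WordNet synset of a content word.
print(feature_extractor_type06(['The', 'dogs', 'are', 'running']))
# likely something like {'dog.n.01': 1, 'be.v.01': 1, 'run.v.01': 1}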
### Training
training_featureset = [(feature_extractor_type06(text),label) for uid,label,text in training_set]
devtest_featureset = [(feature_extractor_type06(text),label) for uid,label,text in devtest_set]
classifier = nltk.NaiveBayesClassifier.train(training_featureset)
Evaluate the accuracy of method 6.
### Accuracy test
accuracy = nltk.classify.accuracy(classifier, devtest_featureset)
print(accuracy)
### Confusion matrix
devtest_expect = [label for feature,label in devtest_featureset]
devtest_actually = [classifier.classify(feature) for feature,label in devtest_featureset]
print(nltk.ConfusionMatrix(devtest_expect, devtest_actually).pretty_format(sort_by_count=True, show_percents=True, truncate=9))
def feature_extractor_type07(text):
    # Bigram counts over the first-synset representation used in method 6
    features = {}
    synsets = []
    tagged = pos_tag(text)
    for token in tagged:
        wn_tag = penn_to_wn(token[1])
        if not wn_tag:
            continue
        lemma = Lemmatizer.lemmatize(token[0], pos=wn_tag)
        current_synset = wn.synsets(lemma, pos=wn_tag)
        if len(current_synset) == 0:
            continue
        synsets.append(current_synset[0].name())
    for word in list(bigrams(synsets)):
        if word in features:
            features[word] += 1
        else:
            features[word] = 1
    return features
### Training
training_featureset = [(feature_extractor_type07(text),label) for uid,label,text in training_set]
devtest_featureset = [(feature_extractor_type07(text),label) for uid,label,text in devtest_set]
classifier = nltk.NaiveBayesClassifier.train(training_featureset)
Evaluate the accuracy of method 7.
### Accuracy test
accuracy = nltk.classify.accuracy(classifier, devtest_featureset)
print(accuracy)
### Confusion matrix
devtest_expect = [label for feature,label in devtest_featureset]
devtest_actually = [classifier.classify(feature) for feature,label in devtest_featureset]
print(nltk.ConfusionMatrix(devtest_expect, devtest_actually).pretty_format(sort_by_count=True, show_percents=True, truncate=9))
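Final evaluation: retrain the chosen extractor (method 4, lemmatized bigram features) on the training set and score it on the held-out test set.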
### Training
training_featureset = [(feature_extractor_type04(text),label) for uid,label,text in training_set]
testset_featureset = [(feature_extractor_type04(text),label) for uid,label,text in test_set]
classifier = nltk.NaiveBayesClassifier.train(training_featureset)
### Accuracy test
accuracy = nltk.classify.accuracy(classifier, testset_featureset)
print(accuracy)
### Confusion matrix
devtest_expect = [label for feature,label in testset_featureset]
devtest_actually = [classifier.classify(feature) for feature,label in testset_featureset]
print(nltk.ConfusionMatrix(devtest_expect, devtest_actually).pretty_format(sort_by_count=True, show_percents=True, truncate=9))