-
Notifications
You must be signed in to change notification settings - Fork 31
/
load_tweets.py
180 lines (154 loc) · 6.77 KB
/
load_tweets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python
# encoding: utf-8
"""
load_tweets.py
Created by Hilary Mason on 2010-04-25.
Copyright (c) 2010 Hilary Mason. All rights reserved.
"""
import sys, os
import datetime
import subprocess
import pickle
import pymongo
import tweepy # Twitter API class: http://github.com/joshthecoder/tweepy
from lib import mongodb
from lib import klout
from classifiers.classify_tweets import *
import settings # local app settings
class loadTweets(object):
    """Pull the authenticated user's home timeline into MongoDB and classify it.

    Instantiating the class runs the whole pipeline once: connect to the
    database, fetch tweets newer than the newest one already stored, refresh
    the author records, then attach topic scores to every tweet that has
    none yet.
    """

    # DB_NAME is passed to mongodb.connect() and is also used as the name of
    # the collection that holds the tweets.
    DB_NAME = 'tweets'
    USER_COLL_NAME = 'users'

    def __init__(self, debug=False):
        """Run one fetch-and-classify cycle.

        debug -- when true, print how many tweets and users were written.
        """
        self.debug = debug
        self.db = mongodb.connect(self.DB_NAME)

        auth = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET)
        auth.set_access_token(settings.ACCESS_KEY, settings.ACCESS_SECRET)
        self.api = tweepy.API(auth)

        last_tweet_id = self.get_last_tweet_id()
        try:
            self.fetchTweets(last_tweet_id)
        except tweepy.error.TweepError:
            # Any TweepError is treated as an authorization failure: run the
            # interactive OAuth flow once and retry with the new credentials.
            print("You need to authorize tc to connect to your twitter account. I'm going to open a browser. Once you authorize, I'll ask for your PIN.")
            auth = self.setup_auth()
            self.api = tweepy.API(auth)
            self.fetchTweets(last_tweet_id)

        self.classify_tweets()

    def get_last_tweet_id(self):
        """Return the largest stored tweet 'id', or None if no tweet is stored."""
        newest = self.db[self.DB_NAME].find(fields={'id': True}).sort(
            'id', direction=pymongo.DESCENDING).limit(1)
        for record in newest:
            return record['id']
        return None

    def fetchTweets(self, since_id=None):
        """Fetch the home timeline and store the tweets and their authors.

        since_id -- when truthy, only tweets newer than this id are requested.
        """
        params = {'count': 500}
        if since_id:
            params['since_id'] = since_id
        tweets = self.api.home_timeline(**params)

        # Flatten each incoming tweet into one tweet record and one author record.
        tweet_docs = []
        author_docs = []
        for tweet in tweets:
            author_docs.append(self._author_document(tweet.author))
            tweet_docs.append(self._tweet_document(tweet))

        self.update_authors(author_docs)

        # An empty batch is skipped explicitly; the driver rejects empty
        # inserts, which the previous code handled by catching the error.
        if tweet_docs:
            self.db[self.DB_NAME].insert(tweet_docs)

        if self.debug:
            print("added %s tweets to the db" % (len(tweet_docs)))

    @staticmethod
    def _tweet_document(tweet):
        """Build the dict stored in the tweet collection for one status object."""
        return {
            'author': tweet.author.screen_name,
            'contributors': tweet.contributors,
            'coordinates': tweet.coordinates,
            'created_at': tweet.created_at,
            'favorited': tweet.favorited,
            'geo': tweet.geo,
            'id': tweet.id,
            'in_reply_to_screen_name': tweet.in_reply_to_screen_name,
            'in_reply_to_status_id': tweet.in_reply_to_status_id,
            'in_reply_to_user_id': tweet.in_reply_to_user_id,
            'place': tweet.place,
            'source': tweet.source,
            'text': tweet.text,
            'truncated': tweet.truncated,
            'user': tweet.user.screen_name,
        }

    @staticmethod
    def _author_document(author):
        """Build the dict stored in the user collection for one tweet author."""
        return {
            '_id': author.screen_name,  # use as mongo primary key
            'contributors_enabled': author.contributors_enabled,
            'created_at': author.created_at,
            'description': author.description,
            'favourites_count': author.favourites_count,  # beware the british
            'follow_request_sent': author.follow_request_sent,
            'followers_count': author.followers_count,
            'following': author.following,
            'friends_count': author.friends_count,
            'geo_enabled': author.geo_enabled,
            'twitter_user_id': author.id,
            'lang': author.lang,
            'listed_count': author.listed_count,
            'location': author.location,
            'name': author.name,
            'notifications': author.notifications,
            'profile_image_url': author.profile_image_url,
            'protected': author.protected,
            'statuses_count': author.statuses_count,
            'time_zone': author.time_zone,
            'url': author.url,
            'utc_offset': author.utc_offset,
            'verified': author.verified,
            '_updated': datetime.datetime.now(),
        }

    def update_authors(self, authors):
        """Insert or refresh user records, at most once per day per user.

        authors -- list of author dicts as built by fetchTweets; each must
                   carry '_id' and '_updated'.

        A user is rewritten when no record exists yet or when the stored
        '_updated' stamp is at least one day away from now. The Klout score
        is added when the lookup succeeds.
        """
        k = klout.KloutAPI(settings.KLOUT_API_KEY)
        user_coll = self.db[self.USER_COLL_NAME]
        one_day = datetime.timedelta(1)

        update_count = 0
        for user in authors:
            records = list(user_coll.find(spec={'_id': user['_id']}))
            if records and abs(records[0]['_updated'] - datetime.datetime.now()) < one_day:
                continue  # refreshed within the last day

            try:
                response = k.call('klout', users=user['_id'])
                user['klout_score'] = response['users'][0]['kscore']
            except klout.KloutError:
                # Best effort (probably a 404): store the user without a score.
                pass

            # Replace the whole record rather than patching individual fields.
            user_coll.remove({'_id': user['_id']})
            user_coll.insert(user)
            update_count += 1

        if self.debug:
            print("updated %s users in the db" % (update_count))

    def classify_tweets(self):
        """Attach a 'topics' dict to every stored tweet that lacks one.

        Each name in active_classifiers is looked up in this module's globals
        and instantiated; every classifier contributes one (topic, score)
        pair per tweet. If two classifiers report the same topic, the later
        score overwrites the earlier one.
        """
        classifiers = [globals()[name]() for name in active_classifiers]

        tweet_coll = self.db[self.DB_NAME]
        unclassified = tweet_coll.find(
            spec={'topics': {'$exists': False}},
            fields={'text': True, 'user': True})
        for record in unclassified:
            topics = {}
            for classifier in classifiers:
                (topic, score) = classifier.classify(record['text'])
                topics[topic] = score
            tweet_coll.update({'_id': record['_id']}, {'$set': {'topics': topics}})

    # util classes
    def setup_auth(self):
        """
        setup_auth: authorize tc with oauth

        Opens the authorization URL in a browser, asks for the PIN on stdin,
        exchanges it for an access token, saves the token pair to
        'settings_twitter_creds' and returns the authorized handler.
        """
        auth = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET)
        auth_url = auth.get_authorization_url()

        # Pass the URL as a separate argument instead of through a shell, so
        # characters such as '&' or '?' in it cannot be interpreted.
        try:
            subprocess.Popen(["open", auth_url])
        except OSError:
            # No `open` command on this system; the URL is printed below.
            pass
        print("( if the browser fails to open, please go to: %s )" % auth_url)

        verifier = raw_input("What's your PIN: ").strip()
        auth.get_access_token(verifier)

        # Pickle needs a binary file, and the handle must be closed so the
        # data is flushed. NOTE: the token is stored unencrypted.
        with open('settings_twitter_creds', 'wb') as creds_file:
            pickle.dump((auth.access_token.key, auth.access_token.secret), creds_file)

        return auth

    def init_twitter(self, username, password):
        """Return a tweepy API object that uses basic (username/password) auth."""
        auth = tweepy.BasicAuthHandler(username, password)
        api = tweepy.API(auth)
        return api
if __name__ == '__main__':
    # Script entry point: constructing the loader runs one full
    # fetch-and-classify cycle with progress output enabled.
    loader = loadTweets(debug=True)