Skip to content

Commit

Permalink
Merge pull request #7 from SpikeTheMaster/master
Browse files Browse the repository at this point in the history
Add Amazon and Steam parsing changes
  • Loading branch information
oliver006 committed Jan 29, 2015
2 parents d8faeea + 62ed335 commit 0301cb8
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 13 deletions.
52 changes: 41 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ Elasticsearch For Beginners: Indexing your Gmail Inbox
=======================


#### What's this all about?

#### What's this all about?

I recently looked at my Gmail inbox and noticed that I have well over 50k emails, taking up about 12GB of space but there is no good way to tell what emails take up space, who sent them to, who emails me, etc

Expand All @@ -15,11 +16,11 @@ __Related tutorial:__ [Index and Search Hacker News using Elasticsearch and the

Set up [Elasticsearch](http://ohardt.us/es-install) and make sure it's running at [http://localhost:9200](http://localhost:9200)

I use Python and [Tornado](https://github.com/tornadoweb/tornado/) for the scripts to import and query the data. Run `pip install tornado` to install Tornado.
I use Python and [Tornado](https://github.com/tornadoweb/tornado/) for the scripts to import and query the data. Run `pip install tornado chardet` to install Tornado and chardet.



#### Aight, where do we start?
#### Aight, where do we start?

First, go [here](http://ohardt.us/download-gmail-mailbox) and download your Gmail mailbox, depending on the amount of emails you have accumulated this might take a while.

Expand Down Expand Up @@ -100,7 +101,7 @@ for part in parts:

##### Index the data with Elasticsearch

The most simple aproach is a PUT request per item:
The most simple approach is a PUT request per item:

```python
def upload_item_to_es(item):
Expand All @@ -109,12 +110,12 @@ def upload_item_to_es(item):
response = yield http_client.fetch(request)
if not response.code in [200, 201]:
print "\nfailed to add item %s" % item['message-id']

```

However, Elasticsearch provides a better method for importing large chunks of data: [bulk indexing](http://ohardt.us/es-bulk-indexing)
Instead of making a HTTP request per document and indexing individually, we batch them in chunks of eg. 1000 documents and then index them.<br>
Bulk messages are of the format:
Bulk messages are of the format:

```
cmd\n
Expand Down Expand Up @@ -195,11 +196,9 @@ You can also quickly query for certain fields via the `q` parameter. This exampl
curl "localhost:9200/gmail/email/_search?pretty&q=from:ship-confirm@amazon.com"
```



##### Aggregation queries

Aggregation queries let us bucket data by a given key and count the number of messages per bucket.
Aggregation queries let us bucket data by a given key and count the number of messages per bucket.
For example, number of messages grouped by recipient:

```
Expand Down Expand Up @@ -255,7 +254,7 @@ Result:
"doc_count" : 4285
}, { "key" : "unread",
"doc_count" : 510
},
},
...
]
}
Expand All @@ -269,7 +268,7 @@ curl -s "localhost:9200/gmail/email/_search?pretty&search_type=count" -d '
"years": {
"date_histogram": {
"field": "date_ts", "interval": "year"
}}}}
}}}}
'
```

Expand All @@ -296,6 +295,37 @@ Result:
}
```

Write aggregation queries to work out how much you spent on Amazon/Steam:

```
GET _search
{
"query": {
"match_all": {}
},
"size": 0,
"aggs": {
"group_by_company": {
"terms": {
"field": "order_details.merchant"
},
"aggs": {
"total_spent": {
"sum": {
"field": "order_details.order_total"
}
},
"postage": {
"sum": {
"field": "order_details.postage"
}
}
}
}
}
}
```


#### Todo

Expand Down
81 changes: 81 additions & 0 deletions src/AmazonEmailParser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import json
import re

class AmazonEmailParser(object):

def __init__(self):
self.orderTotalRE = re.compile(r"(?<=Order Total:) (?:.*?)(\d+.\d+)")
self.postageRE = re.compile(r"(?<=Postage & Packing:) (?:.*?)(\d+.\d+)")
self.deliveryRE = re.compile(r"(?<=Delivery & Handling::) (?:.*?)(\d+.\d+)")
self.orderItemsRE = re.compile(r"==========\r\n\r\n")
self.costRE = re.compile(r"(\d+\.\d+)")

def canParse(self, email):
try:
if 'auto-confirm@amazon' in email['from']:
return True
else:
return False
except:
return False

def parse(self, email):
body = email['body']

if 'Order Confirmation' in body:
postage = 0
orderTotal = 0

result = re.search(self.orderTotalRE, body)

if result:
orderTotal = float(result.groups()[0])

result = re.search(self.postageRE, body)

if result:
postage = float(result.groups()[0])
else:
result = re.search(self.deliveryRE, body)
if result:
postage = float(result.groups()[0])

email['order_details'] = {
"order_items" : [],
"order_total" : orderTotal,
"postage" : postage,
"merchant" : "amazon"
}

orders = re.split(self.orderItemsRE, body)[1]
orders = orders.split('\r\n\r\n')

#Remove first and last 3 items
orders.pop(0)
orders.pop()
orders.pop()
orders.pop()

costTotal = orderTotal

for item in orders:
if 'Your estimated delivery date is:' in item or 'Your order will be sent to:' in item:
continue
else:
lines = item.replace('_','').split('\r\n')
if len(lines) < 4:
continue
itemName = lines[0].strip()
cost = float(re.search(self.costRE, lines[1].strip()).groups()[0])
condition = lines[2].rpartition(':')[2].strip()
seller = lines[3].replace('Sold by', '').strip()

email['order_details']['order_items'].append({"item":itemName, "cost":cost, "condition": condition, "seller": seller})
costTotal -= cost

if costTotal != 0:
print "Warning order not parsed correctly, order items may be missing, or promotion may have been applied."
print email['order_details']
print body

return email
11 changes: 11 additions & 0 deletions src/DelegatingEmailParser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
class DelegatingEmailParser(object):

def __init__(self, parsers):
self.parsers = parsers

def parse(self, email):
for parser in self.parsers:
if parser.canParse(email):
return parser.parse(email)

return email
61 changes: 61 additions & 0 deletions src/SteamEmailParser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import json
import re

class SteamEmailParser(object):

def __init__(self):
self.orderTotalRE = re.compile(r"(?<=Total:)[ \t]+(\d+.\d+)")
self.orderItemsRE = re.compile(r"(?:\.\r\n)+")
self.costRE = re.compile(r"(\d+\.\d+)")

def canParse(self, email):
try:
if 'noreply@steampowered.com' in email['from']:
return True
else:
return False
except:
return False

def parse(self, email):
body = email['body']

if 'Thank you' in email['subject'] and 'purchase' in body:
orderTotal = 0

result = re.search(self.orderTotalRE, body)

if result:
orderTotal = float(result.groups()[0])

email['order_details'] = {
"order_items" : [],
"order_total" : orderTotal,
"merchant" : "steam"
}

order = re.split(self.orderItemsRE, body)[2].split('\r\n') #This parser to get order total is currently broken, gift purchases are not parsed

costTotal = orderTotal

costTotal = orderTotal

for item in order:
if '-------' in item:
break
else:
if item == '' or ': ' not in item:
continue
splitResult = item.rpartition(':')
itemName = splitResult[0].strip()
cost = float(re.match(self.costRE, splitResult[2].strip()).groups()[0])

email['order_details']['order_items'].append({"item":itemName, "cost":cost})
costTotal -= cost

if costTotal != 0:
print "Warning order not parsed correctly, order items may be missing, or promotion may have been applied."
print email['order_details']
print body

return email
11 changes: 9 additions & 2 deletions src/index_emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
import email.utils
import mailbox
import email
import quopri
import chardet
from DelegatingEmailParser import DelegatingEmailParser
from AmazonEmailParser import AmazonEmailParser
from SteamEmailParser import SteamEmailParser
import logging

http_client = HTTPClient()
Expand All @@ -19,13 +24,12 @@ def delete_index():
try:
url = "%s/%s?refresh=true" % (tornado.options.options.es_url, tornado.options.options.index_name)
request = HTTPRequest(url, method="DELETE", request_timeout=240)
body = {"refresh": True}
response = http_client.fetch(request)
logging.info('Delete index done %s' % response.body)
except:
pass



def create_index():

schema = {
Expand Down Expand Up @@ -135,6 +139,9 @@ def load_from_file():
upload_data = list()
logging.info("Starting import from file %s" % tornado.options.options.infile)
mbox = mailbox.UnixMailbox(open(tornado.options.options.infile, 'rb'), email.message_from_file)

emailParser = DelegatingEmailParser([AmazonEmailParser(), SteamEmailParser()])

for msg in mbox:
count += 1
if count < tornado.options.options.skip:
Expand Down

0 comments on commit 0301cb8

Please sign in to comment.