-
Notifications
You must be signed in to change notification settings - Fork 3
/
censusmunger.py
116 lines (104 loc) · 3.42 KB
/
censusmunger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import csv
import xml.etree.ElementTree as ET
import sys
import json
import urlgrabber
import xlrd
class DataMunger (object):
    """Categorize locations and data into census tract level data for display.

    Reads an Excel spreadsheet of addresses (with an 'income' column),
    geocodes each row to a 2010 census tract via geocoder.us plus the
    FCC block API, aggregates income totals and row counts per tract,
    and writes the summary to a CSV file.
    """

    def __init__(self, inp, out):
        """inp: path to the input .xlsx file; out: path for the output CSV."""
        self.wb = xlrd.open_workbook(inp)
        self.sh = self.wb.sheet_by_index(0)
        self.fipslist = []
        self.inp = inp
        self.out = out
        # Assumes the first row is a header row of column headings.
        # Locate the columns we need by their header names.
        # This could be made more flexible...
        for col, cell in enumerate(self.sh.row(0)):
            if cell.value == 'Address':
                self.addressfield = col
            elif cell.value == 'City':
                self.cityfield = col
            elif cell.value == 'Zip':
                self.zipfield = col
            elif cell.value == 'income':
                self.incomefield = col

    def getfips(self):
        """Geocode every data row and append its 11-digit census-tract FIPS
        code to self.fipslist.

        Rows whose address cannot be resolved get the sentinel tract
        '99999999999'.  Performs network I/O (one geocoder.us request and,
        on success, one FCC API request per row).
        """
        # Skip the header row, go through the rest of the data.
        for rownum in range(1, self.sh.nrows):
            address = self.sh.row_values(rownum)[self.addressfield] + ","
            # NOTE: hard coding in Massachusetts!
            city = self.sh.row_values(rownum)[self.cityfield] + ", Ma"
            zipcode = self.sh.row_values(rownum)[self.zipfield]
            buildurl = ('http://rpc.geocoder.us/service/csv?address='
                        + address + '+' + city + '+' + zipcode)
            # Get rid of ridiculous unicode nonbreaking spaces; URL-encode
            # remaining spaces as '+'.
            buildurl = buildurl.replace(u'\xa0', u'').replace(' ', '+')
            # Drop any leftover non-ASCII so the URL is a plain byte string.
            burlstr = buildurl.encode('ascii', 'ignore')
            print(burlstr)
            outp = urlgrabber.urlread(burlstr)
            # If address not resolved, skip it, assign 999999 tract code:
            if outp != "2: couldn't find this address! sorry":
                lat = outp.split(",")[0]
                lon = outp.split(",")[1]
                buildcensurl = ('http://data.fcc.gov/api/block/2010/find?latitude='
                                + lat + '&longitude=' + lon)
                outblock = urlgrabber.urlread(buildcensurl)
                e = ET.fromstring(outblock)
                block = e.find('{http://data.fcc.gov/api}Block')
                # Tract = first 11 digits of the 15-digit block FIPS code.
                fipstract = block.attrib['FIPS'][0:11]
            else:
                fipstract = '99999999999'
            self.fipslist.append(fipstract)

    def aggregate(self):
        """Sum income and count rows per census tract.

        Results are stored on the instance (self.countarray,
        self.incomearray, self.tractunique) so makecsv/makedict/makejson
        can read them -- they were previously plain locals, which made
        those methods fail with NameError.  Must run after getfips().
        """
        self.countarray = {}
        self.incomearray = {}
        for rownum in range(1, self.sh.nrows):
            # fipslist has no entry for the header row, hence the -1 offset.
            tidx = rownum - 1
            income = self.sh.row_values(rownum)[self.incomefield]
            tract = str(self.fipslist[tidx])
            if tract in self.countarray:
                self.countarray[tract] = self.countarray[tract] + 1
                self.incomearray[tract] = self.incomearray[tract] + income
            else:
                self.countarray[tract] = 1
                self.incomearray[tract] = income
        self.tractunique = set(self.fipslist)

    def makecsv(self):
        """Write Tract, TotalIncome, Count rows to self.out as CSV.

        Requires aggregate() to have run first.
        """
        # 'wb' is the Python 2 idiom for csv output; use newline='' text
        # mode instead if this is ever ported to Python 3.
        with open(self.out, 'wb') as csvfile:
            outwriter = csv.writer(csvfile, delimiter=',', quotechar='|',
                                   quoting=csv.QUOTE_MINIMAL)
            outwriter.writerow(['Tract', 'TotalIncome', 'Count'])
            for tract in self.tractunique:
                outwriter.writerow([tract, self.incomearray[tract],
                                    self.countarray[tract]])

    def makejson(self):
        """Return the per-tract aggregates serialized as a JSON string."""
        return json.dumps(self.makedict())

    def makedict(self):
        """Return {tract: {'TotalIncome': total, 'Count': n}} for each tract.

        The original built a SET literal per tract, which lost the field
        labels (and broke makejson, since sets are not JSON-serializable);
        the keys now mirror the CSV header.  Requires aggregate() first.
        """
        return {t: {'TotalIncome': self.incomearray[t],
                    'Count': self.countarray[t]}
                for t in self.tractunique}

    def _loadcsv(self, incsv):
        """Load a previously written aggregate CSV back into a dict.

        DO NOT USE!
        """
        with open(incsv, 'rb') as f:
            rows = [r for r in csv.reader(f)]
        headers = rows[0]
        return {r[0]: {headers[1]: r[1], headers[2]: r[2]} for r in rows[1:]}

    def dowork(self):
        """Run the full pipeline: geocode, aggregate, and write the CSV."""
        self.getfips()
        self.aggregate()
        # The original stopped after aggregate(), so self.out was never
        # written; emit the CSV here.
        self.makecsv()
if __name__ == '__main__':
    # Usage: censusmunger.py <input.xlsx> <output.csv>
    if len(sys.argv) < 3:
        # Too few arguments: fall back to the bundled test data.
        # (The original checked `< 2`, which crashed with IndexError on
        # sys.argv[2] when exactly one argument was supplied.)
        inp = 'CADt2t.xlsx'
        out = 'tractdata.csv'
    else:
        inp = sys.argv[1]
        out = sys.argv[2]
    munger = DataMunger(inp, out)
    munger.dowork()