-
Notifications
You must be signed in to change notification settings - Fork 2
/
load_data.py
135 lines (118 loc) · 4.37 KB
/
load_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import glob
import json
import os
import tempfile
import urllib
import zipfile
import addfips
import electiondata as e
import geopandas
import pandas as pd
import requests
import tqdm.auto as tqdm
import us
from permacache import permacache
from py_essentials import hashing
def block_to_zcta():
    """Return the parsed JSON stored in the block-to-ZCTA zip archive.

    The archive is read from a path relative to the current working
    directory, so the caller must run from the expected location.

    Returns:
        Whatever ``block_to_zcta_2020.json`` decodes to. Presumably a dict
        mapping block GEOIDs to ZCTA codes (``load_blocks`` calls ``.get``
        on it with a GEOID) -- confirm against the data file.
    """
    archive_path = "../census_downloader/outputs/block_to_zcta_2020.zip"
    with zipfile.ZipFile(archive_path) as archive, archive.open(
        "block_to_zcta_2020.json"
    ) as member:
        mapping = json.load(member)
    return mapping
@permacache(
    "population_density/load_blocks_2",
    key_function=dict(path=lambda path: hashing.fileChecksum(path, "sha256")),
)
def load_blocks(path):
    """Read the census CSV at *path* and return its populated block rows.

    The result is cached by permacache, keyed on the SHA-256 checksum of the
    file contents rather than on the path string.

    Args:
        path: Location of a CSV file that has at least the columns BLOCK,
            POP100, COUNTY, STUSAB and GEOID.

    Returns:
        A DataFrame copy restricted to rows with a BLOCK value and a positive
        POP100, with COUNTY rewritten as a zero-padded 3-character string and
        two added columns: FIPS (state FIPS + COUNTY) and ZCTA (looked up by
        GEOID, ``"NOZIP"`` when the block has no entry).
    """
    table = pd.read_csv(path)

    # NaN never equals itself, so the self-comparison is False exactly on
    # rows where BLOCK is missing.
    has_block = table.BLOCK == table.BLOCK
    is_populated = table.POP100 > 0
    blocks = table[has_block & is_populated].copy()

    blocks["COUNTY"] = blocks.COUNTY.apply(lambda county: f"{int(county):03d}")
    state_fips = blocks.STUSAB.apply(lambda abbr: us.states.lookup(abbr).fips)
    blocks["FIPS"] = state_fips + blocks.COUNTY

    zcta_by_block = block_to_zcta()
    blocks["ZCTA"] = blocks["GEOID"].apply(
        lambda geoid: zcta_by_block.get(geoid, "NOZIP")
    )
    return blocks
@permacache("population_density/load_subcounties_geojson")
def load_subcounties_geojson():
    """Download the TIGER 2020 county-subdivision shapefiles and merge them.

    Every archive listed in the census directory index is downloaded into a
    temporary directory, extracted, converted to GeoJSON with geopandas, and
    the features of all resulting files are concatenated.

    Returns:
        A dict with a single ``"features"`` key holding the combined GeoJSON
        features. Each feature carries an extra ``"id"`` entry built from its
        ``STATEFP20`` + ``COUNTYFP20`` + ``COUSUBFP20`` properties.

    Raises:
        AssertionError: If a converted file has unexpected top-level keys, or
            its ``type``/``crs`` disagrees with the first file processed.
        zipfile.BadZipFile: If a downloaded archive is not a valid zip.
    """
    # ``import urllib`` by itself does not guarantee that the ``request``
    # submodule has been loaded, so import it explicitly here.
    import urllib.request

    root = "https://www2.census.gov/geo/tiger/TIGER2020PL/LAYER/COUSUB/2020/"
    [table] = pd.read_html(root)
    # Keep only listing entries whose 11th character is "_" (presumably the
    # per-state archives -- confirm against the directory index). Slicing
    # instead of indexing avoids an IndexError on shorter names.
    archive_names = [
        name for name in table.Name if isinstance(name, str) and name[10:11] == "_"
    ]

    features = []
    # (type, crs) of the first file; every later file must match it.
    expected_header = None

    # The context manager removes the scratch directory even on failure.
    with tempfile.TemporaryDirectory() as rootpath:
        for name in tqdm.tqdm(archive_names):
            with urllib.request.urlopen(root + name) as response:
                data = response.read()
            with open(f"{rootpath}/{name}", "wb") as f:
                f.write(data)

        # zipfile raises on a corrupt archive, whereas a shelled-out ``unzip``
        # would fail silently. Sorting makes the feature order deterministic.
        for path in tqdm.tqdm(sorted(glob.glob(f"{rootpath}/*.zip"))):
            with zipfile.ZipFile(path) as archive:
                archive.extractall(rootpath)

        for path in tqdm.tqdm(sorted(glob.glob(f"{rootpath}/*.shp"))):
            geopandas.read_file(path).to_file(f"{path}.geojson", driver="GeoJSON")

        for path in tqdm.tqdm(sorted(glob.glob(f"{rootpath}/*.geojson"))):
            with open(path) as f:
                res = json.load(f)
            assert set(res) == {"type", "crs", "features"}
            header = (res["type"], res["crs"])
            if expected_header is None:
                expected_header = header
            assert header == expected_header
            features.extend(res["features"])

    return {
        "features": [
            dict(
                **x,
                id=x["properties"]["STATEFP20"]
                + x["properties"]["COUNTYFP20"]
                + x["properties"]["COUSUBFP20"],
            )
            for x in features
        ]
    }
def get_fips_to_state():
    """Map FIPS code to postal abbreviation for every state, territory and DC.

    Returns:
        A dict whose keys are the ``fips`` attribute and whose values are the
        ``abbr`` attribute of each entry in ``us.states.STATES_AND_TERRITORIES``
        plus ``us.states.DC``.
    """
    jurisdictions = us.states.STATES_AND_TERRITORIES + [us.states.DC]
    abbr_by_fips = {}
    for jurisdiction in jurisdictions:
        abbr_by_fips[jurisdiction.fips] = jurisdiction.abbr
    return abbr_by_fips
@permacache("population_density/get_fips_to_counties")
def get_fips_to_counties():
    """Build a lookup from county FIPS code to a ``"County, ST"`` label.

    The county table comes from the private ``_counties`` attribute of
    ``addfips.AddFIPS``; entries whose outer key is not a known state FIPS
    code are skipped.

    Returns:
        A dict mapping the concatenated state + county code to the title-cased
        county name followed by the state abbreviation.
    """
    state_abbrs = get_fips_to_state()
    county_labels = {}
    for state_fips, counties in addfips.AddFIPS()._counties.items():
        if state_fips not in state_abbrs:
            continue
        state_abbr = state_abbrs[state_fips]
        # Several names may share one code; the last one iterated wins.
        for county_name, county_code in counties.items():
            county_labels[state_fips + county_code] = (
                f"{county_name.title()}, {state_abbr}"
            )
    # Manual entry; presumably this code is absent from the addfips table --
    # TODO confirm.
    county_labels["02063"] = "Chugach, AK"
    return county_labels
@permacache("population_density/get_subfips_to_subcounty_name")
def get_subfips_to_subcounty_name():
    """Map each county-subdivision id to a ``"Subdivision, County, ST"`` label.

    Subdivisions whose first five id characters are not a known county FIPS
    code are left out, and that five-character prefix is printed instead.

    Returns:
        A dict keyed by feature ``id`` whose values join the feature's
        ``NAME20`` property with the label of its county.
    """
    subcounties = load_subcounties_geojson()
    county_labels = get_fips_to_counties()

    names_by_subfips = {}
    for feature in subcounties["features"]:
        subfips = feature["id"]
        county_fips = subfips[:5]
        if county_fips in county_labels:
            names_by_subfips[subfips] = ", ".join(
                [feature["properties"]["NAME20"], county_labels[county_fips]]
            )
        else:
            # Surface the unknown county code rather than failing.
            print(county_fips)
    return names_by_subfips
@permacache("population_density/load_data/load_county_geojson_2")
def load_county_geojson():
    """Download the 2018 census county boundaries and return them as GeoJSON.

    The shapefile archive is fetched, extracted into a temporary directory,
    converted to GeoJSON with geopandas and loaded back as a plain dict.

    Returns:
        The parsed GeoJSON dict, where every feature carries an extra ``"id"``
        entry built from its ``STATEFP`` + ``COUNTYFP`` properties.

    Raises:
        requests.HTTPError: If the download does not return a success status.
        zipfile.BadZipFile: If the downloaded payload is not a valid zip.
    """
    response = requests.get(
        "https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_county_500k.zip"
    )
    # Fail loudly on an HTTP error instead of trying to unzip an error page.
    response.raise_for_status()

    # The context manager removes the scratch directory even on failure.
    with tempfile.TemporaryDirectory() as rootpath:
        archive_path = f"{rootpath}/hi.zip"
        with open(archive_path, "wb") as f:
            f.write(response.content)
        # zipfile raises on a corrupt archive, whereas a shelled-out ``unzip``
        # would fail silently.
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(rootpath)

        geojson_path = f"{rootpath}/hi.geojson"
        geopandas.read_file(f"{rootpath}/cb_2018_us_county_500k.shp").to_file(
            geojson_path, driver="GeoJSON"
        )
        with open(geojson_path) as f:
            counties_geojson = json.load(f)

    counties_geojson["features"] = [
        dict(**x, id=x["properties"]["STATEFP"] + x["properties"]["COUNTYFP"])
        for x in counties_geojson["features"]
    ]
    return counties_geojson