#Create the DIM_ZIP table holding zip information and coordinates
create external table ext_us (
country string,
zip string,
city string,
state string,
stateabrv string,
field_1 string,
field_2 string,
field_3 string,
field_4 string,
coords_1 float,
coords_2 float
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
location '/data/pac/us';
CREATE TABLE stg_us as
select zip, coords_1, coords_2, city, stateabrv as state from ext_us;
create external table ext_zip (
col_value string
)
location '/data/pac/zip';
CREATE TABLE stg_zip as
SELECT
regexp_extract(col_value, '^(?:"([^,]*)"\,?){1}', 1) zip,
regexp_extract(col_value, '^(?:"([^,]*)"\,?){2}', 1) coords_1,
regexp_extract(col_value, '^(?:"([^,]*)"\,?){3}', 1) coords_2,
regexp_extract(col_value, '^(?:"([^,]*)"\,?){4}', 1) city,
regexp_extract(col_value, '^(?:"([^,]*)"\,?){5}', 1) state
from ext_zip;
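#Illustrative only: in the pattern above the repeated capturing group keeps its last
#match, so repetition {N} returns the Nth quoted field. The literal below is a
#hypothetical sample line, not taken from the source file.
select regexp_extract('"00210","43.00","-71.01","Portsmouth","NH"', '^(?:"([^,]*)"\,?){4}', 1) sample_city;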
create table dim_zip as
select zip.* from stg_zip zip left outer join stg_us us on (zip.zip = us.zip) where us.zip is null
union all
select us.* from stg_us us;
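#Optional sanity check (illustrative, assumes the zip files are loaded): compare total
#rows with distinct zip codes, since a single zip can map to several cities.
select count(*) total_rows, count(distinct zip) distinct_zips from dim_zip;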
#Create the EXT_CN table holding the candidate information
create external table ext_cn (
candidateId string,
candidateName string,
candidateParty string,
candidateElectionYear int,
candidateOfficeState string,
candidateOffice string,
candidateOfficeDistrict string,
incumbentChallengerStatus string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\|'
location '/data/pac/cn';
#Create the EXT_CCL table holding the candidate-to-committee linkage
create external table ext_ccl (
candidateId string,
field_1 string,
field_2 string,
committeeId string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\|'
location '/data/pac/ccl';
#Create the EXT_CM table holding the committee information
create external table ext_cm (
committeeId string,
committeeName string,
field_1 string,
field_2 string,
field_3 string,
field_4 string,
field_5 string,
field_6 string,
committeeDesignation string,
committeeType string,
committeeParty string,
field_7 string,
interestGroupCategory string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\|'
location '/data/pac/cm';
#Create the EXT_ITCONT table holding the individual contributions
create external table ext_itcont (
committeeId string,
field_1 string,
reportType string,
primaryGeneralIndicator string,
microfilmLocation string,
transactionType string,
entityType string,
name string,
city string,
state string,
zip string,
employer string,
occupation string,
transactionDate string,
transactionAmount double,
field_2 string,
transactionID string,
reportID string,
field_3 string,
memo string,
recordNumber double
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\|'
location '/data/pac/itcont';
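#Optional sanity check (illustrative, assumes the FEC extracts are present under /data/pac):
#rough row counts of the raw external tables before they are joined.
select src, count(*) cnt from (
select 'cn' src from ext_cn
union all select 'ccl' src from ext_ccl
union all select 'cm' src from ext_cm
union all select 'itcont' src from ext_itcont
) t group by src;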
#Add committee info to the contributions
create table wrk_indv_cont stored as SEQUENCEFILE as
select itcont.reporttype,itcont.primarygeneralindicator,itcont.microfilmlocation,itcont.transactiontype,
itcont.entitytype,itcont.name,itcont.city,itcont.state,itcont.zip,itcont.employer,itcont.occupation,
itcont.transactiondate,itcont.transactionamount,itcont.transactionid,itcont.reportid,itcont.memo,itcont.recordnumber,
named_struct("committeeid",cm.committeeid,"committeename",cm.committeename,
"committeedesignation",cm.committeedesignation,"committeetype",cm.committeetype,
"committeeparty",cm.committeeparty,"interestgroupcategory",cm.interestgroupcategory) committee
from ext_itcont itcont join ext_cm cm on (itcont.committeeid = cm.committeeid);
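#Illustrative query, not part of the pipeline: the committee struct is read with dot
#notation, e.g. total contribution amount per committee party.
select committee.committeeparty party, sum(transactionamount) total_amount
from wrk_indv_cont
group by committee.committeeparty;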
#Add custom UDAFs that aggregate any primitive or complex type into a map/array: https://github.com/wdavidw/hive-udf
add jar /home/cbejjani/src/hive-udf/target/adaltas-hive-udf-0.0.1-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION collect_map as 'com.adaltas.UDAFToMap';
CREATE TEMPORARY FUNCTION collect_array as 'com.adaltas.UDAFToArray';
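#Optional smoke test of the UDAF registration (illustrative, assumes ext_ccl is loaded):
#collect the committee ids linked to each candidate into a single array.
select candidateid, collect_array(committeeid) committees
from ext_ccl
group by candidateid
limit 10;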
#Add candidate information
create table wrk_indv_cont_2 as
select reporttype,primarygeneralindicator,microfilmlocation,transactiontype,
entitytype,name,city,state,zip,employer,occupation,
transactiondate,transactionamount,transactionid,reportid,memo,recordnumber,
committee,
collect_array(candidate) candidates
from (
select indv.*,
named_struct("candidateid",cn.candidateid,"candidatename",cn.candidatename,"candidateparty",cn.candidateparty,"candidateelectionyear",cn.candidateelectionyear,
"candidateofficestate",cn.candidateofficestate,"candidateoffice",cn.candidateoffice,"candidateofficedistrict",cn.candidateofficedistrict,
"incumbentchallengerstatus",cn.incumbentchallengerstatus) candidate
from wrk_indv_cont indv left outer join ext_ccl ccl on (indv.committee.committeeid = ccl.committeeid) left outer join ext_cn cn on (ccl.candidateid = cn.candidateid)
) c
group by reporttype,primarygeneralindicator,microfilmlocation,transactiontype,
entitytype,name,city,state,zip,employer,occupation,
transactiondate,transactionamount,transactionid,reportid,memo,recordnumber,
committee;
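#Illustrative query, not part of the pipeline: the candidates array can be flattened
#with a lateral view, e.g. to sum contributions per candidate party.
select cand.candidateparty party, sum(transactionamount) total_amount
from wrk_indv_cont_2
lateral view explode(candidates) c as cand
group by cand.candidateparty;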
#Add geo-location info and normalize the transaction date from MMDDYYYY to YYYYMMDD
create table fct_indv_cont as
select reporttype,primarygeneralindicator,microfilmlocation,transactiontype,
entitytype,name,employer,occupation,concat(substr(transactiondate,5,4),substr(transactiondate,0,2),substr(transactiondate,3,2)) transactiondate,transactionamount,
transactionid,reportid,memo,recordnumber,committee, candidates,
named_struct("location",named_struct("lat",CAST(NVL(loc.coords_1,0) AS FLOAT),"lon",CAST(NVL(loc.coords_2,0) AS FLOAT)),"zip",indv.zip,"city",indv.city,"state",indv.state) loc
from wrk_indv_cont_2 indv left outer join dim_zip loc on (indv.zip = loc.zip and lower(indv.city) = lower(loc.city));
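#Optional sanity check (illustrative): contributions whose zip/city did not match
#DIM_ZIP were defaulted to a 0/0 lat-lon by the NVL above, so count how many remain unmatched.
select count(*) unmatched from fct_indv_cont where loc.location.lat = 0 and loc.location.lon = 0;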
#Create the index in Elasticsearch
#IMPORTANT: run the following commands from Hive, not Spark SQL. The Elasticsearch on Hadoop SerDe does not work with Spark, because Spark does not use Hadoop Writables for serialization and cannot read them.
add jar gs://secure-grove-90717/lib/elasticsearch-hadoop-2.0.2.jar;
create table es_indv_cont (
reporttype string,
primarygeneralindicator string,
microfilmlocation string,
transactiontype string,
entitytype string,
name string,
employer string,
occupation string,
transactiondate string,
transactionamount double,
transactionid string,
reportid string,
memo string,
recordnumber double,
committee struct<committeeid:string,committeename:string,committeedesignation:string,committeetype:string,committeeparty:string,interestgroupcategory:string>,
candidates array<struct<candidateid:string,candidatename:string,candidateparty:string,candidateelectionyear:int,candidateofficestate:string,candidateoffice:string,candidateofficedistrict:string,incumbentchallengerstatus:string>>,
loc struct<location:struct<lat:float,lon:float>,zip:string,city:string,state:string>
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'usfec/ndiv_contrib',
'es.nodes' = 'elasticsearch-40fr');
INSERT OVERWRITE TABLE es_indv_cont
select * from fct_indv_cont;
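#Optional verification (illustrative, also run from Hive): the ES-backed table can be
#read back, so a simple count confirms that documents were indexed.
select count(*) from es_indv_cont;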