# main.py (forked from mailprotector/certbot-lambda)
#!/usr/bin/env python3
import os
import shutil
import boto3
import certbot.main
# ACME server URL handed to certbot via --server. It is read from the
# CERTBOT_URL environment variable and is None when that variable is unset.
# Intended to point at the Let's Encrypt acme-v02 server, which supports
# wildcard certificates.
CERTBOT_SERVER = os.getenv('CERTBOT_URL')
# Temp dir of Lambda runtime; used as certbot's config, work and logs directory
CERTBOT_DIR = '/tmp/certbot'
def rm_tmp_dir():
    """Remove whatever currently occupies ``CERTBOT_DIR``, if anything.

    A real directory is deleted recursively. Anything else found at that
    path (a regular file, or a symbolic link, whether dangling or pointing
    at a file or directory) is unlinked; the target of a link is never
    touched. Does nothing when the path does not exist.
    """
    # lexists() does not follow symlinks, so a dangling link is still seen.
    if not os.path.lexists(CERTBOT_DIR):
        return

    # isdir() follows symlinks, and shutil.rmtree() refuses to operate on a
    # symlink to a directory, so links must be excluded explicitly here.
    if os.path.isdir(CERTBOT_DIR) and not os.path.islink(CERTBOT_DIR):
        shutil.rmtree(CERTBOT_DIR)
    else:
        os.remove(CERTBOT_DIR)
def obtain_certs(emails, domains, dns_plugin):
    """Invoke certbot's ``certonly`` subcommand using a DNS-01 challenge.

    Args:
        emails: Value passed to ``--email`` (email of domain administrators).
        domains: Value passed to ``--domains`` (comma separated).
        dns_plugin: Value passed to ``--authenticator``.

    Returns:
        Whatever ``certbot.main.main`` returns for the assembled arguments.
    """
    # Send config, work and log paths to CERTBOT_DIR so the script doesn't
    # have to be run as root.
    path_overrides = []
    for flag in ('--config-dir', '--work-dir', '--logs-dir'):
        path_overrides += [flag, CERTBOT_DIR]

    # Obtain a cert without installing it, never prompt, and accept the
    # terms of service.
    command = ['certonly', '--non-interactive', '--agree-tos']

    options = [
        # Contact address for the domain administrators
        '--email', emails,
        # Solve the dns-01 challenge through the given DNS plugin
        '--authenticator', dns_plugin,
        '--preferred-challenges', 'dns-01',
        # ACME server to talk to instead of certbot's default
        '--server', CERTBOT_SERVER,
        # Domains to provision certs for (comma separated)
        '--domains', domains,
    ]

    return certbot.main.main(path_overrides + command + options)
def upload_certs(s3_bucket, s3_prefix, s3_region):
    """Upload every file found under ``CERTBOT_DIR/live`` to S3.

    Expected local layout::

        /tmp/certbot
        └── live
            └── [domain]
                ├── README
                ├── cert.pem
                ├── chain.pem
                ├── fullchain.pem
                └── privkey.pem

    Each file's key is ``s3_prefix`` joined with the file's path relative
    to the ``live`` directory.

    Args:
        s3_bucket: Destination bucket name.
        s3_prefix: Key prefix placed in front of each relative path.
        s3_region: Region handed to ``boto3.client``.
    """
    s3 = boto3.client('s3', s3_region)
    live_root = os.path.join(CERTBOT_DIR, 'live')

    for folder, _subfolders, names in os.walk(live_root):
        for name in names:
            local_path = os.path.join(folder, name)
            # Mirror the layout below live/ underneath the key prefix.
            s3_key = os.path.join(
                s3_prefix, os.path.relpath(local_path, live_root)
            )
            print(f'Uploading: {local_path} => s3://{s3_bucket}/{s3_key}')
            s3.upload_file(local_path, s3_bucket, s3_key)
def guarded_handler(event, context):
    """Obtain certificates and upload them, driven by the event payload.

    Args:
        event: Mapping that must provide ``emails``, ``domains``,
            ``s3_bucket`` (bucket to publish certificates to),
            ``s3_prefix`` (key prefix) and ``s3_region`` (bucket region).
        context: Accepted but not used here.

    Returns:
        A fixed success message string.

    Raises:
        KeyError: If ``event`` lacks one of the required keys.
    """
    # The DNS plugin comes from the environment, not from the payload.
    plugin = os.getenv('DNS_PLUGIN')

    # Pull the required fields in a fixed order; the first missing key raises.
    required = ('emails', 'domains', 's3_bucket', 's3_prefix', 's3_region')
    emails, domains, bucket, prefix, region = (event[key] for key in required)

    obtain_certs(emails, domains, plugin)
    upload_certs(bucket, prefix, region)

    return 'Certificates obtained and uploaded successfully.'
def lambda_handler(event, context):
    """Run ``guarded_handler`` with the certbot temp dir cleared before and after.

    Args:
        event: Passed through to ``guarded_handler``.
        context: Passed through to ``guarded_handler``.

    Returns:
        The value returned by ``guarded_handler``.
    """
    try:
        # Clear anything already sitting at the certbot directory.
        rm_tmp_dir()
        outcome = guarded_handler(event, context)
    finally:
        # Always clean up, whether or not the run succeeded.
        rm_tmp_dir()
    return outcome