-
Notifications
You must be signed in to change notification settings - Fork 0
/
lambda_function.py
132 lines (94 loc) · 3.82 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import requests # Available via AWS Lambda Layer
import json # Available via AWS Lambda Layer
import os # Available via AWS Lambda Layer
import logging # Available via AWS Lambda Layer
import boto3 # Available via AWS Lambda Layer
import pandas as pd # Available via AWS Lambda Layer
import numpy as np # Available via AWS Lambda Layer
from botocore.session import Session
from botocore.config import Config
import tmdbsimple as tmdb # https://github.com/celiao/tmdbsimple
# Module-level setup, executed once when the module is imported.
#
# Build an S3 client from a botocore session with explicit network limits:
#   connect_timeout: seconds allowed for opening a new connection
#   read_timeout:    seconds allowed for a socket read
#   max_attempts:    retry count
# NOTE(review): `c` is not referenced by any function visible in this file;
# write_to_s3 creates its own boto3 resource - confirm whether this client
# was meant to be used there.
s = Session()
c = s.create_client('s3', config=Config(connect_timeout=20, read_timeout=60, retries={'max_attempts': 10}))
# Logging events are sent to CloudWatch Logs.
# This is the root logger, with its level set to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_tmdb_api_key():
    """Fetch the TMDB API key from AWS Secrets Manager.

    The secret is read through the local HTTP endpoint of the AWS Parameters
    and Secrets Lambda Extension (attached to the function as a Lambda
    layer), authenticated with the function's own session token.

    Returns:
        str: the value stored under the ``TMDB_API`` key of the secret.

    Raises:
        requests.HTTPError: the extension answered with a non-2xx status.
        requests.Timeout: the extension did not answer in time.
        KeyError: the response or the secret lacks the expected field.
    """
    # The extension listens on port 2773 unless overridden by this variable.
    port = os.environ.get("PARAMETERS_SECRETS_EXTENSION_HTTP_PORT", "2773")
    secret_id = "<< your secrets arn >>"
    endpoint = f"http://localhost:{port}/secretsmanager/get?secretId={secret_id}"
    headers = {"X-Aws-Parameters-Secrets-Token": os.environ.get("AWS_SESSION_TOKEN")}

    # Without a timeout a stuck extension would block until the Lambda itself
    # times out; fail fast and loudly instead.
    response = requests.get(endpoint, headers=headers, timeout=10)
    # Surface HTTP errors (e.g. access denied) directly rather than as an
    # unrelated KeyError / JSON decoding error further down.
    response.raise_for_status()

    # "SecretString" is itself a JSON document holding the key/value pairs.
    secret = json.loads(response.json()["SecretString"])
    return secret["TMDB_API"]
def write_to_s3(df, type, imdb_id):
    """Write a dataframe to S3 as a JSON array of records.

    The object is stored in the ``lambda-tmdb`` bucket under
    ``output/<type>/<imdb_id>-<type>-out.json``.

    Parameters:
        df: dataframe to write
        type: type of data (crew or credits); used in the object key
        imdb_id: imdb id of the movie; used in the object key

    Returns:
        "Success" upon completion
    """
    # Encode the JSON text produced by pandas directly. Passing the parsed
    # records through str() would emit a Python repr (single quotes,
    # None/True/False) that JSON parsers reject.
    payload = df.to_json(orient='records').encode("utf-8")

    bucket_name = "lambda-tmdb"
    s3_path = f"output/{type}/{imdb_id}-{type}-out.json"

    s3 = boto3.resource('s3')
    s3.Object(bucket_name, s3_path).put(Body=payload, ContentType="application/json")
    return "Success"
def lambda_handler(event, context):
    """Export the Visual Effects crew of the requested movies to S3.

    Call like this: Gateway URL + ?ids=tt0162346&ids=tt0326900

    For every IMDb id the movie is looked up on TMDB, its crew is filtered to
    members whose ``known_for_department`` is "Visual Effects", and two JSON
    objects are written to S3: the crew records (without ``credit_id``) and
    the ``id`` / ``credit_id`` pairs. Ids without a TMDB match or without
    matching crew are logged and skipped.

    Parameters:
        event: request event; the ids are read from
            ``event["multiValueQueryStringParameters"]["ids"]``
        context: Lambda context object (unused)

    Returns:
        dict: response with ``statusCode`` 200 and body "Success", or
        ``statusCode`` 400 when no ``ids`` query parameter was supplied.
    """
    # The query-string mapping is null/absent when the URL has no query
    # string, so neither it nor the "ids" entry can be indexed blindly.
    params = event.get("multiValueQueryStringParameters") or {}
    id_list = params.get("ids") or []
    if not id_list:
        return {
            "statusCode": 400,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"message": "Missing required query parameter: ids"}),
        }

    # Get credentials from Secrets Manager and configure the TMDB client
    tmdb.REQUESTS_TIMEOUT = 25
    tmdb.API_KEY = get_tmdb_api_key()

    departments = {"Visual Effects"}

    for imdb_id in id_list:
        logger.info("RUN OF IMDB ID: %s", imdb_id)

        found = tmdb.Find(id=imdb_id).info(external_source='imdb_id')
        movie_results = found.get('movie_results')
        if not movie_results:
            logger.info("NO MOVIE FOUND: %s", imdb_id)
            continue

        # Only the first TMDB match for the IMDb id is used.
        credits = tmdb.Movies(movie_results[0]['id']).credits()
        crew = [
            member
            for member in credits.get('crew', [])
            if member.get('known_for_department') in departments
        ]
        if not crew:
            logger.info("NO CREW FOUND: %s", imdb_id)
            continue

        df = pd.DataFrame(crew)
        df.fillna('None', inplace=True)
        df_crew = df.drop(columns='credit_id')
        df_credits = df[['id', 'credit_id']]

        # As long as the movie has people, write it to S3
        write_to_s3(df_crew, "crew", imdb_id)
        logger.info("CREW ADDED TO S3")
        write_to_s3(df_credits, "credits", imdb_id)
        logger.info("CREDITS ADDED TO S3")

    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": "Success",
    }