import json
import logging
import uuid
from functools import reduce

import boto3
from botocore.exceptions import ClientError
from pyflink.table import TableEnvironment, Schema, DataTypes, FormatDescriptor
from pyflink.table.catalog import ObjectPath
from pyflink.table.confluent import ConfluentTableDescriptor, ConfluentSettings, ConfluentTools
from pyflink.table.expressions import col, lit

__copyright__ = "Copyright (c) 2024-2025 Jeffrey Jonathan Jennings"
__credits__ = ["Jeffrey Jonathan Jennings"]
__license__ = "MIT"
__maintainer__ = "Jeffrey Jonathan Jennings"
__email__ = "j3@thej3.com"
__status__ = "dev"
# Confluent Cloud for Apache Flink Secrets Keys
ENVIRONMENT_ID = "environment.id"
FLINK_API_KEY = "flink.api.key"
FLINK_API_SECRET = "flink.api.secret"
FLINK_CLOUD = "flink.cloud"
FLINK_COMPUTE_POOL_ID = "flink.compute.pool.id"
FLINK_PRINCIPAL_ID = "flink.principal.id"
FLINK_REGION = "flink.region"
ORGANIZATION_ID = "organization.id"
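
# For reference, the secret stored at the `ccaf_secrets_path` passed in the event is
# expected to resolve to a JSON document keyed by the constants above.  The values
# below are illustrative placeholders only, not real credentials or identifiers:
#
#   {
#       "environment.id": "env-xxxxxx",
#       "flink.api.key": "<flink-api-key>",
#       "flink.api.secret": "<flink-api-secret>",
#       "flink.cloud": "aws",
#       "flink.compute.pool.id": "lfcp-xxxxxx",
#       "flink.principal.id": "sa-xxxxxx",
#       "flink.region": "us-east-1",
#       "organization.id": "<organization-uuid>"
#   }
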
def lambda_handler(event, context):
    """
    This AWS Lambda handler function is the main entry point for the Flink app.  The
    function fetches the Confluent Cloud for Apache Flink settings (e.g., the Flink
    API key, compute pool ID, etc.) from AWS Secrets Manager and uses them to create
    a TableEnvironment.  It then reads data from two Kafka topics, combines the data,
    and writes the combined data to a Kafka sink topic, which the Lambda function
    creates if it does not already exist.

    Arg(s):
        event (Dict):            The event JSON object data, which contains the
                                 `catalog_name`, `database_name`, and
                                 `ccaf_secrets_path` attributes.
        context (LambdaContext): The Lambda metadata that provides invocation,
                                 function, and execution environment information.

    Returns:
        `statusCode` with a message in the `body`:
            200 for a successful run of the function.
            400 for a missing required field.
            500 for a critical error.
    """
    # Set up the logger.
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Check that all required event fields exist.
    required_event_fields = ["catalog_name", "database_name", "ccaf_secrets_path"]
    for field in required_event_fields:
        if field not in event:
            logger.error(f"Missing required field: {field}")
            return {
                'statusCode': 400,
                'body': json.dumps({'error': f'Missing required field: {field}'})
            }

    # Get the catalog name, database name, and secrets path from the event.
    catalog_name = event.get("catalog_name", "").lower()
    database_name = event.get("database_name", "").lower()
    secrets_path = event.get("ccaf_secrets_path", "")

    try:
        get_secret_value_response = boto3.client('secretsmanager').get_secret_value(SecretId=secrets_path)
        settings = json.loads(get_secret_value_response['SecretString'])

        # Create the TableEnvironment with the Confluent Cloud for Apache Flink settings.
        tbl_env = TableEnvironment.create(
            ConfluentSettings
                .new_builder()
                .set_cloud(settings[FLINK_CLOUD])
                .set_region(settings[FLINK_REGION])
                .set_flink_api_key(settings[FLINK_API_KEY])
                .set_flink_api_secret(settings[FLINK_API_SECRET])
                .set_organization_id(settings[ORGANIZATION_ID])
                .set_environment_id(settings[ENVIRONMENT_ID])
                .set_compute_pool_id(settings[FLINK_COMPUTE_POOL_ID])
                .set_principal_id(settings[FLINK_PRINCIPAL_ID])
                .build()
        )
    except ClientError as e:
        logger.error("Failed to get secrets from the AWS Secrets Manager because of %s.", e)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

    # The catalog name and database name are used to set the current catalog and database.
    tbl_env.use_catalog(catalog_name)
    tbl_env.use_database(database_name)
    catalog = tbl_env.get_catalog(catalog_name)
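    # Note: with Confluent Cloud for Apache Flink, the catalog corresponds to a Confluent
    # environment and the database corresponds to a Kafka cluster within that environment.
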
    # The Confluent table descriptor for the Kafka sink table, using Avro serialization
    # for both the key and the value.
    flight_avro_table_descriptor = (
        ConfluentTableDescriptor
            .for_managed()
            .schema(
                Schema
                    .new_builder()
                    .column("departure_airport_code", DataTypes.STRING())
                    .column("flight_number", DataTypes.STRING())
                    .column("email_address", DataTypes.STRING())
                    .column("departure_time", DataTypes.STRING())
                    .column("arrival_time", DataTypes.STRING())
                    .column("arrival_airport_code", DataTypes.STRING())
                    .column("confirmation_code", DataTypes.STRING())
                    .column("airline", DataTypes.STRING())
                    .build())
            .distributed_by_into_buckets(1, "departure_airport_code", "flight_number")
            .key_format(FormatDescriptor.for_format("avro-registry").build())
            .value_format(FormatDescriptor.for_format("avro-registry").build())
            .build()
    )
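    # Note: `distributed_by_into_buckets(1, ...)` hash-distributes rows by
    # `departure_airport_code` and `flight_number` into a single bucket; on Confluent
    # Cloud the bucket count generally maps to the number of partitions of the backing
    # Kafka topic, and the distribution columns are serialized with the key format above.
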
    try:
        # Check whether the sink table exists; if it does not, create it.
        flight_avro_table_path = ObjectPath(tbl_env.get_current_database(), "flight_avro")
        if not catalog.table_exists(flight_avro_table_path):
            tbl_env.create_table(
                flight_avro_table_path.get_full_name(),
                flight_avro_table_descriptor
            )
            logger.info(f"Sink table '{flight_avro_table_path.get_full_name()}' created successfully.")
        else:
            logger.info(f"Sink table '{flight_avro_table_path.get_full_name()}' already exists.")
    except Exception as e:
        logger.error(f"A critical error occurred while creating the sink table: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

    # The first table is the SkyOne table that is read in.
    airline = tbl_env.from_path(f"{catalog_name}.{database_name}.skyone_avro")

    # The second table is the Sunset table that is read in.  (The topic name
    # `sunset_avro` is assumed here, mirroring the `skyone_avro` naming convention,
    # and the two topics are assumed to share the same schema.)
    sunset = tbl_env.from_path(f"{catalog_name}.{database_name}.sunset_avro")

    # Get the schema and columns from the airline (SkyOne) table.
    schema = airline.get_schema()

    # The columns that are not needed in the table that represents general airline flight data.
    exclude_airline_columns = ["key", "flight_duration", "ticket_price", "aircraft", "booking_agency_email", "$rowtime"]

    # Get only the columns that are not in the excluded columns list.
    flight_expressions = [col(field) for field in schema.get_field_names() if field not in exclude_airline_columns]
    flight_columns = [field for field in schema.get_field_names() if field not in exclude_airline_columns]

    # In this select, the airline column is set to the literal word "SkyOne", and the
    # selected columns are "unpacked" from the SkyOne table.
    skyone_airline = airline.select(*flight_expressions, lit("SkyOne"))

    # In this select, the airline column is set to the literal word "Sunset", and the
    # selected columns are "unpacked" from the Sunset table.
    sunset_airline = sunset.select(*flight_expressions, lit("Sunset"))

    # Build a compound expression, ensuring each column is not null.
    filter_condition = reduce(
        lambda accumulated_columns, current_column: accumulated_columns & col(current_column).is_not_null,
        flight_columns[1:],
        col(flight_columns[0]).is_not_null
    )
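    # For example, with columns a, b, and c, the reduce above yields the expression
    # col("a").is_not_null & col("b").is_not_null & col("c").is_not_null, which is used
    # below to filter out any row containing a null in one of the selected columns.
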
    # Combine the two tables.
    combined_airlines = (
        skyone_airline.union_all(sunset_airline)
            .alias(*flight_columns, "airline")
            .filter(filter_condition)
    )

    # Insert the combined records into the sink table.
    try:
        # Supply a friendly statement name to easily identify the Flink Statement in the Cloud Console.
        # However, the name is required to be unique across environments and regions, so a UUID is appended.
        statement_name = "combined-flight-data-" + str(uuid.uuid4())
        tbl_env.get_config().set("client.statement-name", statement_name)

        # Execute the insert statement.
        table_result = combined_airlines.execute_insert(flight_avro_table_path.get_full_name())

        # Get the processed statement name.
        processed_statement_name = ConfluentTools.get_statement_name(table_result)
        success_message = f"Data processed and inserted successfully as: {processed_statement_name}"
        logger.info(success_message)
        return {
            'statusCode': 200,
            'body': json.dumps({'message': success_message})
        }
    except Exception as e:
        logger.error(f"An error occurred during data insertion: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
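

# A minimal local-invocation sketch for ad-hoc testing outside AWS Lambda.  The event
# values below are illustrative placeholders (not values taken from this project), and
# running it requires local AWS credentials plus the Confluent Flink Table API plugin.
if __name__ == "__main__":
    sample_event = {
        "catalog_name": "my_environment",        # hypothetical Flink catalog (Confluent environment) name
        "database_name": "my_kafka_cluster",     # hypothetical Flink database (Kafka cluster) name
        "ccaf_secrets_path": "/confluent_cloud_resource/flink_api_key"  # hypothetical Secrets Manager path
    }
    # The handler never dereferences the Lambda context, so None is sufficient here.
    print(lambda_handler(sample_event, None))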