Skip to content

Commit

Permalink
Merge pull request #127 from akunszt/issue-4920-s3-server-side-encryp…
Browse files Browse the repository at this point in the history
…tion

s3: add server side encryption support
  • Loading branch information
alltilla authored Jun 4, 2024
2 parents 335d100 + 5e2361b commit 8785f7e
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
14 changes: 14 additions & 0 deletions modules/python-modules/syslogng/modules/s3/s3_destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def __init_options(self, options: Dict[str, Any]) -> None:
self.max_pending_uploads = int(options["max_pending_uploads"])
self.flush_grace_period = int(options["flush_grace_period"])
self.region: Optional[str] = str(options["region"])
self.server_side_encryption = str(options["server_side_encryption"])
self.kms_key = str(options["kms_key"])
self.storage_class = str(options["storage_class"]).upper().replace("-", "_")
self.canned_acl = str(options["canned_acl"]).lower().replace("_", "-")
except KeyError:
Expand Down Expand Up @@ -101,6 +103,16 @@ def __init_options(self, options: Dict[str, Any]) -> None:
if self.region == "":
self.region = None

if self.server_side_encryption != "" and self.server_side_encryption != "aws:kms":
assert False, "server-side-encryption() supports only aws:kms"

if self.server_side_encryption == "aws:kms" and self.kms_key == "":
assert False, "kms-key() must be set when server-side-encryption() is aws:kms"

if self.kms_key != "" and self.server_side_encryption == "":
self.logger.warn("ignoring kms-key() as server-side-encryption() is disabled")
self.kms_key = ""

VALID_STORAGE_CLASSES = {
"STANDARD",
"REDUCED_REDUNDANCY",
Expand Down Expand Up @@ -363,6 +375,8 @@ def __create_initial_s3_object(self, target_key: str, timestamp: str) -> S3Objec
target_key=target_key,
timestamp=timestamp,
compress=self.compression,
server_side_encryption=self.server_side_encryption,
kms_key=self.kms_key,
storage_class=self.storage_class,
persist_name=self.persist_name,
executor=self.executor,
Expand Down
54 changes: 53 additions & 1 deletion modules/python-modules/syslogng/modules/s3/s3_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ def __init__(
compress: Optional[bool] = None,
compresslevel: Optional[int] = None,
chunk_size: Optional[int] = None,
server_side_encryption: Optional[str] = None,
kms_key: Optional[str] = None,
storage_class: Optional[str] = None,
canned_acl: Optional[str] = None,
):
Expand All @@ -133,6 +135,8 @@ def __init__(
and compresslevel is not None
and chunk_size is not None
and storage_class is not None
and server_side_encryption is not None
and kms_key is not None
and canned_acl is not None
)
else:
Expand All @@ -152,6 +156,16 @@ def __init__(
raise PersistLoadError from e

if path:
# fields added in later releases can be missing from the persist JSON
# set a sane default value which can be overriden later if the user set
# the appropriate configuration directives
for upgrade_field in {
"canned-acl",
"kms-key",
"server-side-encryption",
}:
cache[upgrade_field] = cache.get(upgrade_field, "")

for field in {
"persist-name",
"bucket",
Expand All @@ -166,12 +180,16 @@ def __init__(
"upload-id",
"uploaded-parts",
"pending-parts",
"canned-acl",
"kms-key",
"server-side-encryption",
}:
try:
cache[field]
except KeyError as e:
raise PersistLoadError from e

# fields
self.__persist_name: str = cache.get("persist-name", persist_name)
self.__bucket: str = cache.get("bucket", bucket)
self.__target_key: str = cache.get("target-key", target_key)
Expand All @@ -181,11 +199,14 @@ def __init__(
self.__compresslevel: bool = cache.get("compresslevel", compresslevel)
self.__chunk_size: bool = cache.get("chunk-size", chunk_size)
self.__storage_class: str = cache.get("storage-class", storage_class)
self.__canned_acl: str = cache.get("canned-acl", canned_acl)
self.__finished: bool = cache.get("finished", False)
self.__upload_id: str = cache.get("upload-id", "")
self.__uploaded_parts: List[Dict[str, Any]] = cache.get("uploaded-parts", [])
self.__pending_parts: Dict[str, Any] = cache.get("pending-parts", dict())
# upgrade fields
self.__canned_acl: str = cache.get("canned-acl", canned_acl)
self.__kms_key: str = cache.get("kms-key", kms_key)
self.__server_side_encryption: str = cache.get("server-side-encryption", server_side_encryption)

self.__flush()

Expand All @@ -202,6 +223,8 @@ def __flush(self) -> None:
"index": self.__index,
"compress": self.__compress,
"compresslevel": self.__compresslevel,
"server-side-encryption": self.__server_side_encryption,
"kms-key": self.__kms_key,
"storage-class": self.__storage_class,
"canned-acl": self.__canned_acl,
"chunk-size": self.__chunk_size,
Expand Down Expand Up @@ -264,6 +287,14 @@ def compresslevel(self) -> bool:
def chunk_size(self) -> bool:
return self.__chunk_size

@property
def server_side_encryption(self) -> str:
return self.__server_side_encryption

@property
def kms_key(self) -> str:
return self.__kms_key

@property
def storage_class(self) -> str:
return self.__storage_class
Expand Down Expand Up @@ -345,6 +376,8 @@ def __init__(
compress: Optional[bool] = None,
chunk_size: Optional[int] = None,
compresslevel: Optional[int] = None,
server_side_encryption: Optional[str] = None,
kms_key: Optional[str] = None,
storage_class: Optional[str] = None,
canned_acl: Optional[str] = None,
persist: Optional[S3ObjectPersist] = None,
Expand Down Expand Up @@ -377,6 +410,8 @@ def __init__(
and compress is not None
and chunk_size is not None
and compresslevel is not None
and server_side_encryption is not None
and kms_key is not None
and storage_class is not None
and canned_acl is not None
)
Expand All @@ -389,6 +424,8 @@ def __init__(
compress=compress,
compresslevel=compresslevel,
chunk_size=chunk_size,
server_side_encryption=server_side_encryption,
kms_key=kms_key,
storage_class=storage_class,
canned_acl=canned_acl,
)
Expand Down Expand Up @@ -431,6 +468,8 @@ def create_initial(
timestamp: str,
compress: bool,
compresslevel: int,
server_side_encryption: str,
kms_key: str,
storage_class: str,
canned_acl: str,
persist_name: str,
Expand All @@ -451,6 +490,8 @@ def create_initial(
target_index=0,
compress=compress,
compresslevel=compresslevel,
server_side_encryption=server_side_encryption,
kms_key=kms_key,
storage_class=storage_class,
canned_acl=canned_acl,
chunk_size=chunk_size,
Expand Down Expand Up @@ -480,6 +521,8 @@ def create_next(self) -> S3Object:
target_index=self.index + 1,
compress=self.__persist.compress,
compresslevel=self.__persist.compresslevel,
server_side_encryption=self.__persist.server_side_encryption,
kms_key=self.__persist.kms_key,
storage_class=self.__persist.storage_class,
canned_acl=self.__persist.canned_acl,
chunk_size=self.__persist.chunk_size,
Expand Down Expand Up @@ -546,19 +589,28 @@ def __ensure_multipart_upload_started(self) -> bool:
if self.__persist.upload_id != "":
return True

sse_arguments = {}
if self.__persist.server_side_encryption != "":
sse_arguments["ServerSideEncryption"] = self.__persist.server_side_encryption

if self.__persist.kms_key != "":
sse_arguments["SSEKMSKeyId"] = self.__persist.kms_key

try:
if self.__persist.canned_acl != "":
response = self.__client.create_multipart_upload(
Bucket=self.bucket,
Key=self.key,
StorageClass=self.__persist.storage_class,
ACL=self.__persist.canned_acl,
**sse_arguments,
)
else:
response = self.__client.create_multipart_upload(
Bucket=self.bucket,
Key=self.key,
StorageClass=self.__persist.storage_class,
**sse_arguments,
)
except (ClientError, EndpointConnectionError) as e:
self.__logger.error(f"Failed to create multipart upload: {self.bucket}/{self.key} => {e}")
Expand Down
4 changes: 4 additions & 0 deletions modules/python-modules/syslogng/modules/s3/scl/s3.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ block destination s3(
max_pending_uploads(32)
flush_grace_period(60)
region("")
server_side_encryption("")
kms_key("")
storage_class("STANDARD")
canned_acl("")
...
Expand All @@ -61,6 +63,8 @@ block destination s3(
"max_pending_uploads" => `max_pending_uploads`
"flush_grace_period" => `flush_grace_period`
"region" => "`region`"
"server_side_encryption" => "`server_side_encryption`"
"kms_key" => "`kms_key`"
"storage_class" => "`storage_class`"
"canned_acl" => "`canned_acl`"
)
Expand Down

0 comments on commit 8785f7e

Please sign in to comment.