copy_object_between_s3.py
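"""Copy an object between two S3 buckets via multipart upload.

The object is streamed from the main bucket and re-uploaded to the
backup bucket part by part through presigned URLs; the source object's
metadata is copied over afterwards. Expects an S3-compatible endpoint
(here a LocalStack instance) listening on localhost:4566.
"""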
import logging
import os

import requests

from s3_client_lib.s3_multipart_client import S3MultipartClient
from s3_client_lib.utils import CHUNK_SIZE_128M

FORMAT = "%(asctime)-15s %(levelname)s %(message)s"
logging.basicConfig(
    filename="example.log", filemode="w", format=FORMAT, level=logging.DEBUG
)
logger = logging.getLogger()
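
# Both configs below point at a local LocalStack endpoint with dummy
# credentials; swap in real endpoints and keys to copy between two
# actual S3 deployments.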
s3_config_localstack_main = {
    "AWS_S3_ENDPOINT_URL": "http://localhost:4566",
    "AWS_SECRET_ACCESS_KEY": "adsf",
    "AWS_ACCESS_KEY_ID": "asdff",
}
s3_config_localstack_backup = {
    "AWS_S3_ENDPOINT_URL": "http://localhost:4566",
    "AWS_SECRET_ACCESS_KEY": "adsf",
    "AWS_ACCESS_KEY_ID": "asdff",
}

client_main = S3MultipartClient(
    s3_config_localstack_main["AWS_S3_ENDPOINT_URL"],
    s3_config_localstack_main["AWS_ACCESS_KEY_ID"],
    s3_config_localstack_main["AWS_SECRET_ACCESS_KEY"],
)
client_backup = S3MultipartClient(
    s3_config_localstack_backup["AWS_S3_ENDPOINT_URL"],
    s3_config_localstack_backup["AWS_ACCESS_KEY_ID"],
    s3_config_localstack_backup["AWS_SECRET_ACCESS_KEY"],
)


def random_file(path):
    """Write 1 KiB of random bytes to *path* to serve as test data."""
    with open(path, 'wb') as fout:
        fout.write(os.urandom(1024))


if __name__ == "__main__":
    # test setup: create both buckets and upload a small random object
    # to the main bucket
    main_bucket = "bucket"
    backup_bucket = "bucketbackup"
    object_name = "test"
    client_main.create_bucket_if_not_exists(main_bucket)
    client_backup.create_bucket_if_not_exists(backup_bucket)
    random_file("/tmp/test_file")
    print(client_main.upload_local_file("/tmp/test_file", main_bucket, object_name))
    # attach metadata to the source object so we can verify it is copied
    client_main.update_metadata_object(
        main_bucket, object_name, metadata={"test_metadata": "metadata"}
    )

    # prepare the multipart upload for the backup copy
    upload_id = client_backup.create_multipart_upload(
        backup_bucket, object_name
    )
    part_no = 1
    parts = []

    # stream the object body from the main bucket and re-upload it part
    # by part; CHUNK_SIZE_128M comfortably exceeds S3's 5 MiB minimum
    # part size (which applies to every part but the last)
    response = client_main.get_object(main_bucket, object_name)
    for idx, chunk in enumerate(response['Body'].iter_chunks(CHUNK_SIZE_128M)):
        part_no = idx + 1
        # presign an upload URL for this part on the backup endpoint
        url = client_backup.sign_part_upload(
            backup_bucket, object_name, upload_id, part_no
        )
        # upload the part directly via the presigned URL
        res = requests.put(url, data=chunk)
        logger.info(f'part=#{part_no} headers: {res.headers}')
        etag = res.headers.get('ETag', '')
        logger.info(f'part=#{part_no} {etag}')
        # record the part's ETag; S3 requires it to complete the upload
        parts.append({'ETag': etag.replace('"', ''), 'PartNumber': part_no})

    logger.info(
        f'finishing {backup_bucket} - {object_name} parts={parts}, upload_id={upload_id}'
    )
    # complete the multipart upload on the backup side
    res = client_backup.finish_multipart_upload(
        backup_bucket,
        object_name,
        parts,
        upload_id,
    )
    logger.info(f"result of finishing multipart upload: {res}")

    # copy the source object's metadata to the backup copy, if it has any
    s3_object = client_main.get_object_head(main_bucket, object_name)
    obj_metadata = s3_object.get('Metadata', {})
    if obj_metadata:
        result = client_backup.update_metadata_object(
            backup_bucket, object_name, obj_metadata
        )

    # check the result: HEAD the copied object on the backup side
    s3_object = client_backup.get_object_head(backup_bucket, object_name)
    logger.info(f"Finished copying the object between S3 buckets: {s3_object}")