-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3Functions.py
140 lines (116 loc) · 5.38 KB
/
s3Functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import boto3
import re
import base64
def upload_file(file_name, bucket, object_name=None):
    """
    Send a local file to an S3 bucket

    :param file_name: Path of the local file to upload
    :param bucket: Name of the destination bucket
    :param object_name: Key to store the file under. Falls back to
        file_name when None.
    :return: True when the upload call completed without raising,
        False otherwise (the exception is printed)
    """
    # Default the S3 key to the local file name
    key = file_name if object_name is None else object_name

    client = boto3.client('s3')
    try:
        client.upload_file(file_name, bucket, key)
    except Exception as err:
        # Best-effort contract: report the problem and signal failure
        print(err)
        return False
    else:
        return True
def delete_file(bucket, object_name):
    """
    Remove an object from an S3 bucket

    :param bucket: Name of the bucket holding the object
    :param object_name: Key of the object to delete
    :return: True when the delete call completed without raising,
        False otherwise (the exception is printed)
    """
    client = boto3.client('s3')
    succeeded = True
    try:
        client.delete_object(Bucket=bucket, Key=object_name)
    except Exception as err:
        # Best-effort contract: report the problem and signal failure
        print(err)
        succeeded = False
    return succeeded
def download_file(bucket, object_name, file_name=None):
    """
    Fetch an object from an S3 bucket into a local file

    :param bucket: Name of the bucket to read from
    :param object_name: Key of the object to download
    :param file_name: Local path to write the object to. Falls back to
        object_name when None.
    :return: True when the download call completed without raising,
        False otherwise (the exception is printed)
    """
    # Default the local path to the S3 key
    destination = object_name if file_name is None else file_name

    client = boto3.client('s3')
    try:
        client.download_file(bucket, object_name, destination)
        return True
    except Exception as err:
        # Best-effort contract: report the problem and signal failure
        print(err)
        return False
def find_object_with_highest_number(bucket_name):
    """
    Find the object in a bucket whose key carries the highest number

    The number of a key is its last run of digits, wherever it occurs
    (for example 123 in 'file-123' and 456 in 'image_456.png'). Keys
    without any digit are ignored.

    :param bucket_name: Name of the bucket to scan
    :return: Key of the object with the highest number, or None when the
        bucket is empty or no key contains a digit. On a tie the first
        key listed wins.
    """
    s3_client = boto3.client('s3')
    # A single list_objects_v2 call returns at most 1,000 keys, so walk
    # every page; otherwise large buckets yield a wrong "highest" object.
    paginator = s3_client.get_paginator('list_objects_v2')
    # Last run of digits in the key: digits not followed by any later digit
    last_number = re.compile(r'(\d+)(?!.*\d)')

    max_number = -1
    object_with_max_number = None
    for page in paginator.paginate(Bucket=bucket_name):
        # 'Contents' is absent from a page that holds no objects
        for obj in page.get('Contents', []):
            object_name = obj['Key']
            match = last_number.search(object_name)
            if match is None:
                continue
            number = int(match.group())
            if number > max_number:
                max_number = number
                object_with_max_number = object_name
    return object_with_max_number
def decode_file(base64_string, file_name='snapshot.csv'):
    """
    Decode a base64 string and write the resulting bytes to a file

    :param base64_string: Base64-encoded content to decode
    :param file_name: Path of the file to write. Defaults to
        'snapshot.csv' in the current working directory.
    :raises binascii.Error: If base64_string is not valid base64 (for
        example incorrectly padded). The target file is not touched in
        that case.
    """
    # Decode before opening: mode 'wb' truncates on open, so decoding
    # inside the with-block would wipe an existing file on bad input.
    decoded = base64.b64decode(base64_string)
    with open(file_name, 'wb') as file:
        file.write(decoded)
def upload_snapshot(file_name):
    """
    Upload a file to the 'dailysupplysnapshot' bucket as the next snapshot

    The object name is derived from the highest-numbered object currently
    in the bucket by incrementing the last run of digits in its key. When
    no numbered object exists, 'snapshot_1' is used. The outcome is
    printed rather than returned.

    :param file_name: Path of the local file to upload
    :return: None
    """
    # Get the latest snapshot name (None when nothing is numbered yet)
    latest_name = find_object_with_highest_number('dailysupplysnapshot')
    match = re.search(r'(\d+)(?!.*\d)', latest_name) if latest_name else None

    if match:
        next_number = int(match.group()) + 1
        # Splice only the matched span. str.replace would rewrite every
        # occurrence of the digits, e.g. 'v1_snapshot_1' -> 'v2_snapshot_2'.
        object_name = (
            latest_name[:match.start()]
            + str(next_number)
            + latest_name[match.end():]
        )
    else:
        object_name = 'snapshot_1'

    # Upload the file
    if upload_file(file_name, 'dailysupplysnapshot', object_name):
        print(f"Uploaded {file_name} as {object_name}")
    else:
        print("Upload failed")
# Example usage (uncomment the calls below to run them).
# Replace the bucket name, the local file path, and the S3 object name with your own values.
# The object name is optional for upload_file; when omitted, the file path is used as the object name.
# if upload_file('Analysis/Daily Snapshot.csv', 'dailysupplysnapshot', 'snapshot_1'):
# print("Upload successful")
# else:
# print("Upload failed")
# if download_file('dailysupplysnapshot', 'snapshot_1', 'snapshot.csv'):
# print("Download successful")
# else:
# print("Download failed")
# To delete the file you just uploaded
# if delete_file('my_bucket', 'my_object_name'):
# print("Deletion successful")
# else:
# print("Deletion failed")
# highest_number_object = find_object_with_highest_number('dailysupplysnapshot')
# if highest_number_object:
# print(f"The object with the highest number: {highest_number_object}")
# else:
# print("No objects found or no numeric endings detected.")
# decode_file('SXRlbSBOdW1iZXIg4oCTIDYgZGlnaXQsTkRDIE51bWJlcixVUEMgTnVtYmVyLENvbnN0YW50LEN1c3RvbWVyLVNwZWNpZmljIEl0ZW0gTnVtYmVyLERlc2NyaXB0aW9uLFBhY2sgU2l6ZSBEaXZpc29yLFNpemUgUXR5LFJYL09UQyBJbmRpY2F0b3IsQVdQIFByaWNlLEFjcXVpc2l0aW9uIFByaWNlLFJldGFpbCBQcmljZSxDb250cmFjdCBGbGFnLEdlbmVyaWMgRGVzY3JpcHRpb24sUmV0YWlsIFBhY2sgUXVhbnRpdHksV0FDIFByaWNlLEl0ZW0gTnVtYmVyIOKAkyA4IGRpZ2l0CjAwMTEzOTAsNDExNjc0MzEwMDIsMDQxMTY3NDMxMDIzLCAgICAgICAgICAgICAgICAgICAgICAgICwgICAgICAgICAgLEFMTEVHUkEgRCAxMkhSIDYwTUcgQ0FQTEVUIDEwQywwMDAwMDAxMDAwMCwxMCAgICAgICxPLDAwMDAwMDE4NjYsMDAwMDAwMTE3MywwMDAwMDAxNzYwLE4sZmV4b2ZlbmFkaW5lL3BzZXVkb2VwaGVkcmluZSBPLDAwMDAxLDAwMDAwMDEwNjYsMTAwODQ5MTYKMDAxMTM5NSw0MTE2NzQzMTAwNCwwNDExNjc0MzEwNDcsICAgICAgICAgICAgICAgICAgICAgICAgLCAgICAgICAgICAsQUxMRUdSQSBEIDEySFIgNjBNRyBDQVBMRVQgMjBDLDAwMDAwMDIwMDAwLDIwICAgICAgLE8sMDAwMDAwMjc2MiwwMDAwMDAxNzM2LDAwMDAwMDI2MDQsTixmZXhvZmVuYWRpbmUvcHNldWRvZXBoZWRyaW5lIE8sMDAwMDEsMDAwMDAwMTU3OCwxMDA4NDkxNwowMDExNDIxLDQxMTY3NDMyMDA1LDA0MTE2NzQzMjA1MSwgICAgICAgICAgICAgICAgICAgICAgICAsICAgICAgICAgICxBTExFR1JBIEQgMjRIUiAxODBNRyBUQUJMRVQgMTAsMDAwMDAwMTAwMDAsMTAgICAgICAsTywwMDAwMDAyMTE2LDAwMDAwMDEzMzAsMDAwMDAwMTk5NSxOLGZleG9mZW5hZGluZS9wc2V1ZG9lcGhlZHJpbmUgTywwMDAwMSwwMDAwMDAxMjA5LDEwMDg0OTE5')