forked from ThreatConnect-Inc/tcex-util
-
Notifications
You must be signed in to change notification settings - Fork 0
/
datetime_operation.py
268 lines (235 loc) · 9.78 KB
/
datetime_operation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""TcEx Framework Module"""
# standard library
from collections.abc import Generator
from datetime import datetime
# third-party
import arrow as _arrow
from arrow import Arrow
from dateutil import parser
from dateutil.relativedelta import relativedelta
class DatetimeOperation:
    """TcEx Utilities Datetime Operations Class"""

    @classmethod
    def any_to_datetime(
        cls,
        datetime_expression: int | str | datetime | Arrow,
        tz: str | None = None,
    ) -> Arrow:
        """Return an Arrow object parsed from a datetime expression.

        The expression is converted to a string and handed to each parser in turn; the
        first parser that succeeds wins.

        Args:
            datetime_expression: The datetime expression to parse into an Arrow datetime object.
            tz: If provided, the parsed Arrow datetime object will be converted to the timezone
                resulting from this timezone expression. Accepts values like 'US/Pacific',
                '-07:00', 'UTC'.

                When this parameter is None, the returned Arrow datetime object will retain
                its timezone information if datetime_expression is timezone-aware. If
                datetime_expression is not timezone-aware, then the returned Arrow datetime
                object will have UTC timezone info.

        Returns:
            The parsed (and, when tz is given, timezone-converted) Arrow datetime object.

        Raises:
            RuntimeError: If no parser could handle the expression, or if the timezone
                conversion failed.
        """
        value = str(datetime_expression)

        # note: order matters. For example, _parse_timestamp could parse inputs that would have
        # been meant for one of the default parser formats
        parser_methods = [
            cls._parse_default_arrow_formats,
            cls._parse_non_default_arrow_formats,
            cls._parse_timestamp,
            cls._parse_humanized_input,
            cls._parse_date_utils,
        ]

        last_error: Exception | None = None
        for method in parser_methods:
            try:
                parsed = method(value)
            except Exception as ex:  # nosec
                # value could not be parsed by current method; remember why and try the next
                last_error = ex
                continue

            # convert timezone if tz arg provided, else return parsed object.
            # This is outside the try so a bad tz surfaces instead of being swallowed.
            return cls._convert_timezone(parsed, tz) if tz is not None else parsed

        # chain the last parser failure so the root cause is visible in the traceback
        raise RuntimeError(  # pragma: no cover
            f'Value "{value}" of type "{type(datetime_expression)}" '
            'could not be parsed as a date time object.'
        ) from last_error

    @property
    def arrow(self):
        """Return the arrow module for use in Apps."""
        return _arrow

    def chunk_date_range(
        self,
        start_date: int | str | datetime | Arrow,
        end_date: int | str | datetime | Arrow,
        chunk_size: int = 1,
        chunk_unit: str = 'months',
        date_format: str | None = None,
    ) -> Generator[tuple[Arrow | str, Arrow | str], None, None]:
        """Chunk a date range based on unit and size.

        Args:
            start_date: Date time expression or datetime object.
            end_date: Date time expression or datetime object.
            chunk_size: Chunk size for the provided units.
            chunk_unit: A value of year, quarter, month, day, week, hour, minute, second (plural
                versions of valid values also acceptable)
            date_format: If None datetime object will be returned. Any other value
                must be a valid strftime format (%s for epoch seconds).

        Yields:
            A (start, end) tuple per chunk; Arrow objects when date_format is None, otherwise
            strings formatted with date_format.

        Raises:
            RuntimeError: If the range could not be generated or formatted.
        """
        start_date = self.any_to_datetime(start_date)
        end_date = self.any_to_datetime(end_date)

        interval_args = {
            'frame': chunk_unit,
            'start': start_date,
            'end': end_date,
            'interval': chunk_size,
            'bounds': '[]',
            'exact': True,
        }

        try:
            for chunk_start, chunk_end in Arrow.interval(**interval_args):
                if date_format is None:
                    yield chunk_start, chunk_end
                else:
                    yield chunk_start.strftime(date_format), chunk_end.strftime(date_format)
        except Exception as ex:  # pragma: no cover
            raise RuntimeError(
                'Could not generate date range. Please verify that chunk_size, chunk_unit, '
                'and date_format values are valid.'
            ) from ex

    def timedelta(
        self, time_input1: int | str | datetime | Arrow, time_input2: int | str | datetime | Arrow
    ) -> dict:
        """Calculate the time delta between two time expressions.

        The result contains two families of values:

        * Calendar components (years, months, weeks, days, hours, minutes, seconds,
          microseconds) taken from dateutil's relativedelta, plus total_months which is
          derived from them.
        * Elapsed totals (total_weeks, total_days, total_hours, total_minutes,
          total_seconds, total_microseconds) derived from the exact difference between the
          two datetimes. Each total is a whole number truncated toward zero, so a negative
          difference produces negative totals of the same magnitude as the reversed inputs.

        Args:
            time_input1: The end time expression (larger time value).
            time_input2: The start time input string (smaller time value).

        Returns:
            A dict with the ISO formatted inputs, the calendar components and the totals.
        """
        time_input1_ = self.any_to_datetime(time_input1).datetime
        time_input2_ = self.any_to_datetime(time_input2).datetime

        diff = time_input1_ - time_input2_  # timedelta
        delta = relativedelta(time_input1_, time_input2_)  # relativedelta

        # months are a calendar concept, so they can only come from relativedelta
        total_months = (delta.years * 12) + delta.months

        # A timedelta normalizes negatives as (negative days, positive seconds), so collapse
        # it to one signed integer before dividing; mixing diff.days with the signed
        # relativedelta parts gives wrong totals for negative differences.
        signed_microseconds = (
            ((diff.days * 86_400) + diff.seconds) * 1_000_000
        ) + diff.microseconds
        sign = -1 if signed_microseconds < 0 else 1
        magnitude = abs(signed_microseconds)

        # elapsed totals, truncated toward zero (handles leap days as diff is exact)
        total_microseconds = signed_microseconds
        total_seconds = sign * (magnitude // 1_000_000)
        total_minutes = sign * (magnitude // 60_000_000)
        total_hours = sign * (magnitude // 3_600_000_000)
        total_days = sign * (magnitude // 86_400_000_000)
        total_weeks = sign * (magnitude // 604_800_000_000)

        return {
            'datetime_1': time_input1_.isoformat(),
            'datetime_2': time_input2_.isoformat(),
            'years': delta.years,
            'months': delta.months,
            'weeks': delta.weeks,
            'days': delta.days,
            'hours': delta.hours,
            'minutes': delta.minutes,
            'seconds': delta.seconds,
            'microseconds': delta.microseconds,
            'total_months': total_months,
            'total_weeks': total_weeks,
            'total_days': total_days,
            'total_hours': total_hours,
            'total_minutes': total_minutes,
            'total_seconds': total_seconds,
            'total_microseconds': total_microseconds,
        }

    @classmethod
    def _convert_timezone(cls, arrow_dt: Arrow, tz: str) -> Arrow:
        """Convert Arrow datetime's timezone.

        Args:
            arrow_dt: Arrow datetime object that will have its timezone converted
            tz: timezone expression. Accepts values like 'US/Pacific', '-07:00', 'UTC'.

        Returns:
            The Arrow datetime object converted to the requested timezone.

        Raises:
            RuntimeError: If the conversion to the requested timezone failed.
        """
        try:
            return arrow_dt.to(tz)
        except Exception as ex:  # pragma: no cover
            raise RuntimeError(
                f'Could not convert datetime to timezone "{tz}". Please verify timezone input.'
            ) from ex

    @staticmethod
    def _parse_default_arrow_formats(value: str) -> Arrow:
        """Attempt to parse value using default Arrow formats.

        The value is simply passed into Arrow's "get" method. The following are the default
        date formats (found in arrow.parser.DateTimeParser.parse_iso):

            "YYYY-MM-DD",
            "YYYY-M-DD",
            "YYYY-M-D",
            "YYYY/MM/DD",
            "YYYY/M/DD",
            "YYYY/M/D",
            "YYYY.MM.DD",
            "YYYY.M.DD",
            "YYYY.M.D",
            "YYYYMMDD",
            # Year-Day-of-year: 2020-364 == 2020-12-29T00:00:00+00:00
            "YYYY-DDDD",
            # same as above, but without separating dash
            "YYYYDDDD",
            "YYYY-MM",
            "YYYY/MM",
            "YYYY.MM",
            "YYYY",
            # ISO week date: 2011-W05-4, 2019-W17
            "W"
        """
        return _arrow.get(value)

    @staticmethod
    def _parse_humanized_input(value: str) -> Arrow:
        """Attempt to dehumanize time inputs. Example: 'Two hours ago'."""
        now = _arrow.utcnow()
        plurals = {
            'second': 'seconds',
            'minute': 'minutes',
            'hour': 'hours',
            'day': 'days',
            'week': 'weeks',
            'month': 'months',
            'year': 'years',
        }

        value = value.lower().strip()
        if value == 'now':
            return now

        # pluralize singular time terms as applicable. Arrow does not support singular terms
        terms = [plurals.get(term, term) for term in value.split()]
        value = ' '.join(terms)
        return now.dehumanize(value)

    @staticmethod
    def _parse_non_default_arrow_formats(value: str) -> Arrow:
        """Attempt to parse value using non-default Arrow formats.

        These are formats that Arrow provides constants for but are not used in the "get"
        method (Arrow method that parses values into datetimes) by default.

        Note: passing formats to test against overrides the default formats. Defaults are not used.
        """
        return _arrow.get(
            value,
            [
                _arrow.FORMAT_ATOM,
                _arrow.FORMAT_COOKIE,
                _arrow.FORMAT_RFC822,
                _arrow.FORMAT_RFC850,
                _arrow.FORMAT_RFC1036,
                _arrow.FORMAT_RFC1123,
                _arrow.FORMAT_RFC2822,
                _arrow.FORMAT_RFC3339,
                _arrow.FORMAT_RSS,
                _arrow.FORMAT_W3C,
            ],
        )

    @staticmethod
    def _parse_timestamp(value: str) -> Arrow:
        """Attempt to parse epoch timestamp in seconds, milliseconds, or microseconds.

        Note: passing formats to test against overrides the default formats. Defaults are not used.
        """
        # note: order matters. Must try to parse as milliseconds/microseconds first (x)
        # before trying to parse as seconds (X), else error occurs if passing a ms/us value
        try:
            # attempt to parse as string first using microsecond/millisecond and second specifiers
            return _arrow.get(value, ['x', 'X'])
        except (_arrow.parser.ParserError, ValueError):
            # could not parse as string, try to parse as float
            return _arrow.get(float(value))

    @staticmethod
    def _parse_date_utils(value: str) -> Arrow:
        """Attempt to supplement Arrow's parsing ability with dateutil.

        Arrow doesn't support RFC_5322 used in HTTP 409 headers, so the value is parsed with
        dateutil's parser and the result is wrapped in an Arrow object.
        """
        return _arrow.get(parser.parse(value))