Do not merge PR #11410
@@ -0,0 +1,327 @@
from __future__ import print_function
import pickle
from googleapiclient.discovery import build
import json
import base64
import hashlib
import hmac
import requests
from google_auth_oauthlib.flow import InstalledAppFlow

import azure.functions as func
import logging
import os
import time
import re
from .state_manager import AzureStorageQueueHelper
from datetime import datetime, timedelta

customer_id = os.environ['WorkspaceID']
fetchDelay = os.getenv('FetchDelay', 10)
chunksize = 9999
calendarFetchDelay = os.getenv('CalendarFetchDelay', 6)
chatFetchDelay = os.getenv('ChatFetchDelay', 1)
userAccountsFetchDelay = os.getenv('UserAccountsFetchDelay', 3)
loginFetchDelay = os.getenv('LoginFetchDelay', 6)
shared_key = os.environ['WorkspaceKey']
pickle_str = os.environ['GooglePickleString']
pickle_string = base64.b64decode(pickle_str)
connection_string = os.environ['AzureWebJobsStorage']
logAnalyticsUri = os.environ.get('logAnalyticsUri')

'''
customer_id = "dac5fbbc-f2c9-4589-8fe5-abe35946b964"
fetchDelay = 10
chunksize = 9999
calendarFetchDelay = 6
chatFetchDelay = 1
userAccountsFetchDelay = 3
loginFetchDelay = 6
shared_key = "GR3mgHljiYY3y1HvMF9GInYvd7v9nKBTeReW7GGv+eY7jUkuMMlslOLwQx7GleUA2t/4s2JYkd3h8jRt8Zp8LQ=="
pickle_str = "gASVvgMAAAAAAACMGWdvb2dsZS5vYXV0aDIuY3JlZGVudGlhbHOUjAtDcmVkZW50aWFsc5STlCmBlH2UKIwFdG9rZW6UjNp5YTI5LmEwQWZCX2J5Q2dxS09mMGJacDllVngtbFJhbFp3cnBfMU91SUdHVzRzRkI2MDRtMWlnZ0xScTZDNzBwUzhlS0NVRGZWZHhqbjZKMmRHSG5ZcFpLVUVnTzJPRldwR1J5MERTS0lWOVJxTTZPMnVXZmNqN29lTFdWWm9OSmxkaUhRX1o1Szh6NlVfdjY2U3NRZVdhcUtyb1NibmhhX240S2FnVFZCMklhQ2dZS0FiTVNBUkFTRlFIR1gyTWlXM2lGSkN4ZFpSYktWNGloMHdHa3BnMDE3MZSMBmV4cGlyeZSMCGRhdGV0aW1llIwIZGF0ZXRpbWWUk5RDCgfnDAMUCzkFatyUhZRSlIwRX3F1b3RhX3Byb2plY3RfaWSUTowPX3RydXN0X2JvdW5kYXJ5lE6MEF91bml2ZXJzZV9kb21haW6UjA5nb29nbGVhcGlzLmNvbZSMB19zY29wZXOUXZSMPGh0dHBzOi8vd3d3Lmdvb2dsZWFwaXMuY29tL2F1dGgvYWRtaW4ucmVwb3J0cy5hdWRpdC5yZWFkb25seZRhjA9fZGVmYXVsdF9zY29wZXOUTowOX3JlZnJlc2hfdG9rZW6UjGcxLy8wNlpLVXFmaFk0S0puQ2dZSUFSQUFHQVlTTndGLUw5SXJ6d2s0QnJ6MlJaS2FLZ1V0TEg4eW5FRVJyM29vUkRNM2J1UFBWUGpIZGVYbGtuM01BNVpxdm9iR3c5Y1ozWUVvTUJRlIwJX2lkX3Rva2VulE6MD19ncmFudGVkX3Njb3Blc5RdlIw8aHR0cHM6Ly93d3cuZ29vZ2xlYXBpcy5jb20vYXV0aC9hZG1pbi5yZXBvcnRzLmF1ZGl0LnJlYWRvbmx5lGGMCl90b2tlbl91cmmUjCNodHRwczovL29hdXRoMi5nb29nbGVhcGlzLmNvbS90b2tlbpSMCl9jbGllbnRfaWSUjEg5NjcxMzg3NTAzOTUtZXZhczZqb3BudDlxY2V2aHEza211cm9jdTRnZTNpZDEuYXBwcy5nb29nbGV1c2VyY29udGVudC5jb22UjA5fY2xpZW50X3NlY3JldJSMI0dPQ1NQWC1UaHVpUUMtOUxQZG1mUTAzWnJjcU1ybWkwSFhplIwLX3JhcHRfdG9rZW6UTowWX2VuYWJsZV9yZWF1dGhfcmVmcmVzaJSJdWIu"
pickle_string = base64.b64decode(pickle_str)
connection_string = "DefaultEndpointsProtocol=https;AccountName=gworkspacepoidoj7eik2jg;AccountKey=2CLGKVW8Peq+VGBSwBaKjiKjvDfbNd7ulPMAf1taa7+lH7Bxm8KX/jJ5EA/OGrC5Kf/Eub81eq9d+AStpszESA==;EndpointSuffix=core.windows.net"
logAnalyticsUri = "https://dac5fbbc-f2c9-4589-8fe5-abe35946b964.ods.opinsights.azure.com"
'''
MAX_SCRIPT_EXEC_TIME_MINUTES = 10
SCOPES = ['https://www.googleapis.com/auth/admin.reports.audit.readonly']
activities = [
    "user_accounts",
    "access_transparency",
    "admin",
    "calendar",
    "chat",
    "drive",
    "gcp",
    "gplus",
    "groups",
    "groups_enterprise",
    "jamboard",
    "login",
    "meet",
    "mobile",
    "rules",
    "saml",
    "token",
    "context_aware_access",
    "chrome",
    "data_studio"
]

# Remove excluded activities
excluded_activities = os.environ.get('ExcludedActivities')
if excluded_activities:
    excluded_activities = excluded_activities.replace(" ", "").split(",")
    activities = [activ for activ in activities if activ not in excluded_activities]
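# For example (hypothetical value), ExcludedActivities="chat, meet" becomes
# ["chat", "meet"] after the whitespace strip and split, and those two
# applications are dropped from the list polled below.
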
if logAnalyticsUri in (None, '') or str(logAnalyticsUri).isspace():
    logAnalyticsUri = 'https://' + customer_id + '.ods.opinsights.azure.com'
pattern = r'https:\/\/([\w\-]+)\.ods\.opinsights\.azure\.([a-zA-Z\.]+)$'
match = re.match(pattern, str(logAnalyticsUri))
if not match:
    raise Exception("Google Workspace Reports: Invalid Log Analytics Uri.")

def get_credentials():
    creds = None
    if pickle_string:
        try:
            creds = pickle.loads(pickle_string)
        except Exception as pickle_read_exception:
            logging.error('Error while loading pickle string: {}'.format(pickle_read_exception))
    else:
        raise Exception("Google Workspace Reports: Pickle_string is empty. Exit.")
    return creds
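# The GooglePickleString app setting is expected to hold base64-encoded, pickled
# Google credentials, i.e. the inverse of the decoding above. A minimal sketch of
# producing it (assuming `creds` is an existing google.oauth2 Credentials object):
#
#   encoded = base64.b64encode(pickle.dumps(creds)).decode('ascii')
#
# The resulting string is what gets stored in the function app configuration.
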
def isBlank(myString):
    return not (myString and myString.strip())


def isNotBlank(myString):
    return bool(myString and myString.strip())

def is_json(myjson):
    try:
        json.loads(myjson)
    except ValueError:
        return False
    return True

# Function to convert string to datetime
def convertToDatetime(date_time, format):
    return datetime.strptime(date_time, format)
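# For instance, with the timestamp format used elsewhere in this file:
#   convertToDatetime('2023-12-03T14:01:00.000Z', '%Y-%m-%dT%H:%M:%S.%fZ')
# returns datetime(2023, 12, 3, 14, 1) as a timezone-naive datetime object.
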
def get_result(activity, start_time, end_time):
    # Initialized before the try block so both values are always defined for the
    # return statement, even if the API call raises.
    result_activities = []
    next_page_token = None
    try:
        service = build('admin', 'reports_v1', credentials=creds, cache_discovery=False, num_retries=3, static_discovery=True)
        results = service.activities().list(userKey='all', applicationName=activity,
                                            maxResults=1, startTime=start_time, endTime=end_time).execute()
        next_page_token = results.get('nextPageToken', None)
        result_activities.extend(results.get('items', []))
        if len(result_activities) == 0:
            logging.info("No logs found for {} activity".format(activity))
        else:
            logging.info("Activity - {}, processing {} events".format(activity, len(result_activities)))
    except Exception as err:
        logging.error("Something went wrong while getting the results. Exception error text: {}".format(err))
    return result_activities, next_page_token

def get_nextpage_results(activity, start_time, end_time, next_page_token):
    # Same defensive initialization as get_result above.
    result_activities = []
    try:
        service = build('admin', 'reports_v1', credentials=creds, cache_discovery=False, num_retries=3, static_discovery=True)
        results = service.activities().list(userKey='all', applicationName=activity,
                                            maxResults=1000, startTime=start_time, endTime=end_time, pageToken=next_page_token).execute()
        next_page_token = results.get('nextPageToken', None)
        result_activities.extend(results.get('items', []))
        if len(result_activities) == 0:
            logging.info("No logs found for {} activity".format(activity))
        else:
            logging.info("Activity - {}, processing {} events".format(activity, len(result_activities)))
    except Exception as err:
        logging.error("Something went wrong while getting the results. Exception error text: {}".format(err))
    return result_activities, next_page_token

def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
    x_headers = 'x-ms-date:' + date
    string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
    bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
    decoded_key = base64.b64decode(shared_key)
    encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
    authorization = "SharedKey {}:{}".format(customer_id, encoded_hash)
    return authorization
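# For reference, the string-to-sign assembled above has the following shape
# (values here are illustrative only):
#
#   POST
#   1024
#   application/json
#   x-ms-date:Mon, 04 Dec 2023 10:00:00 GMT
#   /api/logs
#
# It is signed with HMAC-SHA256 using the base64-decoded workspace key, which is
# the shared key scheme of the Azure Log Analytics Data Collector API.
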
def post_data(customer_id, shared_key, body, log_type, chunk_count):
    method = 'POST'
    content_type = 'application/json'
    resource = '/api/logs'
    rfc1123date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
    content_length = len(body)
    signature = build_signature(customer_id, shared_key, rfc1123date, content_length, method, content_type, resource)
    uri = logAnalyticsUri + resource + '?api-version=2016-04-01'

    headers = {
        'content-type': content_type,
        'Authorization': signature,
        'Log-Type': log_type,
        'x-ms-date': rfc1123date,
        'time-generated-field': "id_time"
    }
    response = requests.post(uri, data=body, headers=headers)
    if (response.status_code >= 200 and response.status_code <= 299):
        logging.info("Logs with {} activity were processed into Azure".format(log_type))
        logging.info("Chunk was processed with {} events".format(chunk_count))
    else:
        logging.warning("Response code: {}".format(response.status_code))
    return response.status_code

# Flatten each report record: every entry in the nested "events" array becomes a
# standalone record carrying the parent fields, with the event name, type and
# parameters promoted to top-level keys.
def expand_data(obj):
    new_obj = []
    for event in obj:
        nested_events_arr = event["events"]
        for nested in nested_events_arr:
            head_event_part = event.copy()
            head_event_part.pop("events")
            if 'name' in nested:
                head_event_part.update({'event_name': nested["name"]})
            if 'type' in nested:
                head_event_part.update({'event_type': nested["type"]})
            if 'parameters' in nested:
                for parameter in nested["parameters"]:
                    if 'name' in parameter:
                        for param_name in ["value", "boolValue", "multiValue", "multiMessageValue", "multiIntValue", "messageValue", "intValue"]:
                            if param_name in parameter:
                                head_event_part.update({parameter["name"]: parameter[param_name]})
            new_obj.append(head_event_part)
    return new_obj
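# As an illustration (hypothetical record), an API item such as
#   {"id": {"time": "..."},
#    "events": [{"name": "login_success", "type": "login",
#                "parameters": [{"name": "login_type", "value": "google_password"}]}]}
# expands to
#   {"id": {"time": "..."}, "event_name": "login_success",
#    "event_type": "login", "login_type": "google_password"}
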
def gen_chunks_to_object(data, chunksize=100):
    chunk = []
    for index, line in enumerate(data):
        if (index % chunksize == 0 and index > 0):
            yield chunk
            chunk = []  # start a fresh list rather than clearing the yielded one in place
        chunk.append(line)
    yield chunk
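# Usage sketch (hypothetical data): iterating ten items with chunksize=4 yields
# chunks of 4, 4 and 2 items:
#   for chunk in gen_chunks_to_object(range(10), chunksize=4):
#       print(len(chunk))   # -> 4, 4, 2
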
def gen_chunks_with_latesttime(data, log_type):
    chunks = [data[i:i + chunksize] for i in range(0, len(data), chunksize)]
    logging.info("Entered into the chunks mode")
    latest_timestamp = ""
    i = 0
    for chunk in chunks:
        try:
            i = i + 1
            logging.debug("Iteration chunk {}".format(i))
            body = json.dumps(chunk)
            logging.debug(body)
            statuscode = post_data(customer_id, shared_key, body, log_type, len(chunk))
            if (statuscode >= 200 and statuscode <= 299):
                latest_timestamp = chunk[-1]["id"]["time"]
                # Advance by 1 ms so the next fetch window starts strictly after
                # the last ingested event.
                dt = datetime.strptime(latest_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
                dt += timedelta(milliseconds=1)
                latest_timestamp = dt.strftime('%Y-%m-%dT%H:%M:%S.%f')
                latest_timestamp = latest_timestamp[:-3] + 'Z'
                logging.info("Chunk Timestamp {}".format(latest_timestamp))
            else:
                logging.warning("There is an issue with posting data to LA - Response code: {}".format(statuscode))
        except Exception as err:
            logging.error("Something went wrong. Exception error text: {}".format(err))
    return latest_timestamp
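# Example of the timestamp bump above (illustrative value): an event time of
# '2023-12-03T14:01:00.123Z' is parsed, advanced by one millisecond, and
# re-serialized as '2023-12-03T14:01:00.124Z'; strftime('%f') emits microseconds,
# so the [:-3] slice trims them back to milliseconds before the 'Z' is appended.
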
"""This method is used to process and post the results to Log Analytics Workspace | ||
Returns: | ||
latest_timestamp : last processed result timestamp | ||
""" | ||
def process_result(result_obj, start_time, activity): | ||
if result_obj is not None: | ||
result_obj = expand_data(result_obj) | ||
logging.info("Activity - {}, Expanded Events {} ".format(activity, len(result_obj))) | ||
sorted_data = sorted(result_obj, key=lambda x: x["id"]["time"],reverse=False) | ||
json_string = json.dumps(result_obj) | ||
byte_ = json_string.encode("utf-8") | ||
byteLength = len(byte_) | ||
mbLength = byteLength/1024/1024 | ||
if(len(result_obj)) > 0 and int(mbLength) < 25 : | ||
# Sort the json based on the "timestamp" key | ||
body = json.dumps(result_obj) | ||
statuscode = post_data(customer_id, shared_key,body,"GWorkspace_ReportsAPI_"+activity, len(result_obj)) | ||
if (statuscode >= 200 and statuscode <= 299): | ||
latest_timestamp = sorted_data[-1]["id"]["time"] | ||
dt = datetime.strptime(latest_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') | ||
dt += timedelta(milliseconds=1) | ||
latest_timestamp = dt.strftime('%Y-%m-%dT%H:%M:%S.%f') | ||
latest_timestamp = latest_timestamp[:-3] + 'Z' | ||
logging.info("Successfully send data to LA of size {} MB".format(mbLength)) | ||
else: | ||
logging.warn("There is an issue with Posting data to LA - Response code: {}".format(statuscode)) | ||
latest_timestamp = start_time | ||
else: | ||
latest_timestamp = gen_chunks_with_latesttime(sorted_data, "GWorkspace_ReportsAPI_"+activity) | ||
if(isBlank(latest_timestamp)): | ||
latest_timestamp = start_time | ||
logging.info("The latest timestamp is same as the original start time {} - {}".format(activity,latest_timestamp)) | ||
# Fetch the latest timestamp | ||
logging.info("The latest timestamp got from api activity is {} - {}".format(activity,latest_timestamp)) | ||
return latest_timestamp | ||
def check_if_script_runs_too_long(script_start_time):
    now = int(time.time())
    duration = now - script_start_time
    max_duration = int(MAX_SCRIPT_EXEC_TIME_MINUTES * 60 * 0.9)
    return duration > max_duration
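# With MAX_SCRIPT_EXEC_TIME_MINUTES = 10, the cutoff is int(10 * 60 * 0.9) = 540
# seconds, leaving roughly a minute of headroom for main() to re-queue the
# unfinished window before the function host's own timeout.
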
def main(queueItem: func.QueueMessage):
    logging.getLogger().setLevel(logging.INFO)
    script_start_time = int(time.time())
    message_body = json.loads(queueItem.get_body().decode('ascii').replace("'", '"'))
    #message_body = {'start_time': '2023-12-03T14:01:00.000000Z', 'end_time': '2023-12-03T15:01:00.000Z', 'activity': 'access_transparency'}
    start_time = message_body.get('start_time')
    end_time = message_body.get('end_time')
    activity = message_body.get('activity')
    global creds
    creds = get_credentials()
    latest_timestamp = ""
    mainQueueHelper = AzureStorageQueueHelper(connection_string, "gworkspace-main-queue")
    logging.info('Starting GWorkspaceReport-QueueTrigger program at {}'.format(time.ctime(int(time.time()))))
    logging.info('Queue message received with body {}'.format(message_body))
    try:
        result_obj, next_page_token = get_result(activity, start_time, end_time)
        if (result_obj is not None) and (len(result_obj) > 0):
            latest_timestamp = process_result(result_obj, latest_timestamp, activity)

            while next_page_token is not None and len(next_page_token) > 0:
                result_obj, next_page_token = get_nextpage_results(activity, start_time, end_time, next_page_token)
                if (result_obj is not None) and (len(result_obj) > 0):
                    latest_timestamp = process_result(result_obj, latest_timestamp, activity)
                if check_if_script_runs_too_long(script_start_time):
                    logging.info('Script is running too long. Stop processing new events and updating state before finishing the script.')
                    queue_body = {}
                    queue_body["start_time"] = latest_timestamp
                    queue_body["end_time"] = end_time
                    queue_body["activity"] = activity
                    mainQueueHelper.send_to_queue(queue_body, True)
                    return
        else:
            logging.info("No events for {} activity with {} start time and {} end time".format(activity, start_time, end_time))
    except Exception as err:
        logging.error("Something went wrong. Exception error text: {}".format(err))
        logging.error("Error: Google Workspace Reports data connector execution failed with an internal server error.")
        raise
    logging.info(f'Finish script. at {time.ctime(int(time.time()))}')

'''
if __name__ == '__main__':
    main()
'''
@@ -0,0 +1,14 @@
{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=bluecyclepython;AccountKey=2QOhKt4OigH+x9Wh4ZBuqOL7WXgCAAU+fdiS4544wKR4+zcso5eIHXzfOKb6sQ5kM9eC+vJ9VR3Q+AStnRImX9==;EndpointSuffix=core.windows.net",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "GREYNOISE_KEY": "QQlNQhHvmYuRCSgPEk4zUf3qxyBJt4yIwxwm5CiKI3YL8piUvBKHiGAo9wFx8tEX",
        "CLIENT_ID": "1e1b4f6a-2c84-4e0e-8488-412e21868b62",
        "CLIENT_SECRET": "o~.8Q~7hVruMECN9C9o9wwQXbXjgrbLHAuWYSb8Y",
        "TENANT_ID": "17cea101-dd9b-43d0-9d67-f028b6efc55f",
        "WORKSPACE_ID": "c64eb659-e5d8-4727-a9cd-ea4a085138e6",
        "GREYNOISE_CLASSIFICATIONS": "malicious,unknown"
    }
}