Commit

Merge pull request data-for-change#2679 from ziv17/264-remove-streets-update-from-cbs-load

264 remove streets update from cbs load
atalyaalon authored Jul 4, 2024
2 parents 4478d54 + 4f67f20 commit 227c4a3
Showing 2 changed files with 11 additions and 107 deletions.
108 changes: 7 additions & 101 deletions anyway/parsers/cbs/executor.py
@@ -11,7 +11,7 @@
 import math
 import pandas as pd
 from sqlalchemy import or_, event
-from typing import Tuple, Dict, List, Any
+from typing import Dict, List
 
 from anyway.parsers.cbs import preprocessing_cbs_files
 from anyway import field_names, localization
@@ -76,7 +76,6 @@
     LocationAccuracy,
     ProviderCode,
     VehicleDamage,
-    Streets,
     AccidentMarkerView,
     InvolvedView,
     InvolvedMarkerView,
@@ -716,9 +715,7 @@ def get_files(directory):
     return output_files_dict
 
 
-def import_to_datastore(
-    directory, provider_code, year, batch_size
-) -> Tuple[int, Dict[int, List[dict]]]:
+def import_to_datastore(directory, provider_code, year, batch_size) -> int:
     """
     goes through all the files in a given directory, parses and commits them
     Returns number of new items, and new streets dict.
@@ -744,100 +741,14 @@ def import_to_datastore
             new_items += vehicles_count
 
         logging.debug("\t{0} items in {1}".format(new_items, time_delta(started)))
-        return new_items, files_from_cbs[STREETS]
+        return new_items
     except ValueError as e:
         failed_dirs[directory] = str(e)
         if "Not found" in str(e):
-            return 0, {}
+            return 0
         raise e
 
 
-def import_streets_into_db():
-    items = []
-    max_name_len = 0
-    for k, street_hebrew in yishuv_street_dict.items():
-        yishuv_symbol, street = k
-        yishuv_name_street_num = yishuv_name_dict.get((yishuv_symbol, street_hebrew), None)
-        if yishuv_name_street_num is None or yishuv_name_street_num != street:
-            logging.error(
-                f"streets data mismatch:"
-                f"yishuv_street_dict entry: {k}->{street_hebrew}"
-                f",yishuv_name_dict entry: {(yishuv_symbol, street_hebrew)}->{yishuv_name_street_num}"
-            )
-            continue
-        name_len = len(street_hebrew)
-        if name_len > max_name_len:
-            max_name_len = name_len
-        street_entry = {
-            "yishuv_symbol": yishuv_symbol,
-            "street": street,
-            "street_hebrew": street_hebrew[: min(name_len, Streets.MAX_NAME_LEN)],
-        }
-        items.append(street_entry)
-    logging.debug(
-        f"Writing to db: {len(yishuv_street_dict)}:{len(yishuv_name_dict)} -> {len(items)} rows"
-    )
-    db.session.query(Streets).delete()
-    db.session.bulk_insert_mappings(Streets, items)
-    db.session.commit()
-    if max_name_len > Streets.MAX_NAME_LEN:
-        logging.error(
-            f"Importing streets table: Street hebrew name length exceeded: max name: {max_name_len}"
-        )
-    else:
-        logging.debug(f"Max street name len:{max_name_len}")
-    logging.debug(f"Done. {len(yishuv_street_dict)}:{len(yishuv_name_dict)}")
-
-
-yishuv_street_dict: Dict[Tuple[int, int], str] = {}
-yishuv_name_dict: Dict[Tuple[int, str], int] = {}
-
-
-def load_existing_streets():
-    streets = db.session.query(Streets).all()
-    for s in streets:
-        s_dict = {
-            "yishuv_symbol": s.yishuv_symbol,
-            "street": s.street,
-            "street_hebrew": s.street_hebrew,
-        }
-        add_street_remove_name_duplicates(s_dict)
-        add_street_remove_num_duplicates(s_dict)
-    logging.debug(f"Loaded streets: {len(yishuv_street_dict)}:{len(yishuv_name_dict)}")
-
-
-def add_to_streets(streets_map: Dict[int, List[dict]]):
-    for yishuv_symbol, streets_list in streets_map.items():
-        for street in streets_list:
-            my_street = {
-                "yishuv_symbol": yishuv_symbol,
-                "street": street[field_names.street_sign],
-                "street_hebrew": street[field_names.street_name],
-            }
-            add_street_remove_name_duplicates(my_street)
-            add_street_remove_num_duplicates(my_street)
-
-
-def add_street_remove_num_duplicates(street: Dict[str, Any]):
-    k = (street["yishuv_symbol"], street["street"])
-    v = yishuv_street_dict.get(k, None)
-    if v is not None and v != street["street_hebrew"]:
-        logging.error(f"Duplicate street code: {k}-> {v} and {street['street_hebrew']}")
-        yishuv_street_dict[k] = street["street_hebrew"]
-    if v is None:
-        yishuv_street_dict[k] = street["street_hebrew"]
-
-
-def add_street_remove_name_duplicates(street: Dict[str, Any]):
-    k = (street["yishuv_symbol"], street["street_hebrew"])
-    v = yishuv_name_dict.get(k, None)
-    if v is not None and v != street["street"]:
-        logging.error(f"Duplicate street name: {k}-> {v} and {street['street']}")
-        yishuv_name_dict[k] = street["street"]
-    if v is None:
-        yishuv_name_dict[k] = street["street"]
-
-
 def delete_invalid_entries(batch_size):
     """
     deletes all markers in the database with null latitude or longitude
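
The helpers deleted above maintained two module-level dicts as a bidirectional index: yishuv_street_dict mapped (yishuv_symbol, street_code) to a Hebrew street name, and yishuv_name_dict mapped (yishuv_symbol, street_hebrew) back to the code, so conflicting CBS rows could be logged before the Streets table was rewritten. A condensed, self-contained sketch of that duplicate check (simplified from the deleted code, not a drop-in replacement):

import logging
from typing import Dict, Tuple

# Sketch: the two indexes the deleted helpers kept in sync.
yishuv_street_dict: Dict[Tuple[int, int], str] = {}  # (yishuv, street code) -> Hebrew name
yishuv_name_dict: Dict[Tuple[int, str], int] = {}    # (yishuv, Hebrew name) -> street code

def add_street(yishuv_symbol: int, street: int, street_hebrew: str) -> None:
    # A re-insert that maps an existing key to a different value is a data conflict.
    old_name = yishuv_street_dict.get((yishuv_symbol, street))
    if old_name is not None and old_name != street_hebrew:
        logging.error(f"Duplicate street code: {(yishuv_symbol, street)} -> {old_name} and {street_hebrew}")
    yishuv_street_dict[(yishuv_symbol, street)] = street_hebrew

    old_code = yishuv_name_dict.get((yishuv_symbol, street_hebrew))
    if old_code is not None and old_code != street:
        logging.error(f"Duplicate street name: {(yishuv_symbol, street_hebrew)} -> {old_code} and {street}")
    yishuv_name_dict[(yishuv_symbol, street_hebrew)] = street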
@@ -1112,7 +1023,7 @@ def recreate_table_for_location_extraction():
     db.session.execute("""TRUNCATE cbs_locations""")
     db.session.execute("""INSERT INTO cbs_locations
         (SELECT ROW_NUMBER() OVER (ORDER BY road1) as id, LOCATIONS.*
-        FROM 
+        FROM
         (SELECT DISTINCT road1,
             road2,
             non_urban_intersection_hebrew,
@@ -1135,7 +1046,6 @@
 
 def main(batch_size, source, load_start_year=None):
     try:
-        load_existing_streets()
         total = 0
         started = datetime.now()
         if source == "s3":
@@ -1161,11 +1071,10 @@
                 )
                 logging.debug("Importing Directory " + cbs_files_dir)
                 preprocessing_cbs_files.update_cbs_files_names(cbs_files_dir)
-                num_new, streets = import_to_datastore(
+                num_new = import_to_datastore(
                     cbs_files_dir, provider_code, year, batch_size
                 )
                 total += num_new
-                add_to_streets(streets)
             shutil.rmtree(s3_data_retriever.local_temp_directory)
         elif source == "local_dir_for_tests_only":
             path = "static/data/cbs"
@@ -1184,13 +1093,10 @@
                 )
                 provider_code = get_provider_code(parent_directory)
                 logging.debug("Importing Directory " + directory)
-                num_new, streets = import_to_datastore(
+                num_new = import_to_datastore(
                     directory, provider_code, int(year), batch_size
                 )
                 total += num_new
-                add_to_streets(streets)
-
-        import_streets_into_db()
 
         fill_db_geo_data()
         failed = [
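
With this change, import_to_datastore returns a plain count instead of a (count, streets) tuple, and main no longer threads a streets dict through the load. A minimal sketch of the new calling convention (stub function and placeholder data, not the repository's code):

def import_to_datastore(directory: str, provider_code: int, year: int, batch_size: int) -> int:
    # Stub: the real function parses and commits the CBS files in `directory`
    # and now returns only the number of new items.
    return 0

total = 0
for directory, provider_code, year in [("cbs/accidents/2023", 1, 2023)]:  # placeholder data
    # After the PR: a single int comes back; there is no streets map to merge.
    total += import_to_datastore(directory, provider_code, year, batch_size=5000)
# Before the PR: num_new, streets = import_to_datastore(...); add_to_streets(streets)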
10 changes: 4 additions & 6 deletions tests/parsers/cbs/test_executor.py
@@ -15,22 +15,20 @@ def mock_shutil(monkeypatch):
 
 def test_import_streets_is_called_once_when_source_is_s3(monkeypatch, mock_s3_data_retriever, mock_shutil):
     # Arrange
-    import_streets_mock = MagicMock()
-    monkeypatch.setattr('anyway.parsers.cbs.executor.import_streets_into_db', import_streets_mock)
-    monkeypatch.setattr('anyway.parsers.cbs.executor.load_existing_streets', MagicMock())
-    monkeypatch.setattr('anyway.parsers.cbs.executor.delete_cbs_entries', MagicMock())
+    delete_cbs_entries = MagicMock()
+    monkeypatch.setattr('anyway.parsers.cbs.executor.delete_cbs_entries', delete_cbs_entries)
     monkeypatch.setattr('anyway.parsers.cbs.executor.fill_db_geo_data', MagicMock())
     monkeypatch.setattr('anyway.parsers.cbs.executor.create_tables', MagicMock())
 
     # Act
     main(batch_size=MagicMock(), source='s3')
 
     # Assert
-    import_streets_mock.assert_called_once()
+    delete_cbs_entries.assert_called_once()
 
 
 def test_cbs_parsing_failed_is_raised_when_something_bad_happens(monkeypatch):
-    monkeypatch.setattr('anyway.parsers.cbs.executor.load_existing_streets',
+    monkeypatch.setattr('anyway.parsers.cbs.executor.create_tables',
                         MagicMock(side_effect=Exception('something bad')))
 
     with pytest.raises(CBSParsingFailed, match='Exception occurred while loading the cbs data: something bad'):
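
The updated test keeps the same shape as before: patch a module-level dependency with a MagicMock via pytest's monkeypatch fixture, run the code under test, and assert the mock was hit exactly once. A freestanding sketch of that pattern (mymodule and its functions are hypothetical stand-ins, not the anyway codebase):

from unittest.mock import MagicMock

import mymodule  # hypothetical module whose run() calls mymodule.cleanup()

def test_cleanup_is_called_once(monkeypatch):
    cleanup_mock = MagicMock()
    # monkeypatch.setattr is automatically undone when the test finishes.
    monkeypatch.setattr(mymodule, "cleanup", cleanup_mock)

    mymodule.run()  # code under test

    cleanup_mock.assert_called_once()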
