Skip to content

Commit

Permalink
Raise ValueError if the account in DB does not have the device ID ass…
Browse files Browse the repository at this point in the history
…igned stored locally
  • Loading branch information
Grennith committed Feb 21, 2024
1 parent cb86902 commit d6dd7b1
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 40 deletions.
3 changes: 2 additions & 1 deletion mapadroid/account_handler/AbstractAccountHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,12 @@ async def get_assigned_username(self, device_id: int) -> Optional[str]:
pass

@abstractmethod
async def is_burnt(self, device_id: int) -> bool:
async def is_burnt(self, device_id: int, account_id: Optional[int] = None) -> bool:
"""
Args:
device_id: The device_id to which an account may be assigned
account_id: Optionally the account id of the entry expected
Returns: False if no auth has an assignment to the device or the auth entry was found not to be burnt
Expand Down
10 changes: 5 additions & 5 deletions mapadroid/account_handler/AccountHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ async def notify_logout(self, device_id: int) -> None:
session.add(currently_assigned)
await session.commit()

async def is_burnt(self, device_id: int) -> bool:
async def is_burnt(self, device_id: int, account_id: Optional[int] = None) -> bool:
async with self._db_wrapper as session, session:
existing_auth: Optional[SettingsPogoauth] = await SettingsPogoauthHelper.get_assigned_to_device(
session, device_id=device_id)
if not existing_auth:
# TODO: Raise?
logger.warning("No auth assigned to device to determine whether it's running a burnt account")
return False
if not existing_auth or existing_auth.account_id != account_id:
logger.warning("No auth assigned to device to determine whether it's running a burnt account "
"or the account id deviates")
raise ValueError("Missing auth to account or account id deviates")
logger.info("Checking account {} (ID {}) assigned to {} for whether it was burnt or not",
existing_auth.username, existing_auth.account_id,
device_id)
Expand Down
94 changes: 60 additions & 34 deletions mapadroid/ocr/screenPath.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,23 @@ async def __handle_login_screen(self, global_dict: dict, diff: int) -> None:
temp_dict: dict = {}
n_boxes = len(global_dict['text'])
logger.debug("Selecting login with: {}", global_dict)
if (self._worker_state.active_account_last_set + 300 < time.time()
or (self._worker_state.active_account
and await self._account_handler.is_burnt(self._worker_state.device_id))):
last_used_account_valid: bool = False
try:
last_used_account_valid = (self._worker_state.active_account
and await self._account_handler.is_burnt(self._worker_state.device_id,
self._worker_state.active_account.account_id))
except ValueError as e:
logger.warning("Account last used does not match the assignment of accounts stored in DB")
self._worker_state.active_account = None
self._worker_state.active_account_last_set = 0
if self._worker_state.active_account_last_set + 300 < time.time() or not last_used_account_valid:
logger.info("Detected login screen, fetching new account to use since last account was assigned more "
"than 5 minutes ago OR current account was marked burnt")
location_to_scan: Optional[Location] = None
if not location_to_scan \
or self._worker_state.current_location.lat == 0 and self._worker_state.current_location.lng == 0:
# Default location, use the middle of the geofence...
geofence_helper: Optional[GeofenceHelper] = await self._mapping_manager\
geofence_helper: Optional[GeofenceHelper] = await self._mapping_manager \
.routemanager_get_geofence_helper(self._worker_state.area_id)
if geofence_helper:
lat, lon = geofence_helper.get_middle_from_fence()
Expand All @@ -135,6 +142,8 @@ async def __handle_login_screen(self, global_dict: dict, diff: int) -> None:
logger.info("Account for {}: {}", self._worker_state.origin, account_to_use.username)
self._worker_state.active_account = account_to_use
self._worker_state.active_account_last_set = int(time.time())
else:
logger.info("Account was set recently and is still assigned to device {} in DB")
if not self._worker_state.active_account:
logger.error("No account set for device, sleeping 30s")
await asyncio.sleep(30)
Expand Down Expand Up @@ -163,7 +172,8 @@ async def __handle_login_screen(self, global_dict: dict, diff: int) -> None:
# alternative select - calculate down from Facebook button
elif 'Facebook' in temp_dict:
click_x = self._worker_state.resolution_calculator.screen_size_x / 2
click_y = (temp_dict['Facebook'] + 2 * self._worker_state.resolution_calculator.screen_size_y / 10.11)
click_y = (temp_dict[
'Facebook'] + 2 * self._worker_state.resolution_calculator.screen_size_y / 10.11)
logger.info("ScreenType.LOGINSELECT (f) using PTC (logintype in Device Settings)")
await self._communicator.click(int(click_x), int(click_y))
await asyncio.sleep(5)
Expand Down Expand Up @@ -371,9 +381,9 @@ async def __check_pogo_screen_ban_or_loading(self, screentype, y_offset: int = 0
backgroundcolor[0] == 18 and
backgroundcolor[1] == 46 and
backgroundcolor[2] == 86) or (
backgroundcolor[0] == 55 and
backgroundcolor[1] == 72 and
backgroundcolor[2] == 88)):
backgroundcolor[0] == 55 and
backgroundcolor[1] == 72 and
backgroundcolor[2] == 88)):
screentype = ScreenType.WELCOME
return screentype

Expand Down Expand Up @@ -408,7 +418,8 @@ async def __handle_google_login(self, screentype) -> ScreenType:
usernames: Optional[str] = None
if self._worker_state.active_account and self._worker_state.active_account.login_type == LoginType.ptc.name:
logger.warning('Really dont know how i get there ... using first @ggl address ... :)')
usernames: Optional[str] = await self.get_devicesettings_value(MappingManagerDevicemappingKey.GGL_LOGIN_MAIL, '@gmail.com')
usernames: Optional[str] = await self.get_devicesettings_value(
MappingManagerDevicemappingKey.GGL_LOGIN_MAIL, '@gmail.com')
elif self._worker_state.active_account:
usernames: Optional[str] = self._worker_state.active_account.username
else:
Expand Down Expand Up @@ -442,8 +453,8 @@ async def __handle_retry_screen(self, diff, global_dict) -> None:
await self.clear_game_data()
# after clear_game_data there should be no reason to click this button as game gets killed
# but let's leave it here if Niantic decides this is a bug rather than QOL change
#click_text = 'DIFFERENT,AUTRE,AUTORISER,ANDERES,KONTO,ACCOUNT'
#await self.__click_center_button_text(click_text, diff, global_dict)
# click_text = 'DIFFERENT,AUTRE,AUTORISER,ANDERES,KONTO,ACCOUNT'
# await self.__click_center_button_text(click_text, diff, global_dict)

async def __click_center_button_text(self, click_text, diff, global_dict):
n_boxes = len(global_dict['text'])
Expand Down Expand Up @@ -512,8 +523,8 @@ async def __handle_ptc_login(self) -> ScreenType:
accept_y = int(int(match.group(2)) + ((int(match.group(4)) - int(match.group(2))) / 2))
# button for actual login
if MadGlobals.application_args.enable_login_tracking and self._worker_state.active_account.login_type == LoginType.ptc.name:
# Check whether a PTC login rate limit applies before trying to login using credentials as this may trigger
# just as a plain startup of already logged in account/device
# Check whether a PTC login rate limit applies before trying to login using credentials as this may trigger
# just as a plain startup of already logged in account/device
logger.debug("Login tracking enabled")
if not await self.check_ptc_login_ban(increment_count=True):
logger.warning("Potential PTC ban, aborting PTC login for now. Sleeping 30s")
Expand Down Expand Up @@ -560,7 +571,9 @@ async def __handle_ggl_consent_screen(self) -> ScreenType:
if not result:
logger.error("Failed getting/analyzing screenshot")
return ScreenType.ERROR
if (self._worker_state.resolution_calculator.screen_size_x != 720 and self._worker_state.resolution_calculator.screen_size_y != 1280) and (self._worker_state.resolution_calculator.screen_size_x != 1080 and self._worker_state.resolution_calculator.screen_size_y != 1920) and (
if (
self._worker_state.resolution_calculator.screen_size_x != 720 and self._worker_state.resolution_calculator.screen_size_y != 1280) and (
self._worker_state.resolution_calculator.screen_size_x != 1080 and self._worker_state.resolution_calculator.screen_size_y != 1920) and (
self._worker_state.resolution_calculator.screen_size_x != 1440 and self._worker_state.resolution_calculator.screen_size_y != 2560):
logger.warning("The google consent screen can only be handled on 720x1280, 1080x1920 and 1440x2560 screens "
f"(width is {self._worker_state.resolution_calculator.screen_size_x}, height is {self._worker_state.resolution_calculator.screen_size_y})")
Expand Down Expand Up @@ -607,12 +620,15 @@ async def __handle_birthday_screen(self) -> None:

# After having restarted pogo, we should again be on the birthday screen now and PD is turned off
self._nextscreen = ScreenType.RETURNING
click_x = int((self._worker_state.resolution_calculator.screen_size_x / 2) + (self._worker_state.resolution_calculator.screen_size_x / 4))
click_x = int((self._worker_state.resolution_calculator.screen_size_x / 2) + (
self._worker_state.resolution_calculator.screen_size_x / 4))
click_y = int(self._worker_state.resolution_calculator.screen_size_y / 1.69)
await self._communicator.click(click_x, click_y)
await self._communicator.touch_and_hold(click_x, click_y, click_x, int(click_y - (self._worker_state.resolution_calculator.screen_size_y / 2)), 200)
await self._communicator.touch_and_hold(click_x, click_y, click_x, int(click_y - (
self._worker_state.resolution_calculator.screen_size_y / 2)), 200)
await asyncio.sleep(1)
await self._communicator.touch_and_hold(click_x, click_y, click_x, int(click_y - (self._worker_state.resolution_calculator.screen_size_y / 2)), 200)
await self._communicator.touch_and_hold(click_x, click_y, click_x, int(click_y - (
self._worker_state.resolution_calculator.screen_size_y / 2)), 200)
await asyncio.sleep(1)
await self._communicator.click(click_x, click_y)
await asyncio.sleep(1)
Expand All @@ -622,7 +638,7 @@ async def __handle_birthday_screen(self) -> None:
await asyncio.sleep(1)

async def __handle_welcome_screen(self) -> ScreenType:
#self._nextscreen = ScreenType.TOS
# self._nextscreen = ScreenType.TOS
screenshot_path = await self.get_screenshot_path()
coordinates: Optional[ScreenCoordinates] = await self._worker_state.pogo_windows.look_for_button(
screenshot_path,
Expand All @@ -635,9 +651,10 @@ async def __handle_welcome_screen(self) -> ScreenType:
return ScreenType.NOTRESPONDING

async def __handle_tos_screen(self) -> ScreenType:
#self._nextscreen = ScreenType.PRIVACY
# self._nextscreen = ScreenType.PRIVACY
screenshot_path = await self.get_screenshot_path()
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 2), int(self._worker_state.resolution_calculator.screen_size_y * 0.47))
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 2),
int(self._worker_state.resolution_calculator.screen_size_y * 0.47))
coordinates: Optional[ScreenCoordinates] = await self._worker_state.pogo_windows.look_for_button(
screenshot_path,
2.20, 3.01,
Expand All @@ -649,7 +666,7 @@ async def __handle_tos_screen(self) -> ScreenType:
return ScreenType.NOTRESPONDING

async def __handle_privacy_screen(self) -> ScreenType:
#self._nextscreen = ScreenType.WILLOWCHAR
# self._nextscreen = ScreenType.WILLOWCHAR
screenshot_path = await self.get_screenshot_path()
coordinates: Optional[ScreenCoordinates] = await self._worker_state.pogo_windows.look_for_button(
screenshot_path,
Expand All @@ -666,10 +683,12 @@ async def __handle_character_selection_screen(self) -> ScreenType:
await self._communicator.click(100, 100)
await asyncio.sleep(1)
await asyncio.sleep(1)
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 4), int(self._worker_state.resolution_calculator.screen_size_y / 2))
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 4),
int(self._worker_state.resolution_calculator.screen_size_y / 2))
await asyncio.sleep(2)
for _ in range(3):
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x * 0.91), int(self._worker_state.resolution_calculator.screen_size_y * 0.94))
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x * 0.91),
int(self._worker_state.resolution_calculator.screen_size_y * 0.94))
await asyncio.sleep(2)

if not await self._take_screenshot(delay_before=await self.get_devicesettings_value(
Expand All @@ -692,10 +711,11 @@ async def __handle_character_selection_screen(self) -> ScreenType:
async def __handle_catch_tutorial(self) -> ScreenType:
for _ in range(2):
await self._communicator.click(100, 100)
for x in range(1,10):
for y in range(1,10):
click_x = int(self._worker_state.resolution_calculator.screen_size_x * x/10)
click_y = int(self._worker_state.resolution_calculator.screen_size_y * y/20 + self._worker_state.resolution_calculator.screen_size_y / 2)
for x in range(1, 10):
for y in range(1, 10):
click_x = int(self._worker_state.resolution_calculator.screen_size_x * x / 10)
click_y = int(
self._worker_state.resolution_calculator.screen_size_y * y / 20 + self._worker_state.resolution_calculator.screen_size_y / 2)
await self._communicator.click(click_x, click_y)
await asyncio.sleep(5)
if not await self._take_screenshot(delay_before=await self.get_devicesettings_value(
Expand All @@ -704,16 +724,19 @@ async def __handle_catch_tutorial(self) -> ScreenType:
logger.error("Failed getting screenshot")
return ScreenType.ERROR
screenshot_path = await self.get_screenshot_path()
globaldict = await self._worker_state.pogo_windows.get_screen_text(screenshot_path, self._worker_state.origin)
starter = ['Bulbasaur', 'Charmander', 'Squirtle', 'Bisasam', 'Glumanda', 'Schiggy', 'Bulbizarre', 'Salameche', 'Carapuce']
globaldict = await self._worker_state.pogo_windows.get_screen_text(screenshot_path,
self._worker_state.origin)
starter = ['Bulbasaur', 'Charmander', 'Squirtle', 'Bisasam', 'Glumanda', 'Schiggy', 'Bulbizarre',
'Salameche', 'Carapuce']
if any(text in starter for text in globaldict['text']):
logger.debug("Found Pokémon")
break

for _ in range(3):
click_x = int(self._worker_state.resolution_calculator.screen_size_x / 2)
click_y = int(self._worker_state.resolution_calculator.screen_size_y * 0.93)
await self._communicator.touch_and_hold(click_x, click_y, click_x, int(click_y - (self._worker_state.resolution_calculator.screen_size_y / 2)), 200)
await self._communicator.touch_and_hold(click_x, click_y, click_x, int(click_y - (
self._worker_state.resolution_calculator.screen_size_y / 2)), 200)
await asyncio.sleep(15)

if not await self._take_screenshot(delay_before=await self.get_devicesettings_value(
Expand All @@ -731,7 +754,8 @@ async def __handle_catch_tutorial(self) -> ScreenType:
await self._communicator.click(coordinates.x, coordinates.y)
logger.info("Catched Pokémon.")
await asyncio.sleep(12)
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 2), int(self._worker_state.resolution_calculator.screen_size_y * 0.93))
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 2),
int(self._worker_state.resolution_calculator.screen_size_y * 0.93))
await asyncio.sleep(2)
return ScreenType.UNDEFINED

Expand All @@ -752,8 +776,10 @@ async def __handle_name_screen(self) -> ScreenType:
await self._communicator.enter_text(username)
await self._communicator.click(100, 100)
await asyncio.sleep(2)
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 2), int(self._worker_state.resolution_calculator.screen_size_y * 0.66))
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 2), int(self._worker_state.resolution_calculator.screen_size_y * 0.51))
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 2),
int(self._worker_state.resolution_calculator.screen_size_y * 0.66))
await self._communicator.click(int(self._worker_state.resolution_calculator.screen_size_x / 2),
int(self._worker_state.resolution_calculator.screen_size_y * 0.51))
await asyncio.sleep(2)

if not await self._take_screenshot(delay_before=await self.get_devicesettings_value(
Expand All @@ -763,7 +789,7 @@ async def __handle_name_screen(self) -> ScreenType:
return ScreenType.ERROR
screenshot_path = await self.get_screenshot_path()
globaldict = await self._worker_state.pogo_windows.get_screen_text(screenshot_path, self._worker_state.origin)
errortext = ['available.','verfugbar.','disponible.']
errortext = ['available.', 'verfugbar.', 'disponible.']
if any(text in errortext for text in globaldict['text']):
logger.warning('Account name is not available. Marking account as permabanned!')
await self._account_handler.mark_burnt(self._worker_state.device_id,
Expand Down

0 comments on commit d6dd7b1

Please sign in to comment.