Skip to content

Commit

Permalink
리시버 로직 원자적으로 동작하도록 변경
Browse files Browse the repository at this point in the history
  • Loading branch information
onee-only committed Dec 16, 2024
1 parent af25260 commit ece1fb2
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 21 deletions.
13 changes: 11 additions & 2 deletions board/event/handler/internal/board_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from event import EventBroker
from board.data import Point, Tile
from board.data.handler import BoardHandler
Expand Down Expand Up @@ -80,6 +81,8 @@ async def receive_try_pointing(message: Message[TryPointingPayload]):
pointable = True
break

publish_coroutines = []

pub_message = Message(
event=PointEvent.POINTING_RESULT,
header={"receiver": sender},
Expand All @@ -89,11 +92,12 @@ async def receive_try_pointing(message: Message[TryPointingPayload]):
)
)

await EventBroker.publish(pub_message)
publish_coroutines.append(EventBroker.publish(pub_message))

cursor_pos = message.payload.cursor_position

if not pointable:
await asyncio.gather(*publish_coroutines)
return

# 인터랙션 범위 체크
Expand All @@ -102,19 +106,22 @@ async def receive_try_pointing(message: Message[TryPointingPayload]):
pointer.x > cursor_pos.x + 1 or \
pointer.y < cursor_pos.y - 1 or \
pointer.y > cursor_pos.y + 1:
await asyncio.gather(*publish_coroutines)
return

# 보드 상태 업데이트하기
tile = Tile.from_int(tiles.data[4]) # 3x3칸 중 가운데
click_type = message.payload.click_type

if tile.is_open:
await asyncio.gather(*publish_coroutines)
return

match (click_type):
# 닫힌 타일 열기
case ClickType.GENERAL_CLICK:
if tile.is_flag:
await asyncio.gather(*publish_coroutines)
return

tile.is_open = True
Expand All @@ -136,7 +143,9 @@ async def receive_try_pointing(message: Message[TryPointingPayload]):
)
)

await EventBroker.publish(pub_message)
publish_coroutines.append(EventBroker.publish(pub_message))

await asyncio.gather(*publish_coroutines)

@EventBroker.add_receiver(MoveEvent.CHECK_MOVABLE)
@staticmethod
Expand Down
30 changes: 30 additions & 0 deletions board/event/handler/test/board_handler_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from cursor.data import Color
from board.data import Point, Tile, Tiles
from board.event.handler import BoardEventHandler
Expand Down Expand Up @@ -255,6 +256,35 @@ async def test_try_pointing_pointable_closed_general_click(self, mock: AsyncMock
self.assertEqual(fetched_tile, expected_tile)
self.assertEqual(got.payload.tile, expected_tile)

@patch("event.EventBroker.publish")
async def test_try_pointing_pointable_closed_general_click_race(self, mock: AsyncMock):
cursor_pos = Point(0, 0)
pointer = Point(1, 0)

# 코루틴 스위칭을 위해 sleep. 이게 되는 이유를 모르겠다.
async def sleep(_):
await asyncio.sleep(0)
mock.side_effect = sleep

message = Message(
event=PointEvent.TRY_POINTING,
header={"sender": self.sender_id},
payload=TryPointingPayload(
cursor_position=cursor_pos,
new_pointer=pointer,
click_type=ClickType.GENERAL_CLICK,
color=Color.BLUE
)
)

await asyncio.gather(
BoardEventHandler.receive_try_pointing(message),
BoardEventHandler.receive_try_pointing(message)
)

# 첫번째: pointing-result, tile-state-changed 두번째: pointing-result 발행하는지 확인
self.assertEqual(len(mock.mock_calls), 3)

@patch("event.EventBroker.publish")
async def test_try_pointing_pointable_closed_general_click_flag(self, mock: AsyncMock):
cursor_pos = Point(0, 0)
Expand Down
56 changes: 39 additions & 17 deletions cursor/event/handler/internal/cursor_event_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from cursor.data import Cursor
from cursor.data.handler import CursorHandler
from board.data import Point, Tile
Expand Down Expand Up @@ -39,6 +40,8 @@ async def receive_new_conn(message: Message[NewConnPayload]):
cursor = CursorHandler.create_cursor(message.payload.conn_id)
cursor.set_size(message.payload.width, message.payload.height)

publish_coroutines = []

new_cursor_message = Message(
event="multicast",
header={"target_conns": [cursor.conn_id],
Expand All @@ -50,7 +53,7 @@ async def receive_new_conn(message: Message[NewConnPayload]):
)
)

await EventBroker.publish(new_cursor_message)
publish_coroutines.append(EventBroker.publish(new_cursor_message))

start_p = Point(
x=cursor.position.x - cursor.width,
Expand All @@ -67,9 +70,11 @@ async def receive_new_conn(message: Message[NewConnPayload]):
for other_cursor in cursors_in_range:
CursorHandler.add_watcher(watcher=cursor, watching=other_cursor)

await publish_new_cursors_event(
target_cursors=[cursor],
cursors=cursors_in_range
publish_coroutines.append(
publish_new_cursors_event(
target_cursors=[cursor],
cursors=cursors_in_range
)
)

cursors_with_view_including = CursorHandler.view_includes(cursor.position, cursor.conn_id)
Expand All @@ -78,11 +83,15 @@ async def receive_new_conn(message: Message[NewConnPayload]):
for other_cursor in cursors_with_view_including:
CursorHandler.add_watcher(watcher=other_cursor, watching=cursor)

await publish_new_cursors_event(
target_cursors=cursors_with_view_including,
cursors=[cursor]
publish_coroutines.append(
publish_new_cursors_event(
target_cursors=cursors_with_view_including,
cursors=[cursor]
)
)

await asyncio.gather(*publish_coroutines)

@EventBroker.add_receiver(PointEvent.POINTING)
@staticmethod
async def receive_pointing(message: Message[PointingPayload]):
Expand Down Expand Up @@ -219,7 +228,6 @@ async def receive_movable_result(message: Message[MovableResultPayload]):
cursor.position = new_position

# TODO: 새로운 방식으로 커서들 찾기. 최적화하기.
# set을 사용하면 제약이 있음.

# 새로운 뷰의 커서들 찾기
top_left = Point(cursor.position.x - cursor.width, cursor.position.y + cursor.height)
Expand All @@ -236,16 +244,20 @@ async def receive_movable_result(message: Message[MovableResultPayload]):
if not in_view:
CursorHandler.remove_watcher(watcher=cursor, watching=other_cursor)

publish_coroutines = []

new_watchings = list(filter(lambda c: c.conn_id not in original_watching_ids, cursors_in_view))
if len(new_watchings) > 0:
# 새로운 watching 커서들 연관관계 설정
for other_cursor in new_watchings:
CursorHandler.add_watcher(watcher=cursor, watching=other_cursor)

# 새로운 커서들 전달
await publish_new_cursors_event(
target_cursors=[cursor],
cursors=new_watchings
publish_coroutines.append(
publish_new_cursors_event(
target_cursors=[cursor],
cursors=new_watchings
)
)

# 새로운 위치를 바라보고 있는 커서들 찾기, 본인 제외
Expand All @@ -268,7 +280,8 @@ async def receive_movable_result(message: Message[MovableResultPayload]):
color=cursor.color,
)
)
await EventBroker.publish(message)

publish_coroutines.append(EventBroker.publish(message))

# 범위 벗어나면 watcher 제거
for watcher in original_watchers:
Expand All @@ -283,11 +296,15 @@ async def receive_movable_result(message: Message[MovableResultPayload]):
CursorHandler.add_watcher(watcher=other_cursor, watching=cursor)

# 새로운 커서들에게 본인 커서 전달
await publish_new_cursors_event(
target_cursors=new_watchers,
cursors=[cursor]
publish_coroutines.append(
publish_new_cursors_event(
target_cursors=new_watchers,
cursors=[cursor]
)
)

await asyncio.gather(*publish_coroutines)

@EventBroker.add_receiver(InteractionEvent.TILE_STATE_CHANGED)
@staticmethod
async def receive_tile_state_changed(message: Message[TileStateChangedPayload]):
Expand All @@ -299,6 +316,8 @@ async def receive_tile_state_changed(message: Message[TileStateChangedPayload]):
# 닫힌 타일의 mine, number 정보는 버리기
pub_tile = tile.copy(hide_info=True)

publish_coroutines = []

# 변경된 타일을 보고있는 커서들에게 전달
view_cursors = CursorHandler.view_includes(position)
if len(view_cursors) > 0:
Expand All @@ -311,9 +330,10 @@ async def receive_tile_state_changed(message: Message[TileStateChangedPayload]):
tile=pub_tile
)
)
await EventBroker.publish(pub_message)
publish_coroutines.append(EventBroker.publish(pub_message))

if not (tile.is_open and tile.is_mine):
await asyncio.gather(*publish_coroutines)
return

# 주변 8칸 커서들 죽이기
Expand All @@ -335,7 +355,9 @@ async def receive_tile_state_changed(message: Message[TileStateChangedPayload]):
revive_at=revive_at.astimezone().isoformat()
)
)
await EventBroker.publish(pub_message)
publish_coroutines.append(EventBroker.publish(pub_message))

await asyncio.gather(*publish_coroutines)

@EventBroker.add_receiver(NewConnEvent.CONN_CLOSED)
@staticmethod
Expand Down
33 changes: 31 additions & 2 deletions cursor/event/handler/test/cursor_event_handler_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from cursor.data import Cursor, Color
from cursor.data.handler import CursorHandler
from cursor.event.handler import CursorEventHandler
Expand Down Expand Up @@ -145,6 +146,34 @@ async def test_new_conn_receive_without_cursors(self, mock: AsyncMock):
self.assertEqual(got.payload.position.y, 0)
self.assertIn(got.payload.color, Color)

@patch("event.EventBroker.publish")
async def test_new_conn_receive_without_cursors_race(self, mock: AsyncMock):
conn_1 = "1"
conn_2 = "2"
height = 1
width = 1

new_conn_1_msg = Message(
event=NewConnEvent.NEW_CONN,
payload=NewConnPayload(conn_id=conn_1, width=width, height=height)
)
new_conn_2_msg = Message(
event=NewConnEvent.NEW_CONN,
payload=NewConnPayload(conn_id=conn_2, width=width, height=height)
)

# 코루틴 스위칭을 위해 sleep. 이게 되는 이유를 모르겠다.
async def sleep(_):
await asyncio.sleep(0)
mock.side_effect = sleep

await asyncio.gather(
CursorEventHandler.receive_new_conn(new_conn_1_msg),
CursorEventHandler.receive_new_conn(new_conn_2_msg)
)
# 첫번째 conn: my-cursor, 두번째 conn: my-cursor, cursors * 2
self.assertEqual(len(mock.mock_calls), 4)

@patch("event.EventBroker.publish")
async def test_receive_new_conn_with_cursors(self, mock: AsyncMock):
# /docs/example/cursor-location.png
Expand Down Expand Up @@ -767,7 +796,7 @@ async def test_receive_movable_result_c_left(self, mock: AsyncMock):
self.assertEqual(len(mock.mock_calls), 2)

# cursors
got = mock.mock_calls[0].args[0]
got = mock.mock_calls[1].args[0]
self.assertEqual(type(got), Message)
self.assertEqual(got.event, "multicast")
# origin_event
Expand All @@ -788,7 +817,7 @@ async def test_receive_movable_result_c_left(self, mock: AsyncMock):
self.assertEqual(got.payload.cursors[1].color, self.cur_b.color)

# moved
got = mock.mock_calls[1].args[0]
got = mock.mock_calls[0].args[0]
self.assertEqual(type(got), Message)
self.assertEqual(got.event, "multicast")
# origin_event
Expand Down

0 comments on commit ece1fb2

Please sign in to comment.