-
Notifications
You must be signed in to change notification settings - Fork 235
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sliding Sync: Update filters to be robust against remote invite rooms #17450
Changes from 13 commits
3a6eec1
7f72d80
18ec437
a67157e
ffb61cf
9b77c97
d609205
8ca8ae5
4f50054
7ae2425
b35a2da
4bb34d3
a07ee22
2c9ec61
27a97a0
517bfc4
d22bae4
8624fb0
a77b70e
df5093b
7c33c83
474b480
82fa037
6f05bb3
d4c4de1
0ace82d
8c97eaa
8dd5a47
5267b03
329bf09
d7dcbfd
60ec4e3
affee22
8ee1708
fe2d84b
4ae3079
05bbcb0
376d900
28a36e8
ac7ca67
15e157e
19968e1
8f5e4bf
3c144b3
f3b4c9f
1732b02
20902e6
e9b76cc
484007e
7acb1ad
45ed4ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to handle invite/knock rooms when filtering. | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,18 +17,26 @@ | |
# [This file includes modifications made by New Vector Limited] | ||
# | ||
# | ||
import enum | ||
import logging | ||
from itertools import chain | ||
from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple | ||
|
||
import attr | ||
from immutabledict import immutabledict | ||
|
||
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership | ||
from synapse.api.constants import ( | ||
AccountDataTypes, | ||
Direction, | ||
EventContentFields, | ||
EventTypes, | ||
Membership, | ||
) | ||
from synapse.events import EventBase | ||
from synapse.events.utils import strip_event | ||
from synapse.handlers.relations import BundledAggregations | ||
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary | ||
from synapse.storage.databases.main.state import ROOM_UNKNOWN_SENTINEL | ||
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership | ||
from synapse.storage.roommember import MemberSummary | ||
from synapse.types import ( | ||
|
@@ -37,6 +45,7 @@ | |
Requester, | ||
RoomStreamToken, | ||
StateMap, | ||
StrCollection, | ||
StreamKeyType, | ||
StreamToken, | ||
UserID, | ||
|
@@ -51,6 +60,12 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
class Sentinel(enum.Enum): | ||
# defining a sentinel in this way allows mypy to correctly handle the | ||
# type of a dictionary lookup and subsequent type narrowing. | ||
UNSET_SENTINEL = object() | ||
Comment on lines
+88
to
+91
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This currently works and aligns with other sentinels we already have in the codebase. But I can't use the same pattern in I can't figure out a way that has these mypy type narrowing benefits while also being immutable. Seems like we need python/mypy#15553 or adjust our custom mypy plugin that checks whether something In any case, it works and we could just merge but it would be nice to find a pattern to use everywhere. |
||
|
||
|
||
# The event types that clients should consider as new activity. | ||
DEFAULT_BUMP_EVENT_TYPES = { | ||
EventTypes.Message, | ||
|
@@ -1085,6 +1100,67 @@ async def filter_rooms( | |
A filtered dictionary of room IDs along with membership information in the | ||
room at the time of `to_token`. | ||
""" | ||
room_id_to_stripped_state_map: Dict[str, Optional[List[Any]]] = {} | ||
|
||
async def _bulk_fetch_stripped_state_for_rooms( | ||
room_ids: StrCollection, | ||
) -> None: | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Fetch stripped state for a list of rooms (stripped state is only applicable to invite/knock rooms). | ||
""" | ||
# Fetch what we haven't before | ||
room_ids_to_fetch = [ | ||
room_id | ||
for room_id in room_ids | ||
if room_id not in room_id_to_stripped_state_map | ||
] | ||
|
||
# Gather a list of event IDs we can grab stripped state from | ||
invite_or_knock_event_ids: List[str] = [] | ||
for room_id in room_ids_to_fetch: | ||
if sync_room_map[room_id].membership in ( | ||
Membership.INVITE, | ||
Membership.KNOCK, | ||
): | ||
event_id = sync_room_map[room_id].event_id | ||
# If this is an invite/knock then there should be an event_id | ||
assert event_id is not None | ||
invite_or_knock_event_ids.append(event_id) | ||
else: | ||
room_id_to_stripped_state_map[room_id] = None | ||
|
||
invite_or_knock_events = await self.store.get_events( | ||
invite_or_knock_event_ids | ||
) | ||
for invite_or_knock_event in invite_or_knock_events.values(): | ||
room_id = invite_or_knock_event.room_id | ||
membership = invite_or_knock_event.membership | ||
|
||
if membership == Membership.INVITE: | ||
# Scrutinize unsigned things | ||
invite_room_state = invite_or_knock_event.unsigned.get( | ||
"invite_room_state", None | ||
) | ||
# `invite_room_state` should be a list of stripped events | ||
if isinstance(invite_room_state, list): | ||
room_id_to_stripped_state_map[room_id] = invite_room_state | ||
else: | ||
room_id_to_stripped_state_map[room_id] = None | ||
elif membership == Membership.KNOCK: | ||
# Scrutinize unsigned things | ||
knock_room_state = invite_or_knock_event.unsigned.get( | ||
"knock_room_state", None | ||
) | ||
# `knock_room_state` should be a list of stripped events | ||
if isinstance(knock_room_state, list): | ||
room_id_to_stripped_state_map[room_id] = knock_room_state | ||
else: | ||
room_id_to_stripped_state_map[room_id] = None | ||
else: | ||
raise AssertionError( | ||
f"Unexpected membership {membership} (this is a problem with Synapse itself)" | ||
) | ||
|
||
filtered_room_id_set = set(sync_room_map.keys()) | ||
|
||
# Filter for Direct-Message (DM) rooms | ||
|
@@ -1104,31 +1180,71 @@ async def filter_rooms( | |
if not sync_room_map[room_id].is_dm | ||
} | ||
|
||
if filters.spaces: | ||
if filters.spaces is not None: | ||
raise NotImplementedError() | ||
|
||
# Filter for encrypted rooms | ||
if filters.is_encrypted is not None: | ||
# Lookup the encryption state from the database. Since this function is | ||
# cached, need to make a mutable copy via `dict(...)`. | ||
room_id_to_encryption = dict( | ||
await self.store.bulk_get_room_encryption(filtered_room_id_set) | ||
) | ||
room_ids_with_results = [ | ||
room_id | ||
for room_id, encryption in room_id_to_encryption.items() | ||
if encryption is not ROOM_UNKNOWN_SENTINEL | ||
] | ||
|
||
# We might not have room state for remote invite/knocks if we are the first | ||
# person on our server to see the room. The best we can do is look in the | ||
# optional stripped state from the invite/knock event. | ||
room_ids_without_results = filtered_room_id_set.difference( | ||
room_ids_with_results | ||
) | ||
await _bulk_fetch_stripped_state_for_rooms(room_ids_without_results) | ||
|
||
# Update our `room_id_to_encryption` map based on the stripped state | ||
for room_id in room_ids_without_results: | ||
stripped_state = room_id_to_stripped_state_map.get( | ||
room_id, Sentinel.UNSET_SENTINEL | ||
) | ||
assert stripped_state is not Sentinel.UNSET_SENTINEL, ( | ||
f"Stripped state left unset for room {room_id}. " | ||
+ "Make sure you're calling `_bulk_fetch_stripped_state_for_rooms(...)` " | ||
+ "with that room_id. (this is a problem with Synapse itself)" | ||
) | ||
|
||
if stripped_state is not None: | ||
for event in stripped_state: | ||
event_type = event.get("type") | ||
event_content = event.get("content") | ||
# Scrutinize unsigned stripped state | ||
if isinstance(event_type, str) and isinstance( | ||
event_content, dict | ||
): | ||
if event_type == EventTypes.RoomEncryption: | ||
room_id_to_encryption[room_id] = event_content.get( | ||
EventContentFields.ENCRYPTION_ALGORITHM | ||
) | ||
break | ||
else: | ||
# Didn't see any encryption events in the stripped state | ||
room_id_to_encryption[room_id] = None | ||
|
||
# Make a copy so we don't run into an error: `Set changed size during | ||
# iteration`, when we filter out and remove items | ||
for room_id in filtered_room_id_set.copy(): | ||
state_at_to_token = await self.storage_controllers.state.get_state_at( | ||
room_id, | ||
to_token, | ||
state_filter=StateFilter.from_types( | ||
[(EventTypes.RoomEncryption, "")] | ||
), | ||
# Partially-stated rooms should have all state events except for the | ||
# membership events so we don't need to wait because we only care | ||
# about retrieving the `EventTypes.RoomEncryption` state event here. | ||
# Plus we don't want to block the whole sync waiting for this one | ||
# room. | ||
await_full_state=False, | ||
) | ||
is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, "")) | ||
encryption = room_id_to_encryption.get(room_id, ROOM_UNKNOWN_SENTINEL) | ||
|
||
# Just remove rooms if we can't determine their encryption status | ||
if encryption is ROOM_UNKNOWN_SENTINEL: | ||
filtered_room_id_set.remove(room_id) | ||
continue | ||
|
||
# If we're looking for encrypted rooms, filter out rooms that are not | ||
# encrypted and vice versa | ||
is_encrypted = encryption is not None | ||
if (filters.is_encrypted and not is_encrypted) or ( | ||
not filters.is_encrypted and is_encrypted | ||
): | ||
|
@@ -1154,15 +1270,60 @@ async def filter_rooms( | |
# provided in the list. `None` is a valid type for rooms which do not have a | ||
# room type. | ||
if filters.room_types is not None or filters.not_room_types is not None: | ||
room_to_type = await self.store.bulk_get_room_type( | ||
{ | ||
room_id | ||
for room_id in filtered_room_id_set | ||
# We only know the room types for joined rooms | ||
if sync_room_map[room_id].membership == Membership.JOIN | ||
} | ||
# Lookup the room type from the database. Since this function is | ||
# cached, need to make a mutable copy via `dict(...)`. | ||
room_id_to_type = dict( | ||
await self.store.bulk_get_room_type(filtered_room_id_set) | ||
) | ||
room_ids_with_results = [ | ||
room_id | ||
for room_id, room_type in room_id_to_type.items() | ||
if room_type is not ROOM_UNKNOWN_SENTINEL | ||
] | ||
|
||
# We might not have room state for remote invite/knocks if we are the first | ||
# person on our server to see the room. The best we can do is look in the | ||
# optional stripped state from the invite/knock event. | ||
room_ids_without_results = filtered_room_id_set.difference( | ||
room_ids_with_results | ||
) | ||
for room_id, room_type in room_to_type.items(): | ||
await _bulk_fetch_stripped_state_for_rooms(room_ids_without_results) | ||
|
||
# Update our `room_id_to_type` map based on the stripped state | ||
for room_id in room_ids_without_results: | ||
stripped_state = room_id_to_stripped_state_map.get( | ||
room_id, Sentinel.UNSET_SENTINEL | ||
) | ||
assert stripped_state is not Sentinel.UNSET_SENTINEL, ( | ||
f"Stripped state left unset for room {room_id}. " | ||
+ "Make sure you're calling `_bulk_fetch_stripped_state_for_rooms(...)` " | ||
+ "with that room_id. (this is a problem with Synapse itself)" | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this true that either the server is in the room or the membership is INVITE/KNOCK? I'm wondering about the case of e.g. leaves where the server is no longer in the room? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code does work fine when the server leaves the room (added some tests for that). For the non- That does bring up a good point though. Ideally, we shouldn't be using current state for I think we can assume the room type isn't secret since it's defined at the the start of the room in the For the encrypted state, most of the time it's set right away when the room is created but it's possible to set it later. Can we assume it's not considered leaking data? I would say yes especially since it's one of the stripped state events that's just given away. If we can make these assumptions, we can keep our bulk lookup shortcuts which would be nice. |
||
|
||
if stripped_state is not None: | ||
for event in stripped_state: | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
event_type = event.get("type") | ||
event_content = event.get("content") | ||
# Scrutinize unsigned stripped state | ||
if isinstance(event_type, str) and isinstance( | ||
event_content, dict | ||
): | ||
if event_type == EventTypes.Create: | ||
room_id_to_type[room_id] = event_content.get( | ||
EventContentFields.ROOM_TYPE | ||
) | ||
break | ||
|
||
# Make a copy so we don't run into an error: `Set changed size during | ||
# iteration`, when we filter out and remove items | ||
for room_id in filtered_room_id_set.copy(): | ||
room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL) | ||
|
||
# Just remove rooms if we can't determine their type | ||
if room_type is ROOM_UNKNOWN_SENTINEL: | ||
filtered_room_id_set.remove(room_id) | ||
continue | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These filters have gotten complex. There is an inkling to just say, "we only apply filters to joined rooms". But the problem with that is when you're joined to a room and filtering with sliding sync, as soon as you leave the room, you won't get that final update down sync because the room was filtered out. It's important that the filters apply regardless of your membership. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any way we can deduplicate some of these things? Maybe we need to have filter classes or something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I shared the logic in a new function |
||
if ( | ||
filters.room_types is not None | ||
and room_type not in filters.room_types | ||
|
@@ -1175,13 +1336,13 @@ async def filter_rooms( | |
): | ||
filtered_room_id_set.remove(room_id) | ||
|
||
if filters.room_name_like: | ||
if filters.room_name_like is not None: | ||
raise NotImplementedError() | ||
|
||
if filters.tags: | ||
if filters.tags is not None: | ||
raise NotImplementedError() | ||
|
||
if filters.not_tags: | ||
if filters.not_tags is not None: | ||
raise NotImplementedError() | ||
|
||
# Assemble a new sync room map but only with the `filtered_room_id_set` | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,10 +72,18 @@ | |
|
||
_T = TypeVar("_T") | ||
|
||
|
||
MAX_STATE_DELTA_HOPS = 100 | ||
|
||
|
||
# Freeze so it's immutable and we can use it as a cache value | ||
@attr.s(slots=True, frozen=True, auto_attribs=True) | ||
class Sentinel: | ||
pass | ||
|
||
|
||
ROOM_UNKNOWN_SENTINEL = Sentinel() | ||
|
||
|
||
@attr.s(slots=True, frozen=True, auto_attribs=True) | ||
class EventMetadata: | ||
"""Returned by `get_metadata_for_events`""" | ||
|
@@ -300,8 +308,14 @@ async def get_create_event_for_room(self, room_id: str) -> EventBase: | |
|
||
@cached(max_entries=10000) | ||
async def get_room_type(self, room_id: str) -> Optional[str]: | ||
"""Get the room type for a given room. The server must be joined to the | ||
given room. | ||
""" | ||
Get the room type for a given room. | ||
Returns: | ||
The room type if known (`None` is a valid room type) | ||
Raises: | ||
NotFoundError if the room is unknown | ||
""" | ||
|
||
row = await self.db_pool.simple_select_one( | ||
|
@@ -324,9 +338,19 @@ async def get_room_type(self, room_id: str) -> Optional[str]: | |
@cachedList(cached_method_name="get_room_type", list_name="room_ids") | ||
async def bulk_get_room_type( | ||
self, room_ids: Set[str] | ||
) -> Mapping[str, Optional[str]]: | ||
"""Bulk fetch room types for the given rooms, the server must be in all | ||
the rooms given. | ||
) -> Mapping[str, Union[Optional[str], Sentinel]]: | ||
""" | ||
Bulk fetch room types for the given rooms. | ||
Since this function is cached, any missing values would be cached as `None`. In | ||
order to distinguish between an unencrypted room that has `None` encryption and | ||
a room that is unknown to the server where we might want to omit the value | ||
(which would make it cached as `None`), instead we use the sentinel value | ||
`ROOM_UNKNOWN_SENTINEL`. | ||
Returns: | ||
A mapping from room ID to the room's type (`None` is a valid room type). | ||
Rooms unknown to this server will return `ROOM_UNKNOWN_SENTINEL`. | ||
""" | ||
|
||
rows = await self.db_pool.simple_select_many_batch( | ||
|
@@ -342,9 +366,137 @@ async def bulk_get_room_type( | |
# mind if we do this in a loop. | ||
results = dict(rows) | ||
for room_id in room_ids - results.keys(): | ||
create_event = await self.get_create_event_for_room(room_id) | ||
room_type = create_event.content.get(EventContentFields.ROOM_TYPE) | ||
results[room_id] = room_type | ||
try: | ||
create_event = await self.get_create_event_for_room(room_id) | ||
room_type = create_event.content.get(EventContentFields.ROOM_TYPE) | ||
results[room_id] = room_type | ||
except NotFoundError: | ||
# We use the sentinel value to distinguish between `None` which is a | ||
# valid room type and a room that is unknown to the server so the value | ||
# is just unset. | ||
results[room_id] = ROOM_UNKNOWN_SENTINEL | ||
|
||
return results | ||
|
||
@cached(max_entries=10000) | ||
async def get_room_encryption(self, room_id: str) -> Optional[str]: | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Get the encryption algorithm for a given room. | ||
Returns: | ||
The encryption algorithm if the room is encrypted, otherwise `None`. | ||
Raises: | ||
NotFoundError if the room is unknown | ||
""" | ||
|
||
row = await self.db_pool.simple_select_one( | ||
table="room_stats_state", | ||
keyvalues={"room_id": room_id}, | ||
retcols=("encryption",), | ||
allow_none=True, | ||
desc="get_room_is_encrypted", | ||
) | ||
|
||
if row is not None: | ||
return row[0] | ||
|
||
# If we haven't updated `room_stats_state` with the room yet, query the state | ||
# directly. | ||
state_map = await self.get_partial_filtered_current_state_ids( | ||
room_id, | ||
state_filter=StateFilter.from_types( | ||
[ | ||
(EventTypes.Create, ""), | ||
(EventTypes.RoomEncryption, ""), | ||
] | ||
), | ||
) | ||
# We can use the create event as a canary to tell whether the server has seen | ||
# the room before | ||
create_event_id = state_map.get((EventTypes.Create, "")) | ||
encryption_event_id = state_map.get((EventTypes.RoomEncryption, "")) | ||
if create_event_id is None: | ||
raise NotFoundError( | ||
f"Unknown room {room_id} does not have any state to determine room encryption" | ||
) | ||
|
||
if encryption_event_id is None: | ||
return None | ||
|
||
encryption_event = await self.get_event(encryption_event_id) | ||
return encryption_event.content.get(EventContentFields.ENCRYPTION_ALGORITHM) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we have a shortcut to check the |
||
@cachedList(cached_method_name="get_room_encryption", list_name="room_ids") | ||
async def bulk_get_room_encryption( | ||
self, room_ids: Set[str] | ||
) -> Mapping[str, Union[Optional[str], Sentinel]]: | ||
""" | ||
Bulk fetch room encryption for the given rooms. | ||
Since this function is cached, any missing values would be cached as `None`. In | ||
order to distinguish between an unencrypted room that has `None` encryption and | ||
a room that is unknown to the server where we might want to omit the value | ||
(which would make it cached as `None`), instead we use the sentinel value | ||
`ROOM_UNKNOWN_SENTINEL`. | ||
Returns: | ||
A mapping from room ID to the room's encryption algorithm if the room is | ||
encrypted, otherwise `None`. Rooms unknown to this server will return | ||
`ROOM_UNKNOWN_SENTINEL`. | ||
""" | ||
|
||
rows = await self.db_pool.simple_select_many_batch( | ||
table="room_stats_state", | ||
column="room_id", | ||
iterable=room_ids, | ||
retcols=("room_id", "encryption"), | ||
desc="bulk_get_encryption", | ||
) | ||
|
||
# If we haven't updated `room_stats_state` with the room yet, query the state | ||
# directly. This should happen only rarely so we don't mind if we do this in a | ||
# loop. | ||
results = dict(rows) | ||
encryption_event_ids: List[str] = [] | ||
for room_id in room_ids - results.keys(): | ||
state_map = await self.get_partial_filtered_current_state_ids( | ||
room_id, | ||
state_filter=StateFilter.from_types( | ||
[ | ||
(EventTypes.Create, ""), | ||
(EventTypes.RoomEncryption, ""), | ||
] | ||
), | ||
) | ||
# We can use the create event as a canary to tell whether the server has | ||
# seen the room before | ||
create_event_id = state_map.get((EventTypes.Create, "")) | ||
encryption_event_id = state_map.get((EventTypes.RoomEncryption, "")) | ||
|
||
if create_event_id is None: | ||
# We use the sentinel value to distinguish between `None` which is a | ||
# valid room type and a room that is unknown to the server so the value | ||
# is just unset. | ||
results[room_id] = ROOM_UNKNOWN_SENTINEL | ||
continue | ||
|
||
if encryption_event_id is None: | ||
results[room_id] = None | ||
else: | ||
encryption_event_ids.append(encryption_event_id) | ||
|
||
encryption_event_map = await self.get_events(encryption_event_ids) | ||
|
||
for encryption_event_id in encryption_event_ids: | ||
encryption_event = encryption_event_map.get(encryption_event_id) | ||
# If the state curent state says there is an encryption event, we should | ||
# have it in the database. | ||
assert encryption_event is not None | ||
|
||
results[encryption_event.room_id] = encryption_event.content.get( | ||
EventContentFields.ENCRYPTION_ALGORITHM | ||
) | ||
|
||
return results | ||
|
||
|
Large diffs are not rendered by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
is_encrypted
filter androom_types
filter both share the same challenges in fetching room state so solving one isn't more work than the other.We will also need to apply these patterns for the other filters like
room_name_like
so figuring out how to share the work between filters also makes sense.We're going to implement these other filters at some point anyway.