Skip to content

Commit

Permalink
fix(federation): Fix updating PCM after editing or deleting a message
Browse files Browse the repository at this point in the history
Signed-off-by: Joas Schilling <coding@schilljs.com>
  • Loading branch information
nickvergessen committed Mar 18, 2024
1 parent a330ccc commit 8cacba6
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 65 deletions.
4 changes: 2 additions & 2 deletions lib/Chat/ChatManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public function addSystemMessage(

$shouldFlush = $this->notificationManager->defer();

$event = new BeforeSystemMessageSentEvent($chat, $comment, silent: $silent, skipLastActivityUpdate: $shouldSkipLastMessageUpdate, parent: $replyTo);
$event = new BeforeSystemMessageSentEvent($chat, $comment, silent: $silent, parent: $replyTo, skipLastActivityUpdate: $shouldSkipLastMessageUpdate);
$this->dispatcher->dispatchTyped($event);
try {
$this->commentsManager->save($comment);
Expand Down Expand Up @@ -229,7 +229,7 @@ public function addSystemMessage(
}
}

$event = new SystemMessageSentEvent($chat, $comment, silent: $silent, skipLastActivityUpdate: $shouldSkipLastMessageUpdate, parent: $replyTo);
$event = new SystemMessageSentEvent($chat, $comment, silent: $silent, parent: $replyTo, skipLastActivityUpdate: $shouldSkipLastMessageUpdate);
$this->dispatcher->dispatchTyped($event);
} catch (NotFoundException $e) {
}
Expand Down
126 changes: 80 additions & 46 deletions lib/Federation/CloudFederationProviderTalk.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
use OCA\Talk\Events\AAttendeeRemovedEvent;
use OCA\Talk\Events\ARoomModifiedEvent;
use OCA\Talk\Events\AttendeesAddedEvent;
use OCA\Talk\Exceptions\CannotReachRemoteException;
use OCA\Talk\Exceptions\ParticipantNotFoundException;
use OCA\Talk\Exceptions\RoomNotFoundException;
use OCA\Talk\Federation\Proxy\TalkV1\UserConverter;
Expand All @@ -46,6 +47,7 @@
use OCA\Talk\Participant;
use OCA\Talk\Room;
use OCA\Talk\Service\ParticipantService;
use OCA\Talk\Service\ProxyCacheMessageService;
use OCA\Talk\Service\RoomService;
use OCP\AppFramework\Db\DoesNotExistException;
use OCP\AppFramework\Http;
Expand Down Expand Up @@ -90,6 +92,7 @@ public function __construct(
private IEventDispatcher $dispatcher,
private LoggerInterface $logger,
private ProxyCacheMessageMapper $proxyCacheMessageMapper,
private ProxyCacheMessageService $pcmService,
private FederationChatNotifier $federationChatNotifier,
private UserConverter $userConverter,
ICacheFactory $cacheFactory,
Expand Down Expand Up @@ -345,6 +348,15 @@ private function messagePosted(int $remoteAttendeeId, array $notification): arra
throw new ShareNotFound(FederationManager::OCM_RESOURCE_NOT_FOUND);
}

$removeParentMessage = null;
if ($notification['messageData']['systemMessage'] === 'message_edited'
|| $notification['messageData']['systemMessage'] === 'message_deleted') {
$metaData = json_decode($notification['messageData']['metaData'], true);
if (isset($metaData['replyToMessageId'])) {
$removeParentMessage = $metaData['replyToMessageId'];
}
}

// We transform the parameters when storing in the PCM, so we only have
// to do it once for each message.
// Note: `messageParameters` (array during parsing) vs `messageParameter` (string during sending)
Expand All @@ -357,55 +369,58 @@ private function messagePosted(int $remoteAttendeeId, array $notification): arra
/** @var array{remoteMessageId: int, actorType: string, actorId: string, actorDisplayName: string, messageType: string, systemMessage: string, expirationDatetime: string, message: string, messageParameter: string, creationDatetime: string, metaData: string} $converted */
$notification['messageData'] = $converted;


$message = new ProxyCacheMessage();
$message->setLocalToken($room->getToken());
$message->setRemoteServerUrl($notification['remoteServerUrl']);
$message->setRemoteToken($notification['remoteToken']);
$message->setRemoteMessageId($notification['messageData']['remoteMessageId']);
$message->setActorType($notification['messageData']['actorType']);
$message->setActorId($notification['messageData']['actorId']);
$message->setActorDisplayName($notification['messageData']['actorDisplayName']);
$message->setMessageType($notification['messageData']['messageType']);
$message->setSystemMessage($notification['messageData']['systemMessage']);
if ($notification['messageData']['expirationDatetime']) {
$message->setExpirationDatetime(new \DateTime($notification['messageData']['expirationDatetime']));
}
$message->setMessage($notification['messageData']['message']);
$message->setMessageParameters($notification['messageData']['messageParameter']);
$message->setCreationDatetime(new \DateTime($notification['messageData']['creationDatetime']));
$message->setMetaData($notification['messageData']['metaData']);

try {
$this->proxyCacheMessageMapper->insert($message);

$lastMessageId = $room->getLastMessageId();
if ($notification['messageData']['remoteMessageId'] > $lastMessageId) {
$lastMessageId = (int) $notification['messageData']['remoteMessageId'];
$message = null;
if ($removeParentMessage === null) {
$message = new ProxyCacheMessage();
$message->setLocalToken($room->getToken());
$message->setRemoteServerUrl($notification['remoteServerUrl']);
$message->setRemoteToken($notification['remoteToken']);
$message->setRemoteMessageId($notification['messageData']['remoteMessageId']);
$message->setActorType($notification['messageData']['actorType']);
$message->setActorId($notification['messageData']['actorId']);
$message->setActorDisplayName($notification['messageData']['actorDisplayName']);
$message->setMessageType($notification['messageData']['messageType']);
$message->setSystemMessage($notification['messageData']['systemMessage']);
if ($notification['messageData']['expirationDatetime']) {
$message->setExpirationDatetime(new \DateTime($notification['messageData']['expirationDatetime']));
}
$this->roomService->setLastMessageInfo($room, $lastMessageId, new \DateTime());
$message->setMessage($notification['messageData']['message']);
$message->setMessageParameters($notification['messageData']['messageParameter']);
$message->setCreationDatetime(new \DateTime($notification['messageData']['creationDatetime']));
$message->setMetaData($notification['messageData']['metaData']);

if ($this->proxyCacheMessages instanceof ICache) {
$cacheKey = sha1(json_encode([$notification['remoteServerUrl'], $notification['remoteToken']]));
$cacheData = $this->proxyCacheMessages->get($cacheKey);
if ($cacheData === null || $cacheData < $notification['messageData']['remoteMessageId']) {
$this->proxyCacheMessages->set($cacheKey, $notification['messageData']['remoteMessageId'], 300);
try {
$this->proxyCacheMessageMapper->insert($message);

$lastMessageId = $room->getLastMessageId();
if ($notification['messageData']['remoteMessageId'] > $lastMessageId) {
$lastMessageId = (int) $notification['messageData']['remoteMessageId'];
}
$this->roomService->setLastMessageInfo($room, $lastMessageId, new \DateTime());

if ($this->proxyCacheMessages instanceof ICache) {
$cacheKey = sha1(json_encode([$notification['remoteServerUrl'], $notification['remoteToken']]));
$cacheData = $this->proxyCacheMessages->get($cacheKey);
if ($cacheData === null || $cacheData < $notification['messageData']['remoteMessageId']) {
$this->proxyCacheMessages->set($cacheKey, $notification['messageData']['remoteMessageId'], 300);
}
}
} catch (DBException $e) {
// DBException::REASON_UNIQUE_CONSTRAINT_VIOLATION happens when
// multiple users are in the same conversation. We are therefore
// informed multiple times about the same remote message.
if ($e->getReason() !== DBException::REASON_UNIQUE_CONSTRAINT_VIOLATION) {
$this->logger->error('Error saving proxy cache message failed: ' . $e->getMessage(), ['exception' => $e]);
throw $e;
}
}
} catch (DBException $e) {
// DBException::REASON_UNIQUE_CONSTRAINT_VIOLATION happens when
// multiple users are in the same conversation. We are therefore
// informed multiple times about the same remote message.
if ($e->getReason() !== DBException::REASON_UNIQUE_CONSTRAINT_VIOLATION) {
$this->logger->error('Error saving proxy cache message failed: ' . $e->getMessage(), ['exception' => $e]);
throw $e;
}

$message = $this->proxyCacheMessageMapper->findByRemote(
$notification['remoteServerUrl'],
$notification['remoteToken'],
$notification['messageData']['remoteMessageId'],
);
$message = $this->pcmService->findByRemote(
$notification['remoteServerUrl'],
$notification['remoteToken'],
$notification['messageData']['remoteMessageId'],
);
}
} else {
}

try {
Expand All @@ -415,6 +430,23 @@ private function messagePosted(int $remoteAttendeeId, array $notification): arra
return [];
}

if ($removeParentMessage !== null) {
try {
$this->pcmService->syncRemoteMessage($room, $participant, $removeParentMessage);
} catch (\InvalidArgumentException|CannotReachRemoteException) {
$oldMessage = $this->pcmService->findByRemote(
$notification['remoteServerUrl'],
$notification['remoteToken'],
$removeParentMessage,
);
$this->pcmService->delete($oldMessage);
$this->logger->info('Failed to resync chat message #' . $removeParentMessage . ' after being notified by host ' . $notification['remoteServerUrl']);
}

// Update the last activity so the left sidebar refreshes the data as well
$this->roomService->setLastMessageInfo($room, $room->getLastMessageId(), new \DateTime());
}

$this->logger->debug('Setting unread info for local federated user ' . $invite->getUserId() . ' in ' . $room->getToken() . ' to ' . json_encode($notification['unreadInfo']), [
'app' => 'spreed-federation',
]);
Expand All @@ -427,7 +459,9 @@ private function messagePosted(int $remoteAttendeeId, array $notification): arra
$notification['unreadInfo']['lastReadMessage'],
);

$this->federationChatNotifier->handleChatMessage($room, $participant, $message, $notification);
if ($message instanceof ProxyCacheMessage) {
$this->federationChatNotifier->handleChatMessage($room, $participant, $message, $notification);
}

return [];
}
Expand Down
19 changes: 12 additions & 7 deletions lib/Federation/Proxy/TalkV1/Notifier/MessageSentListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ public function handle(Event $event): void {
return;
}

if ($event instanceof ASystemMessageSentEvent && $event->shouldSkipLastActivityUpdate()) {
return;
}

// FIXME once we store/cache the info skip this if the room has no federation participant
// if (!$event->getRoom()->hasFederatedParticipants()) {
// return;
Expand All @@ -76,6 +72,14 @@ public function handle(Event $event): void {
$chatMessage = $this->messageParser->createMessage($event->getRoom(), null, $event->getComment(), $l);
$this->messageParser->parseMessage($chatMessage);

$systemMessage = $chatMessage->getMessageType() === ChatManager::VERB_SYSTEM ? $chatMessage->getMessageRaw() : '';
if ($systemMessage !== 'message_edited'
&& $systemMessage !== 'message_deleted'
&& $event instanceof ASystemMessageSentEvent
&& $event->shouldSkipLastActivityUpdate()) {
return;
}

if (!$chatMessage->getVisibility()) {
return;
}
Expand All @@ -86,8 +90,9 @@ public function handle(Event $event): void {
$metaData = $event->getComment()->getMetaData() ?? [];
$parent = $event->getParent();
if ($parent instanceof IComment) {
$metaData[ProxyCacheMessage::METADATA_REPLYTO_TYPE] = $parent->getActorType();
$metaData[ProxyCacheMessage::METADATA_REPLYTO_ID] = $parent->getActorId();
$metaData[ProxyCacheMessage::METADATA_REPLY_TO_ACTOR_TYPE] = $parent->getActorType();
$metaData[ProxyCacheMessage::METADATA_REPLY_TO_ACTOR_ID] = $parent->getActorId();
$metaData[ProxyCacheMessage::METADATA_REPLY_TO_MESSAGE_ID] = (int) $parent->getId();
}

$messageData = [
Expand All @@ -96,7 +101,7 @@ public function handle(Event $event): void {
'actorId' => $chatMessage->getActorId(),
'actorDisplayName' => $chatMessage->getActorDisplayName(),
'messageType' => $chatMessage->getMessageType(),
'systemMessage' => $chatMessage->getMessageType() === ChatManager::VERB_SYSTEM ? $chatMessage->getMessageRaw() : '',
'systemMessage' => $systemMessage,
'expirationDatetime' => $expireDate ? $expireDate->format(\DateTime::ATOM) : '',
'message' => $chatMessage->getMessage(),
'messageParameter' => json_encode($chatMessage->getMessageParameters(), JSON_THROW_ON_ERROR),
Expand Down
5 changes: 3 additions & 2 deletions lib/Model/ProxyCacheMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@
* @psalm-import-type TalkRoomProxyMessage from ResponseDefinitions
*/
class ProxyCacheMessage extends Entity implements \JsonSerializable {
public const METADATA_REPLYTO_TYPE = 'replyToActorType';
public const METADATA_REPLYTO_ID = 'replyToActorId';
public const METADATA_REPLY_TO_ACTOR_TYPE = 'replyToActorType';
public const METADATA_REPLY_TO_ACTOR_ID = 'replyToActorId';
public const METADATA_REPLY_TO_MESSAGE_ID = 'replyToMessageId';


protected string $localToken = '';
Expand Down
8 changes: 4 additions & 4 deletions lib/Notification/FederationChatNotifier.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ public function handleChatMessage(Room $room, Participant $participant, ProxyCac
* @param array{silent?: bool, last_edited_time?: int, last_edited_by_type?: string, last_edited_by_id?: string, replyToActorType?: string, replyToActorId?: string} $metaData
*/
protected function isRepliedTo(Room $room, Participant $participant, array $metaData): bool {
if (!isset($metaData[ProxyCacheMessage::METADATA_REPLYTO_TYPE])
|| !isset($metaData[ProxyCacheMessage::METADATA_REPLYTO_ID])
|| $metaData[ProxyCacheMessage::METADATA_REPLYTO_TYPE] !== Attendee::ACTOR_FEDERATED_USERS) {
if (!isset($metaData[ProxyCacheMessage::METADATA_REPLY_TO_ACTOR_TYPE])
|| !isset($metaData[ProxyCacheMessage::METADATA_REPLY_TO_ACTOR_ID])
|| $metaData[ProxyCacheMessage::METADATA_REPLY_TO_ACTOR_TYPE] !== Attendee::ACTOR_FEDERATED_USERS) {
return false;
}

$repliedTo = $this->userConverter->convertTypeAndId($room, $metaData[ProxyCacheMessage::METADATA_REPLYTO_TYPE], $metaData[ProxyCacheMessage::METADATA_REPLYTO_ID]);
$repliedTo = $this->userConverter->convertTypeAndId($room, $metaData[ProxyCacheMessage::METADATA_REPLY_TO_ACTOR_TYPE], $metaData[ProxyCacheMessage::METADATA_REPLY_TO_ACTOR_ID]);
return $repliedTo['type'] === $participant->getAttendee()->getActorType()
&& $repliedTo['id'] === $participant->getAttendee()->getActorId();
}
Expand Down
16 changes: 15 additions & 1 deletion lib/Service/ProxyCacheMessageService.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public function findByRemote(string $remoteServerUrl, string $remoteToken, int $
return $this->mapper->findByRemote($remoteServerUrl, $remoteToken, $remoteMessageId);
}

public function delete(ProxyCacheMessage $message): ProxyCacheMessage {
return $this->mapper->delete($message);
}

public function deleteExpiredMessages(): void {
$this->mapper->deleteExpiredMessages($this->timeFactory->getDateTime());
}
Expand All @@ -81,7 +85,12 @@ public function syncRemoteMessage(Room $room, Participant $participant, int $mes
/** @var TalkChatMessageWithParent $messageData */
$messageData = $ocsResponse->getData()[0];

$proxy = new ProxyCacheMessage();
try {
$proxy = $this->mapper->findByRemote($room->getRemoteServer(), $room->getRemoteToken(), $messageId);
} catch (DoesNotExistException) {
$proxy = new ProxyCacheMessage();
}

$proxy->setLocalToken($room->getToken());
$proxy->setRemoteServerUrl($room->getRemoteServer());
$proxy->setRemoteToken($room->getRemoteToken());
Expand Down Expand Up @@ -111,6 +120,11 @@ public function syncRemoteMessage(Room $room, Participant $participant, int $mes
}
$proxy->setMetaData(json_encode($metaData));

if ($proxy->getId() !== null) {
$this->mapper->update($proxy);
return $proxy;
}

try {
$this->mapper->insert($proxy);
} catch (DBException $e) {
Expand Down
2 changes: 1 addition & 1 deletion lib/Service/RoomFormatter.php
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public function formatRoomV4(
$room->getLastMessageId(),
);
$roomData['lastMessage'] = $this->userConverter->convertAttendee($room, $cachedMessage->jsonSerialize(), 'actorType', 'actorId', 'actorDisplayName');
} catch (DoesNotExistException $e) {
} catch (DoesNotExistException) {
}
}

Expand Down
16 changes: 14 additions & 2 deletions tests/integration/features/federation/chat.feature
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,16 @@ Feature: federation/chat
| id | name | type | remoteServer | remoteToken |
| room | room | 2 | LOCAL | room |
Then user "participant2" is participant of the following rooms (v4)
| id | type |
| room | 2 |
| id | type | lastMessage |
| room | 2 | {federated_user} accepted the invitation |
And user "participant1" sends message "Message 1" to room "room" with 201
Then user "participant2" is participant of the following rooms (v4)
| id | type | lastMessage |
| room | 2 | Message 1 |
When user "participant2" sends reply "Message 1-1" on message "Message 1" to room "LOCAL::room" with 201
Then user "participant2" is participant of the following rooms (v4)
| id | type | lastMessage |
| room | 2 | Message 1-1 |
Then user "participant1" sees the following messages in room "room" with 200
| room | actorType | actorId | actorDisplayName | message | messageParameters | parentMessage |
| room | federated_users | participant2@{$REMOTE_URL} | participant2-displayname | Message 1-1 | [] | Message 1 |
Expand All @@ -103,6 +109,9 @@ Feature: federation/chat
| room | federated_users | participant1@{$BASE_URL} | participant1-displayname | Message 1 | [] | |
And user "participant1" edits message "Message 1" in room "room" to "Message 1 - Edit 1" with 200
And user "participant2" edits message "Message 1-1" in room "LOCAL::room" to "Message 1-1 - Edit 1" with 200
Then user "participant2" is participant of the following rooms (v4)
| id | type | lastMessage |
| room | 2 | Message 1-1 - Edit 1 |
Then user "participant1" sees the following messages in room "room" with 200
| room | actorType | actorId | actorDisplayName | message | messageParameters | parentMessage | lastEditActorType | lastEditActorId | lastEditActorDisplayName |
| room | federated_users | participant2@{$REMOTE_URL} | participant2-displayname | Message 1-1 - Edit 1 | [] | Message 1 - Edit 1 | federated_users | participant2@{$REMOTE_URL} | participant2-displayname |
Expand All @@ -125,6 +134,9 @@ Feature: federation/chat
| room | actorType | actorId | actorDisplayName | message | messageParameters | parentMessage |
| room | users | participant2 | participant2-displayname | Message deleted by you | {"actor":{"type":"user","id":"participant2","name":"participant2-displayname"}} | Message deleted by author |
| room | federated_users | participant1@{$BASE_URL} | participant1-displayname | Message deleted by author | {"actor":{"type":"user","id":"participant1","name":"participant1-displayname","server":"{$BASE_URL}"}} | |
Then user "participant2" is participant of the following rooms (v4)
| id | type | lastMessage |
| room | 2 | Message deleted by author |

Scenario: Read marker checking
Given the following "spreed" app config is set
Expand Down

0 comments on commit 8cacba6

Please sign in to comment.