Skip to content

Commit

Permalink
Merge pull request #11662 from nextcloud/feat/noid/proxy-cache-messages
Browse files Browse the repository at this point in the history
feat(federation): Add a proxy cache for messages
  • Loading branch information
nickvergessen authored Feb 28, 2024
2 parents 39f2ae5 + 84192eb commit 78e0482
Show file tree
Hide file tree
Showing 15 changed files with 507 additions and 14 deletions.
2 changes: 1 addition & 1 deletion appinfo/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ And in the works for the [coming versions](https://github.com/nextcloud/spreed/m
]]></description>

<version>19.0.0-dev.2</version>
<version>19.0.0-dev.3</version>
<licence>agpl</licence>

<author>Daniel Calviño Sánchez</author>
Expand Down
8 changes: 6 additions & 2 deletions lib/AppInfo/Application.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@
use OCA\Talk\Events\SystemMessagesMultipleSentEvent;
use OCA\Talk\Events\UserJoinedRoomEvent;
use OCA\Talk\Federation\CloudFederationProviderTalk;
use OCA\Talk\Federation\Listener as FederationListener;
use OCA\Talk\Federation\Proxy\TalkV1\Notifier\MessageSentListener as TalkV1MessageSentListener;
use OCA\Talk\Federation\Proxy\TalkV1\Notifier\RoomModifiedListener as TalkV1RoomModifiedListener;
use OCA\Talk\Files\Listener as FilesListener;
use OCA\Talk\Files\TemplateLoader as FilesTemplateLoader;
use OCA\Talk\Flow\RegisterOperationsListener;
Expand Down Expand Up @@ -278,7 +279,10 @@ public function register(IRegistrationContext $context): void {
$context->registerEventListener(TranscriptionFailedEvent::class, RecordingListener::class);

// Federation listeners
$context->registerEventListener(RoomModifiedEvent::class, FederationListener::class);
$context->registerEventListener(RoomModifiedEvent::class, TalkV1RoomModifiedListener::class);
$context->registerEventListener(ChatMessageSentEvent::class, TalkV1MessageSentListener::class);
$context->registerEventListener(SystemMessageSentEvent::class, TalkV1MessageSentListener::class);
$context->registerEventListener(SystemMessagesMultipleSentEvent::class, TalkV1MessageSentListener::class);

// Signaling listeners (External)
$context->registerEventListener(AttendeesAddedEvent::class, SignalingListener::class);
Expand Down
2 changes: 1 addition & 1 deletion lib/Chat/Parser/SystemMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ protected function getGuestName(Room $room, string $actorType, string $actorId):
}
}

protected function parseMissedCall(Room $room, array $parameters, string $currentActorId): array {
protected function parseMissedCall(Room $room, array $parameters, ?string $currentActorId): array {
if ($parameters['users'][0] !== $currentActorId) {
return [
$this->l->t('You missed a call from {user}'),
Expand Down
30 changes: 30 additions & 0 deletions lib/Federation/BackendNotifier.php
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,36 @@ public function sendRoomModifiedUpdate(
$this->sendUpdateToRemote($remote, $notification);
}

/**
* Send information to remote participants that a message was posted
* Sent from Host server to Remote participant server
*/
public function sendMessageUpdate(
string $remoteServer,
int $localAttendeeId,
#[SensitiveParameter]
string $accessToken,
string $localToken,
array $messageData,
): void {
$remote = $this->prepareRemoteUrl($remoteServer);

$notification = $this->cloudFederationFactory->getCloudFederationNotification();
$notification->setMessage(
FederationManager::NOTIFICATION_MESSAGE_POSTED,
FederationManager::TALK_ROOM_RESOURCE,
(string) $localAttendeeId,
[
'remoteServerUrl' => $this->getServerRemoteUrl(),
'sharedSecret' => $accessToken,
'remoteToken' => $localToken,
'messageData' => $messageData,
],
);

$this->sendUpdateToRemote($remote, $notification);
}

/**
* @param string $remote
* @param array{notificationType: string, resourceType: string, providerId: string, notification: array} $data
Expand Down
57 changes: 57 additions & 0 deletions lib/Federation/CloudFederationProviderTalk.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
use OCA\Talk\Model\AttendeeMapper;
use OCA\Talk\Model\Invitation;
use OCA\Talk\Model\InvitationMapper;
use OCA\Talk\Model\ProxyCacheMessages;
use OCA\Talk\Model\ProxyCacheMessagesMapper;
use OCA\Talk\Participant;
use OCA\Talk\Room;
use OCA\Talk\Service\ParticipantService;
Expand All @@ -55,6 +57,8 @@
use OCP\Federation\ICloudFederationShare;
use OCP\Federation\ICloudIdManager;
use OCP\HintException;
use OCP\ICache;
use OCP\ICacheFactory;
use OCP\ISession;
use OCP\IUser;
use OCP\IUserManager;
Expand All @@ -64,6 +68,7 @@
use SensitiveParameter;

class CloudFederationProviderTalk implements ICloudFederationProvider {
protected ?ICache $proxyCacheMessages;

public function __construct(
private ICloudIdManager $cloudIdManager,
Expand All @@ -81,7 +86,10 @@ public function __construct(
private ISession $session,
private IEventDispatcher $dispatcher,
private LoggerInterface $logger,
private ProxyCacheMessagesMapper $proxyCacheMessagesMapper,
ICacheFactory $cacheFactory,
) {
$this->proxyCacheMessages = $cacheFactory->isAvailable() ? $cacheFactory->createDistributed('talk/pcm/') : null;
}

/**
Expand Down Expand Up @@ -185,6 +193,8 @@ public function notificationReceived($notificationType, $providerId, array $noti
return $this->shareUnshared((int) $providerId, $notification);
case FederationManager::NOTIFICATION_ROOM_MODIFIED:
return $this->roomModified((int) $providerId, $notification);
case FederationManager::NOTIFICATION_MESSAGE_POSTED:
return $this->messagePosted((int) $providerId, $notification);
}

throw new BadRequestException([$notificationType]);
Expand Down Expand Up @@ -297,6 +307,53 @@ private function roomModified(int $remoteAttendeeId, array $notification): array
return [];
}

/**
* @param int $remoteAttendeeId
* @param array{remoteServerUrl: string, sharedSecret: string, remoteToken: string, messageData: array{remoteMessageId: int, actorType: string, actorId: string, actorDisplayName: string, messageType: string, systemMessage: string, expirationDatetime: string, message: string, messageParameter: string}} $notification
* @return array
* @throws ActionNotSupportedException
* @throws AuthenticationFailedException
* @throws ShareNotFound
*/
private function messagePosted(int $remoteAttendeeId, array $notification): array {
$invite = $this->getByRemoteAttendeeAndValidate($notification['remoteServerUrl'], $remoteAttendeeId, $notification['sharedSecret']);
try {
$room = $this->manager->getRoomById($invite->getLocalRoomId());
} catch (RoomNotFoundException) {
throw new ShareNotFound();
}

// Sanity check to make sure the room is a remote room
if (!$room->isFederatedRemoteRoom()) {
throw new ShareNotFound();
}

$message = new ProxyCacheMessages();
$message->setLocalToken($room->getToken());
$message->setRemoteServerUrl($notification['remoteServerUrl']);
$message->setRemoteToken($notification['remoteToken']);
$message->setRemoteMessageId($notification['messageData']['remoteMessageId']);
$message->setActorType($notification['messageData']['actorType']);
$message->setActorId($notification['messageData']['actorId']);
$message->setActorDisplayName($notification['messageData']['actorDisplayName']);
$message->setMessageType($notification['messageData']['messageType']);
$message->setSystemMessage($notification['messageData']['systemMessage']);
$message->setExpirationDateTime(new \DateTimeImmutable($notification['messageData']['expirationDatetime']));
$message->setMessage($notification['messageData']['message']);
$message->setMessageParameters($notification['messageData']['messageParameter']);
$this->proxyCacheMessagesMapper->insert($message);

if ($this->proxyCacheMessages instanceof ICache) {
$cacheKey = sha1(json_encode([$notification['remoteServerUrl'], $notification['remoteToken']]));
$cacheData = $this->proxyCacheMessages->get($cacheKey);
if ($cacheData === null || $cacheData < $notification['messageData']['remoteMessageId']) {
$this->proxyCacheMessages->set($cacheKey, $notification['messageData']['remoteMessageId'], 300);
}
}

return [];
}

/**
* @throws AuthenticationFailedException
* @throws ActionNotSupportedException
Expand Down
1 change: 1 addition & 0 deletions lib/Federation/FederationManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class FederationManager {
public const NOTIFICATION_SHARE_DECLINED = 'SHARE_DECLINED';
public const NOTIFICATION_SHARE_UNSHARED = 'SHARE_UNSHARED';
public const NOTIFICATION_ROOM_MODIFIED = 'ROOM_MODIFIED';
public const NOTIFICATION_MESSAGE_POSTED = 'MESSAGE_POSTED';
public const TOKEN_LENGTH = 64;

public function __construct(
Expand Down
34 changes: 29 additions & 5 deletions lib/Federation/Proxy/TalkV1/Controller/ChatController.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,22 @@
use OCA\Talk\Room;
use OCP\AppFramework\Http;
use OCP\AppFramework\Http\DataResponse;
use OCP\ICache;
use OCP\ICacheFactory;

/**
* @psalm-import-type TalkChatMentionSuggestion from ResponseDefinitions
* @psalm-import-type TalkChatMessageWithParent from ResponseDefinitions
*/
class ChatController {
protected ?ICache $proxyCacheMessages;

public function __construct(
protected ProxyRequest $proxy,
protected UserConverter $userConverter,
ICacheFactory $cacheFactory,
) {
$this->proxyCacheMessages = $cacheFactory->isAvailable() ? $cacheFactory->createDistributed('talk/pcm/') : null;
}

/**
Expand Down Expand Up @@ -126,11 +132,23 @@ public function receiveMessages(
int $includeLastKnown,
int $noStatusUpdate,
int $markNotificationsAsRead): DataResponse {

// FIXME
// Poor-mans timeout, should later on cancel/trigger earlier,
// when we received a OCM message notifying us about a chat message
sleep(max(0, $timeout - 5));
$cacheKey = sha1(json_encode([$room->getRemoteServer(), $room->getRemoteToken()]));

if ($lookIntoFuture) {
if ($this->proxyCacheMessages instanceof ICache) {
for ($i = 0; $i <= $timeout; $i++) {
$cacheData = (int) $this->proxyCacheMessages->get($cacheKey);
if ($lastKnownMessageId !== $cacheData) {
break;
}
sleep(1);
}
} else {
// Poor-mans timeout, should later on cancel/trigger earlier,
// by checking the PCM database table
sleep(max(0, $timeout - 5));
}
}

$proxy = $this->proxy->get(
$participant->getAttendee()->getInvitedCloudId(),
Expand Down Expand Up @@ -159,6 +177,12 @@ public function receiveMessages(
}
if ($proxy->getHeader('X-Chat-Last-Given')) {
$headers['X-Chat-Last-Given'] = (string) (int) $proxy->getHeader('X-Chat-Last-Given');
if ($this->proxyCacheMessages instanceof ICache) {
$cacheData = $this->proxyCacheMessages->get($cacheKey);
if ($cacheData === null || $cacheData < $headers['X-Chat-Last-Given']) {
$this->proxyCacheMessages->set($cacheKey, (int) $headers['X-Chat-Last-Given'], 300);
}
}
}

/** @var TalkChatMessageWithParent[] $data */
Expand Down
112 changes: 112 additions & 0 deletions lib/Federation/Proxy/TalkV1/Notifier/MessageSentListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
<?php

declare(strict_types=1);

/**
* @copyright Copyright (c) 2024 Joas Schilling <coding@schilljs.com>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

namespace OCA\Talk\Federation\Proxy\TalkV1\Notifier;

use OCA\Talk\Chat\ChatManager;
use OCA\Talk\Chat\MessageParser;
use OCA\Talk\Events\ASystemMessageSentEvent;
use OCA\Talk\Events\ChatMessageSentEvent;
use OCA\Talk\Events\SystemMessageSentEvent;
use OCA\Talk\Events\SystemMessagesMultipleSentEvent;
use OCA\Talk\Federation\BackendNotifier;
use OCA\Talk\Model\Attendee;
use OCA\Talk\Service\ParticipantService;
use OCP\EventDispatcher\Event;
use OCP\EventDispatcher\IEventListener;
use OCP\Federation\ICloudIdManager;
use OCP\L10N\IFactory;

/**
* @template-implements IEventListener<Event>
*/
class MessageSentListener implements IEventListener {
public function __construct(
protected BackendNotifier $backendNotifier,
protected ParticipantService $participantService,
protected ICloudIdManager $cloudIdManager,
protected MessageParser $messageParser,
protected IFactory $l10nFactory,
) {
}

public function handle(Event $event): void {
if (!$event instanceof ChatMessageSentEvent
&& !$event instanceof SystemMessageSentEvent
&& !$event instanceof SystemMessagesMultipleSentEvent) {
return;
}

if ($event instanceof ASystemMessageSentEvent && $event->shouldSkipLastActivityUpdate()) {
return;
}

// FIXME once we store/cache the info skip this if the room has no federation participant
// if (!$event->getRoom()->hasFederatedParticipants()) {
// return;
// }

// Try to have as neutral as possible messages
$l = $this->l10nFactory->get('spreed', 'en', 'en');
$chatMessage = $this->messageParser->createMessage($event->getRoom(), null, $event->getComment(), $l);
$this->messageParser->parseMessage($chatMessage);

if (!$chatMessage->getVisibility()) {
return;
}

$expireDate = $event->getComment()->getExpireDate();

$messageData = [
'remoteMessageId' => (int) $event->getComment()->getId(),
'actorType' => $chatMessage->getActorType(),
'actorId' => $chatMessage->getActorId(),
'actorDisplayName' => $chatMessage->getActorDisplayName(),
'messageType' => $chatMessage->getMessageType(),
'systemMessage' => $chatMessage->getMessageType() === ChatManager::VERB_SYSTEM ? $chatMessage->getMessageRaw() : '',
'expirationDatetime' => $expireDate ? $expireDate->format(\DateTime::ATOM) : '',
'message' => $chatMessage->getMessage(),
'messageParameter' => json_encode($chatMessage->getMessageParameters()),
];

$notifiedServers = [];
$participants = $this->participantService->getParticipantsByActorType($event->getRoom(), Attendee::ACTOR_FEDERATED_USERS);
foreach ($participants as $participant) {
$cloudId = $this->cloudIdManager->resolveCloudId($participant->getAttendee()->getActorId());

if (isset($notifiedServers[$cloudId->getRemote()])) {
continue;
}
$notifiedServers[$cloudId->getRemote()] = true;

$this->backendNotifier->sendMessageUpdate(
$cloudId->getRemote(),
$participant->getAttendee()->getId(),
$participant->getAttendee()->getAccessToken(),
$event->getRoom()->getToken(),
$messageData,
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
*
*/

namespace OCA\Talk\Federation;
namespace OCA\Talk\Federation\Proxy\TalkV1\Notifier;

use OCA\Talk\Events\ARoomModifiedEvent;
use OCA\Talk\Events\RoomModifiedEvent;
use OCA\Talk\Federation\BackendNotifier;
use OCA\Talk\Model\Attendee;
use OCA\Talk\Service\ParticipantService;
use OCP\EventDispatcher\Event;
Expand All @@ -34,7 +35,7 @@
/**
* @template-implements IEventListener<Event>
*/
class Listener implements IEventListener {
class RoomModifiedListener implements IEventListener {
public function __construct(
protected BackendNotifier $backendNotifier,
protected ParticipantService $participantService,
Expand Down
Loading

0 comments on commit 78e0482

Please sign in to comment.