diff --git a/.github/workflows/publish_docs.yaml b/.github/workflows/publish_docs.yaml index f3d91b9..3a19ce9 100644 --- a/.github/workflows/publish_docs.yaml +++ b/.github/workflows/publish_docs.yaml @@ -1,6 +1,6 @@ - name: Deploy Docs on: + workflow_dispatch: push: tags: - "*" @@ -20,4 +20,4 @@ jobs: REPO: self BRANCH: gh-pages FOLDER: MembraneRTC/build/dokka/html - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/ConnectOptions.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/ConnectOptions.kt deleted file mode 100644 index d5a7abd..0000000 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/ConnectOptions.kt +++ /dev/null @@ -1,16 +0,0 @@ -package org.membraneframework.rtc - -import org.membraneframework.rtc.transport.EventTransport -import org.membraneframework.rtc.utils.Metadata - -/** - * A set of connect options used by the MembraneRTC when connecting to the media server. - * @property transport The transport implementation that will be used for media events exchanged by the client. Should be ready for connect method invocation - * @property config The metadata that will be used by the transport to connect - * @property encoderOptions The encoder options used to encode video - */ -data class ConnectOptions( - val transport: EventTransport, - val config: Metadata, - val encoderOptions: EncoderOptions = EncoderOptions() -) diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/CreateOptions.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/CreateOptions.kt new file mode 100644 index 0000000..85ac841 --- /dev/null +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/CreateOptions.kt @@ -0,0 +1,9 @@ +package org.membraneframework.rtc + +/** + * A set of options used by the MembraneRTC when creating the client. + * @property encoderOptions The encoder options used to encode video + */ +data class CreateOptions( + val encoderOptions: EncoderOptions = EncoderOptions() +) diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt index f79500f..7c2f8e5 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt @@ -11,13 +11,13 @@ import kotlinx.coroutines.sync.withLock import org.membraneframework.rtc.events.OfferData import org.membraneframework.rtc.media.* import org.membraneframework.rtc.models.EncodingReason -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.models.RTCStats import org.membraneframework.rtc.models.TrackContext import org.membraneframework.rtc.models.VadStatus -import org.membraneframework.rtc.transport.EventTransportError import org.membraneframework.rtc.utils.ClosableCoroutineScope import org.membraneframework.rtc.utils.Metadata +import org.membraneframework.rtc.utils.SerializedMediaEvent import org.membraneframework.rtc.utils.TimberDebugTree import org.webrtc.* import org.webrtc.AudioTrack @@ -29,7 +29,7 @@ internal class InternalMembraneRTC @AssistedInject constructor( @Assisted - private val connectOptions: ConnectOptions, + private val createOptions: CreateOptions, @Assisted private val listener: MembraneRTCListener, @Assisted @@ -40,15 +40,18 @@ constructor( peerConnectionManagerFactory: PeerConnectionManager.PeerConnectionManagerFactory, peerConnectionFactoryWrapperFactory: PeerConnectionFactoryWrapper.PeerConnectionFactoryWrapperFactory ) : RTCEngineListener, PeerConnectionListener { - private val rtcEngineCommunication = rtcEngineCommunicationFactory.create(connectOptions.transport, this) - private val peerConnectionFactoryWrapper = peerConnectionFactoryWrapperFactory.create(connectOptions) - private val peerConnectionManager = peerConnectionManagerFactory.create(this, peerConnectionFactoryWrapper) + private val rtcEngineCommunication = rtcEngineCommunicationFactory.create(this) + private val peerConnectionFactoryWrapper = peerConnectionFactoryWrapperFactory.create(createOptions) + private val peerConnectionManager = peerConnectionManagerFactory.create( + this, + peerConnectionFactoryWrapper + ) - private var localPeer: Peer = - Peer(id = "", metadata = connectOptions.config, trackIdToMetadata = mapOf()) + private var localEndpoint: Endpoint = + Endpoint(id = "", type = "webrtc", metadata = mapOf(), trackIdToMetadata = mapOf()) - // mapping from peer's id to the peer himself - private val remotePeers = HashMap() + // mapping from endpoint's id to the endpoint himself + private val remoteEndpoints = HashMap() // mapping from remote track's id to its context private val trackContexts = HashMap() @@ -68,25 +71,12 @@ constructor( @AssistedFactory interface Factory { fun create( - connectOptions: ConnectOptions, + createOptions: CreateOptions, listener: MembraneRTCListener, defaultDispatcher: CoroutineDispatcher ): InternalMembraneRTC } - fun connect() { - coroutineScope.launch { - try { - rtcEngineCommunication.connect() - listener.onConnected() - } catch (e: Exception) { - Timber.i(e, "Failed to connect") - - listener.onError(MembraneRTCError.Transport("Failed to connect")) - } - } - } - fun disconnect() { coroutineScope.launch { rtcEngineCommunication.disconnect() @@ -97,9 +87,14 @@ constructor( } } - fun join() { + fun receiveMediaEvent(event: SerializedMediaEvent) { + rtcEngineCommunication.onEvent(event) + } + + fun connect(endpointMetadata: Metadata) { coroutineScope.launch { - rtcEngineCommunication.join(localPeer.metadata) + localEndpoint = localEndpoint.copy(metadata = endpointMetadata) + rtcEngineCommunication.connect(endpointMetadata) } } @@ -119,18 +114,31 @@ constructor( } localTracks.add(videoTrack) - localPeer = localPeer.withTrack(videoTrack.id(), metadata) + localEndpoint = localEndpoint.withTrack(videoTrack.id(), metadata) + + coroutineScope.launch { + peerConnectionManager.addTrack(videoTrack) + rtcEngineCommunication.renegotiateTracks() + } return videoTrack } fun createLocalAudioTrack(metadata: Metadata = mapOf()): LocalAudioTrack { - val audioTrack = LocalAudioTrack.create(context, peerConnectionFactoryWrapper.peerConnectionFactory).also { + val audioTrack = LocalAudioTrack.create( + context, + peerConnectionFactoryWrapper.peerConnectionFactory + ).also { it.start() } localTracks.add(audioTrack) - localPeer = localPeer.withTrack(audioTrack.id(), metadata) + localEndpoint = localEndpoint.withTrack(audioTrack.id(), metadata) + + coroutineScope.launch { + peerConnectionManager.addTrack(audioTrack) + rtcEngineCommunication.renegotiateTracks() + } return audioTrack } @@ -166,17 +174,15 @@ constructor( } localTracks.add(screencastTrack) - localPeer = localPeer.withTrack(screencastTrack.id(), metadata) + localEndpoint = localEndpoint.withTrack(screencastTrack.id(), metadata) coroutineScope.launch { screencastTrack.startForegroundService(null, null) screencastTrack.start() } - val streamIds = listOf(UUID.randomUUID().toString()) - coroutineScope.launch { - peerConnectionManager.addTrack(screencastTrack, streamIds) + peerConnectionManager.addTrack(screencastTrack) rtcEngineCommunication.renegotiateTracks() } @@ -194,7 +200,7 @@ constructor( peerConnectionManager.removeTrack(track.id()) localTracks.remove(track) - localPeer = localPeer.withoutTrack(trackId) + localEndpoint = localEndpoint.withoutTrack(trackId) track.stop() } rtcEngineCommunication.renegotiateTracks() @@ -202,63 +208,62 @@ constructor( } } - fun updatePeerMetadata(peerMetadata: Metadata) { + fun updateEndpointMetadata(endpointMetadata: Metadata) { coroutineScope.launch { - rtcEngineCommunication.updatePeerMetadata(peerMetadata) - localPeer = localPeer.copy(metadata = peerMetadata) + rtcEngineCommunication.updateEndpointMetadata(endpointMetadata) + localEndpoint = localEndpoint.copy(metadata = endpointMetadata) } } fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { coroutineScope.launch { rtcEngineCommunication.updateTrackMetadata(trackId, trackMetadata) - localPeer = localPeer.withTrack(trackId, trackMetadata) + localEndpoint = localEndpoint.withTrack(trackId, trackMetadata) } } - override fun onPeerAccepted(peerId: String, peersInRoom: List) { - this.localPeer = localPeer.copy(id = peerId) + override fun onConnected(endpointID: String, otherEndpoints: List) { + this.localEndpoint = localEndpoint.copy(id = endpointID) + listener.onConnected(endpointID, otherEndpoints) - listener.onJoinSuccess(localPeer.id, peersInRoom = peersInRoom) - - peersInRoom.forEach { - this.remotePeers[it.id] = it + otherEndpoints.forEach { + this.remoteEndpoints[it.id] = it for ((trackId, metadata) in it.trackIdToMetadata) { - val context = TrackContext(track = null, peer = it, trackId = trackId, metadata = metadata) + val context = TrackContext(track = null, endpoint = it, trackId = trackId, metadata = metadata) this.trackContexts[trackId] = context this.listener.onTrackAdded(context) } } - coroutineScope.launch { - rtcEngineCommunication.renegotiateTracks() - } } - override fun onPeerDenied() { - // TODO: return meaningful data - listener.onJoinError(mapOf()) + override fun onSendMediaEvent(event: SerializedMediaEvent) { + listener.onSendMediaEvent(event) } - override fun onPeerJoined(peer: Peer) { - if (peer.id == this.localPeer.id) { + override fun onEndpointAdded(endpoint: Endpoint) { + if (endpoint.id == this.localEndpoint.id) { return } - remotePeers[peer.id] = peer + remoteEndpoints[endpoint.id] = endpoint - listener.onPeerJoined(peer) + listener.onEndpointAdded(endpoint) } - override fun onPeerLeft(peerId: String) { - val peer = remotePeers.remove(peerId) ?: run { - Timber.e("Failed to process PeerLeft event: Peer not found: $peerId") + override fun onEndpointRemoved(endpointId: String) { + if (endpointId == localEndpoint.id) { + listener.onDisconnected() + return + } + val endpoint = remoteEndpoints.remove(endpointId) ?: run { + Timber.e("Failed to process EndpointLeft event: Endpoint not found: $endpointId") return } - val trackIds: List = peer.trackIdToMetadata.keys.toList() + val trackIds: List = endpoint.trackIdToMetadata.keys.toList() trackIds.forEach { trackContexts.remove(it)?.let { ctx -> @@ -266,16 +271,16 @@ constructor( } } - listener.onPeerLeft(peer) + listener.onEndpointRemoved(endpoint) } - override fun onPeerUpdated(peerId: String, peerMetadata: Metadata) { - val peer = remotePeers.remove(peerId) ?: run { - Timber.e("Failed to process PeerUpdated event: Peer not found: $peerId") + override fun onEndpointUpdated(endpointId: String, endpointMetadata: Metadata) { + val endpoint = remoteEndpoints.remove(endpointId) ?: run { + Timber.e("Failed to process EndpointUpdated event: Endpoint not found: $endpointId") return } - remotePeers[peer.id] = peer.copy(metadata = peerMetadata) + remoteEndpoints[endpoint.id] = endpoint.copy(metadata = endpointMetadata) } override fun onOfferData(integratedTurnServers: List, tracksTypes: Map) { @@ -287,7 +292,7 @@ constructor( } rtcEngineCommunication.sdpOffer( offer.description, - localPeer.trackIdToMetadata, + localEndpoint.trackIdToMetadata, offer.midToTrackIdMapping ) } catch (e: Exception) { @@ -332,20 +337,20 @@ constructor( } } - override fun onTracksAdded(peerId: String, trackIdToMetadata: Map) { - if (localPeer.id == peerId) return + override fun onTracksAdded(endpointId: String, trackIdToMetadata: Map) { + if (localEndpoint.id == endpointId) return - val peer = remotePeers.remove(peerId) ?: run { - Timber.e("Failed to process TracksAdded event: Peer not found: $peerId") + val endpoint = remoteEndpoints.remove(endpointId) ?: run { + Timber.e("Failed to process TracksAdded event: Endpoint not found: $endpointId") return } - val updatedPeer = peer.copy(trackIdToMetadata = trackIdToMetadata) + val updatedEndpoint = endpoint.copy(trackIdToMetadata = trackIdToMetadata) - remotePeers[updatedPeer.id] = updatedPeer + remoteEndpoints[updatedEndpoint.id] = updatedEndpoint - for ((trackId, metadata) in updatedPeer.trackIdToMetadata) { - val context = TrackContext(track = null, peer = peer, trackId = trackId, metadata = metadata) + for ((trackId, metadata) in updatedEndpoint.trackIdToMetadata) { + val context = TrackContext(track = null, endpoint = endpoint, trackId = trackId, metadata = metadata) this.trackContexts[trackId] = context @@ -353,9 +358,9 @@ constructor( } } - override fun onTracksRemoved(peerId: String, trackIds: List) { - val peer = remotePeers[peerId] ?: run { - Timber.e("Failed to process TracksRemoved event: Peer not found: $peerId") + override fun onTracksRemoved(endpointId: String, trackIds: List) { + val endpoint = remoteEndpoints[endpointId] ?: run { + Timber.e("Failed to process TracksRemoved event: Endpoint not found: $endpointId") return } @@ -365,16 +370,16 @@ constructor( this.listener.onTrackRemoved(context) } - val updatedPeer = trackIds.fold(peer) { acc, trackId -> + val updatedEndpoint = trackIds.fold(endpoint) { acc, trackId -> acc.withoutTrack(trackId) } - remotePeers[peerId] = updatedPeer + remoteEndpoints[endpointId] = updatedEndpoint } - override fun onTrackUpdated(peerId: String, trackId: String, metadata: Metadata) { - val peer = remotePeers[peerId] ?: run { - Timber.e("Failed to process TrackUpdated event: Peer not found: $peerId") + override fun onTrackUpdated(endpointId: String, trackId: String, metadata: Metadata) { + val endpoint = remoteEndpoints[endpointId] ?: run { + Timber.e("Failed to process TrackUpdated event: Endpoint not found: $endpointId") return } @@ -385,16 +390,16 @@ constructor( context.metadata = metadata - val updatedPeer = peer + val updatedEndpoint = endpoint .withoutTrack(trackId) .withTrack(trackId, metadata) - remotePeers[peerId] = updatedPeer + remoteEndpoints[endpointId] = updatedEndpoint this.listener.onTrackUpdated(context) } - override fun onTrackEncodingChanged(peerId: String, trackId: String, encoding: String, encodingReason: String) { + override fun onTrackEncodingChanged(endpointId: String, trackId: String, encoding: String, encodingReason: String) { val encodingReasonEnum = EncodingReason.fromString(encodingReason) if (encodingReasonEnum == null) { Timber.e("Invalid encoding reason: $encodingReason") @@ -411,15 +416,6 @@ constructor( return } trackContext.setEncoding(encodingEnum, encodingReasonEnum) - this.listener.onTrackEncodingChanged(peerId, trackId, encoding) - } - - override fun onRemoved(peerId: String, reason: String) { - if (peerId != localPeer.id) { - Timber.e("Received onRemoved media event, but it does not refer to the local peer") - return - } - listener.onRemoved(reason) } override fun onVadNotification(trackId: String, status: String) { @@ -440,18 +436,6 @@ constructor( listener.onBandwidthEstimationChanged(estimation) } - override fun onError(error: EventTransportError) { - if (error is EventTransportError.ConnectionError) { - listener.onError(MembraneRTCError.Transport(error.reason)) - } else { - listener.onError(MembraneRTCError.Transport(error.message ?: "unknown transport message")) - } - } - - override fun onClose() { - Timber.i("Transport has been closed") - } - fun setTargetTrackEncoding(trackId: String, encoding: TrackEncoding) { coroutineScope.launch { rtcEngineCommunication.setTargetTrackEncoding(trackId, encoding) diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt index c7b56b2..d47a64b 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt @@ -7,6 +7,7 @@ import org.membraneframework.rtc.dagger.DaggerMembraneRTCComponent import org.membraneframework.rtc.media.* import org.membraneframework.rtc.models.RTCStats import org.membraneframework.rtc.utils.Metadata +import org.membraneframework.rtc.utils.SerializedMediaEvent import org.webrtc.Logging /** @@ -14,11 +15,11 @@ import org.webrtc.Logging *

* The client is responsible for relaying MembraneRTC Engine specific messages through given reliable transport layer. * Once initialized, the client is responsible for exchanging necessary messages via provided EventTransport passed via `ConnectOptions` and managing underlying - * `PeerConnection`. The goal of the client is to be as lean as possible, meaning that all activities regarding the session such as moderating + * `EndpointConnection`. The goal of the client is to be as lean as possible, meaning that all activities regarding the session such as moderating * should be implemented by the user himself on top of the MembraneRTC. *

- * The user's ability of interacting with the client is greatly limited to the essential actions such as joining/leaving the session, - * adding/removing local tracks and receiving information about remote peers and their tracks that can be played by the user. + * The user's ability of interacting with the client is greatly limited to the essential actions such as connecting to/leaving the session, + * adding/removing local tracks and receiving information about remote endpoints and their tracks that can be played by the user. *

* User can request 3 different types of local tracks that will get forwarded to the server by the client: *

    @@ -27,24 +28,25 @@ import org.webrtc.Logging *
  • `LocalScreencast` - a screencast track capturing a device's screen using MediaProjection mechanism
  • *
*

- * It is recommended to request necessary audio and video tracks before joining the room but it does not mean it can't be done afterwards (in case of screencast) + * It is recommended to request necessary audio and video tracks before connecting to the room but it does not mean it can't be done afterwards (in case of screencast) *

- * Once the user received onConnected notification they can call the join method to initialize joining the session. - * After receiving `onJoinSuccess` a user will receive notification about various peers joining/leaving the session, new tracks being published and ready for playback + * Once the user created MembraneRTC client, they can call the connect method to initialize connecting to the session. + * After receiving `onConnected` a user will receive notification about various endpoints connecting to/leaving the session, new tracks being published and ready for playback * or going inactive. */ class MembraneRTC private constructor( private var client: InternalMembraneRTC ) { - /** - * Starts the join process. + * Tries to connect the RTC Engine. If user is accepted then onConnected will be called. + * In other case {@link Callbacks.onConnectError} is invoked. *

- * Should be called only when a listener received onConnected message. + * @param endpointMetadata - Any information that other endpoints will receive in onEndpointAdded + * after accepting this endpoint */ - fun join() { - client.join() + fun connect(endpointMetadata: Metadata) { + client.connect(endpointMetadata) } /** @@ -56,6 +58,17 @@ private constructor( client.disconnect() } + /** + * Feeds media event received from RTC Engine to MembraneWebRTC. + * This function should be called whenever some media event from RTC Engine + * was received and can result in MembraneWebRTC generating some other + * media events. + * @param mediaEvent - String data received over custom signalling layer. + */ + fun receiveMediaEvent(mediaEvent: SerializedMediaEvent) { + client.receiveMediaEvent(mediaEvent) + } + /** * Creates a video track utilizing device's camera. *

@@ -152,23 +165,23 @@ private constructor( } /** - * Updates the metadata for the current peer. - * @param peerMetadata Data about this peer that other peers will receive upon joining. + * Updates the metadata for the current endpoint. + * @param endpointMetadata Data about this endpoint that other endpoints will receive upon connecting. * * If the metadata is different from what is already tracked in the room, the optional - * callback `onPeerUpdated` will be triggered for other peers in the room. + * callback `onEndpointUpdated` will be triggered for other endpoints in the room. */ - fun updatePeerMetadata(peerMetadata: Metadata) { - client.updatePeerMetadata(peerMetadata) + fun updateEndpointMetadata(endpointMetadata: Metadata) { + client.updateEndpointMetadata(endpointMetadata) } /** * Updates the metadata for a specific track. * @param trackId local track id of audio or video track. - * @param trackMetadata Data about this track that other peers will receive upon joining. + * @param trackMetadata Data about this track that other endpoints will receive upon connecting. * * If the metadata is different from what is already tracked in the room, the optional - * callback `onTrackUpdated` will be triggered for other peers in the room. + * callback `onTrackUpdated` will be triggered for other endpoints in the room. */ fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { client.updateTrackMetadata(trackId, trackMetadata) @@ -217,14 +230,18 @@ private constructor( companion object { /** - * Creates an instance of MembraneRTC client and starts the connecting process. + * Creates an instance of MembraneRTC client. * * @param appContext the context of the current application - * @param options a set of options defining parameters such as event transport or connect metadata * @param listener a listener that will receive all notifications emitted by the MembraneRTC + * @param options a set of options defining parameters such as encoder parameters * @return an instance of the client in connecting state */ - fun connect(appContext: Context, options: ConnectOptions, listener: MembraneRTCListener): MembraneRTC { + fun create( + appContext: Context, + listener: MembraneRTCListener, + options: CreateOptions = CreateOptions() + ): MembraneRTC { val ctx = appContext.applicationContext val component = DaggerMembraneRTCComponent @@ -235,8 +252,6 @@ private constructor( .membraneRTCFactory() .create(options, listener, Dispatchers.Default) - client.connect() - return MembraneRTC(client) } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt index 549e9d0..ff148d6 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt @@ -1,53 +1,43 @@ package org.membraneframework.rtc -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.models.TrackContext +import org.membraneframework.rtc.utils.SerializedMediaEvent import timber.log.Timber interface MembraneRTCListener { - // / Callback invoked when client has successfully connected via transport layer. - fun onConnected() + // Called each time MembraneWebRTC need to send some data to the server. + fun onSendMediaEvent(event: SerializedMediaEvent) - // /Callback invoked when the client has been approved to participate in media exchange. - fun onJoinSuccess(peerID: String, peersInRoom: List) + // Callback invoked when the client has been approved to participate in media exchange. + fun onConnected(endpointID: String, otherEndpoints: List) - // /Callback invoked when client has been denied access to enter the room. - fun onJoinError(metadata: Any) + // Called when endpoint of this MembraneRTC instance was removed + fun onDisconnected() - // /Callback invoked a track is ready to be played. + // Called in case of errors related to multimedia session e.g. ICE connection. + fun onConnectError(metadata: Any) + + // Callback invoked a track is ready to be played. fun onTrackReady(ctx: TrackContext) - // /Callback invoked a peer already present in a room adds a new track. + // Callback invoked a endpoint already present in a room adds a new track. fun onTrackAdded(ctx: TrackContext) - // /Callback invoked when a track will no longer receive any data. + // Callback invoked when a track will no longer receive any data. fun onTrackRemoved(ctx: TrackContext) - // /Callback invoked when track's metadata gets updated + // Callback invoked when track's metadata gets updated fun onTrackUpdated(ctx: TrackContext) - // /Callback invoked when a new peer joins the room. - fun onPeerJoined(peer: Peer) - - // /Callback invoked when a peer leaves the room. - fun onPeerLeft(peer: Peer) - - // /Callback invoked when peer's metadata gets updated. - fun onPeerUpdated(peer: Peer) - - // Callback invoked when received track encoding has changed - @Deprecated("Deprecated, use TrackContext::setOnEncodingChangedListener") - fun onTrackEncodingChanged(peerId: String, trackId: String, encoding: String) { - Timber.i( - "Track encoding changed $trackId -> $encoding" - ) - } + // Callback invoked when a new endpoint joins the room. + fun onEndpointAdded(endpoint: Endpoint) - // /Callback invoked when an errors happens. - fun onError(error: MembraneRTCError) + // Called each time endpoint is removed, called only for other endpoints. + fun onEndpointRemoved(endpoint: Endpoint) - // Callback invoked every time a local peer is removed by the server - fun onRemoved(reason: String) { Timber.e("Peer removed") } + // Called each time endpoint has its metadata updated. + fun onEndpointUpdated(endpoint: Endpoint) // Called every time the server estimates client's bandwidth. // estimation - client's available incoming bitrate estimated diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionFactoryWrapper.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionFactoryWrapper.kt index 0b2932c..678e47f 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionFactoryWrapper.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionFactoryWrapper.kt @@ -7,11 +7,10 @@ import dagger.assisted.AssistedInject import org.membraneframework.rtc.media.SimulcastVideoEncoderFactoryWrapper import org.webrtc.* import org.webrtc.audio.AudioDeviceModule -import java.util.* internal class PeerConnectionFactoryWrapper @AssistedInject constructor( - @Assisted private val connectOptions: ConnectOptions, + @Assisted private val createOptions: CreateOptions, audioDeviceModule: AudioDeviceModule, eglBase: EglBase, appContext: Context @@ -19,7 +18,7 @@ internal class PeerConnectionFactoryWrapper @AssistedFactory interface PeerConnectionFactoryWrapperFactory { fun create( - connectOptions: ConnectOptions + createOptions: CreateOptions ): PeerConnectionFactoryWrapper } @@ -34,7 +33,7 @@ internal class PeerConnectionFactoryWrapper PeerConnectionFactory.builder().setAudioDeviceModule(audioDeviceModule).setVideoEncoderFactory( SimulcastVideoEncoderFactoryWrapper( eglBase.eglBaseContext, - connectOptions.encoderOptions + createOptions.encoderOptions ) ).setVideoDecoderFactory(DefaultVideoDecoderFactory(eglBase.eglBaseContext)).createPeerConnectionFactory() } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt index cbabcf0..a6970d2 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt @@ -51,6 +51,8 @@ internal class PeerConnectionManager private val coroutineScope: CoroutineScope = ClosableCoroutineScope(SupervisorJob()) + private var streamIds: List = listOf(UUID.randomUUID().toString()) + private fun getSendEncodingsFromConfig(simulcastConfig: SimulcastConfig): List { val sendEncodings = Constants.simulcastEncodings() simulcastConfig.activeEncodings.forEach { @@ -59,7 +61,11 @@ internal class PeerConnectionManager return sendEncodings } - suspend fun addTrack(track: LocalTrack, streamIds: List) { + suspend fun addTrack(track: LocalTrack) { + addTrack(track, streamIds) + } + + private suspend fun addTrack(track: LocalTrack, streamIds: List) { val videoParameters = (track as? LocalVideoTrack)?.videoParameters ?: (track as? LocalScreencastTrack)?.videoParameters @@ -208,8 +214,6 @@ internal class PeerConnectionManager this@PeerConnectionManager.peerConnection = pc } - val streamIds = listOf(UUID.randomUUID().toString()) - localTracks.forEach { addTrack(it, streamIds) } @@ -379,7 +383,10 @@ internal class PeerConnectionManager } val params = sender.parameters val encoding = params?.encodings?.find { it.rid == trackEncoding.rid } ?: run { - Timber.e("setTrackEncoding: Invalid encoding $trackEncoding, no such encoding found in peer connection") + Timber.e( + "setTrackEncoding: Invalid encoding $trackEncoding," + + "no such encoding found in peer connection" + ) return } encoding.active = enabled diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt index 3c8a1e7..10f18d7 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt @@ -1,54 +1,43 @@ package org.membraneframework.rtc +import com.google.gson.reflect.TypeToken import dagger.assisted.Assisted import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject import org.membraneframework.rtc.events.* -import org.membraneframework.rtc.transport.EventTransport -import org.membraneframework.rtc.transport.EventTransportError -import org.membraneframework.rtc.transport.EventTransportListener +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.utils.Metadata +import org.membraneframework.rtc.utils.SerializedMediaEvent import timber.log.Timber import kotlin.math.roundToLong internal class RTCEngineCommunication @AssistedInject constructor( - @Assisted - private val transport: EventTransport, @Assisted private val engineListener: RTCEngineListener -) : EventTransportListener { +) { @AssistedFactory interface RTCEngineCommunicationFactory { fun create( - transport: EventTransport, listener: RTCEngineListener ): RTCEngineCommunication } - suspend fun connect() { - transport.connect(this@RTCEngineCommunication) - } - - suspend fun disconnect() { - transport.disconnect() + fun connect(endpointMetadata: Metadata) { + sendEvent(Connect(endpointMetadata)) } - suspend fun join(peerMetadata: Metadata) { - transport.send(Join(peerMetadata)) + fun updateEndpointMetadata(endpointMetadata: Metadata) { + sendEvent(UpdateEndpointMetadata(endpointMetadata)) } - suspend fun updatePeerMetadata(peerMetadata: Metadata) { - transport.send(UpdatePeerMetadata(peerMetadata)) + fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { + sendEvent(UpdateTrackMetadata(trackId, trackMetadata)) } - suspend fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { - transport.send(UpdateTrackMetadata(trackId, trackMetadata)) - } - - suspend fun setTargetTrackEncoding(trackId: String, encoding: TrackEncoding) { - transport.send( + fun setTargetTrackEncoding(trackId: String, encoding: TrackEncoding) { + sendEvent( SelectEncoding( trackId, encoding.rid @@ -56,12 +45,12 @@ constructor( ) } - suspend fun renegotiateTracks() { - transport.send(RenegotiateTracks()) + fun renegotiateTracks() { + sendEvent(RenegotiateTracks()) } - suspend fun localCandidate(sdp: String, sdpMLineIndex: Int) { - transport.send( + fun localCandidate(sdp: String, sdpMLineIndex: Int) { + sendEvent( LocalCandidate( sdp, sdpMLineIndex @@ -69,12 +58,12 @@ constructor( ) } - suspend fun sdpOffer( + fun sdpOffer( sdp: String, trackIdToTrackMetadata: Map, midToTrackId: Map ) { - transport.send( + sendEvent( SdpOffer( sdp, trackIdToTrackMetadata, @@ -83,26 +72,52 @@ constructor( ) } - override fun onEvent(event: ReceivableEvent) { - when (event) { + fun disconnect() { + sendEvent(Disconnect()) + } + + private fun sendEvent(event: SendableEvent) { + val serializedMediaEvent = gson.toJson(event.serializeToMap()) + engineListener.onSendMediaEvent(serializedMediaEvent) + } + + private fun decodeEvent(event: SerializedMediaEvent): ReceivableEvent? { + val type = object : TypeToken>() {}.type + + val rawMessage: Map = gson.fromJson(event, type) + + ReceivableEvent.decode(rawMessage)?.let { + return it + } ?: run { + Timber.d("Failed to decode event $rawMessage") + return null + } + } + + fun onEvent(serializedEvent: SerializedMediaEvent) { + when (val event = decodeEvent(serializedEvent)) { + is Connected -> engineListener.onConnected(event.data.id, event.data.otherEndpoints) is OfferData -> engineListener.onOfferData(event.data.integratedTurnServers, event.data.tracksTypes) - is PeerAccepted -> engineListener.onPeerAccepted(event.data.id, event.data.peersInRoom) - is PeerRemoved -> engineListener.onRemoved(event.data.peerId, event.data.reason) - is PeerDenied -> engineListener.onPeerDenied() - is PeerJoined -> engineListener.onPeerJoined(event.data.peer) - is PeerLeft -> engineListener.onPeerLeft(event.data.peerId) - is PeerUpdated -> engineListener.onPeerUpdated(event.data.peerId, event.data.metadata) + is EndpointRemoved -> engineListener.onEndpointRemoved(event.data.id) + is EndpointAdded -> engineListener.onEndpointAdded( + Endpoint(event.data.id, event.data.type, event.data.metadata, mapOf()) + ) + is EndpointUpdated -> engineListener.onEndpointUpdated(event.data.id, event.data.metadata) is RemoteCandidate -> engineListener.onRemoteCandidate( event.data.candidate, event.data.sdpMLineIndex, event.data.sdpMid ) is SdpAnswer -> engineListener.onSdpAnswer(event.data.type, event.data.sdp, event.data.midToTrackId) - is TrackUpdated -> engineListener.onTrackUpdated(event.data.peerId, event.data.trackId, event.data.metadata) - is TracksAdded -> engineListener.onTracksAdded(event.data.peerId, event.data.trackIdToMetadata) - is TracksRemoved -> engineListener.onTracksRemoved(event.data.peerId, event.data.trackIds) + is TrackUpdated -> engineListener.onTrackUpdated( + event.data.endpointId, + event.data.trackId, + event.data.metadata + ) + is TracksAdded -> engineListener.onTracksAdded(event.data.endpointId, event.data.trackIdToMetadata) + is TracksRemoved -> engineListener.onTracksRemoved(event.data.endpointId, event.data.trackIds) is EncodingSwitched -> engineListener.onTrackEncodingChanged( - event.data.peerId, + event.data.endpointId, event.data.trackId, event.data.encoding, event.data.reason @@ -112,12 +127,4 @@ constructor( else -> Timber.e("Failed to process unknown event: $event") } } - - override fun onError(error: EventTransportError) { - engineListener.onError(error) - } - - override fun onClose() { - engineListener.onClose() - } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt index 967381a..b51f9ab 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt @@ -1,26 +1,23 @@ package org.membraneframework.rtc import org.membraneframework.rtc.events.OfferData -import org.membraneframework.rtc.models.Peer -import org.membraneframework.rtc.transport.EventTransportError +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.utils.Metadata +import org.membraneframework.rtc.utils.SerializedMediaEvent internal interface RTCEngineListener { - fun onPeerAccepted(peerId: String, peersInRoom: List) - fun onPeerDenied() - fun onPeerJoined(peer: Peer) - fun onPeerLeft(peerId: String) - fun onPeerUpdated(peerId: String, peerMetadata: Metadata) + fun onConnected(endpointID: String, otherEndpoints: List) + fun onSendMediaEvent(event: SerializedMediaEvent) + fun onEndpointAdded(endpoint: Endpoint) + fun onEndpointRemoved(endpointId: String) + fun onEndpointUpdated(endpointId: String, endpointMetadata: Metadata) fun onOfferData(integratedTurnServers: List, tracksTypes: Map) fun onSdpAnswer(type: String, sdp: String, midToTrackId: Map) fun onRemoteCandidate(candidate: String, sdpMLineIndex: Int, sdpMid: String?) - fun onTracksAdded(peerId: String, trackIdToMetadata: Map) - fun onTracksRemoved(peerId: String, trackIds: List) - fun onTrackUpdated(peerId: String, trackId: String, metadata: Metadata) - fun onTrackEncodingChanged(peerId: String, trackId: String, encoding: String, encodingReason: String) - fun onRemoved(peerId: String, reason: String) + fun onTracksAdded(endpointId: String, trackIdToMetadata: Map) + fun onTracksRemoved(endpointId: String, trackIds: List) + fun onTrackUpdated(endpointId: String, trackId: String, metadata: Metadata) + fun onTrackEncodingChanged(endpointId: String, trackId: String, encoding: String, encodingReason: String) fun onVadNotification(trackId: String, status: String) fun onBandwidthEstimation(estimation: Long) - fun onError(error: EventTransportError) - fun onClose() } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/events/Event.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/events/Event.kt index 5374972..7d1baa3 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/events/Event.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/events/Event.kt @@ -4,7 +4,7 @@ import com.google.gson.Gson import com.google.gson.JsonParseException import com.google.gson.annotations.SerializedName import com.google.gson.reflect.TypeToken -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.utils.Metadata import org.membraneframework.rtc.utils.Payload import timber.log.Timber @@ -29,10 +29,10 @@ internal inline fun I.convert(): O { sealed class SendableEvent -data class Join(val type: String, val data: Data) : SendableEvent() { +data class Connect(val type: String, val data: Data) : SendableEvent() { data class Data(val metadata: Metadata) - constructor(metadata: Metadata) : this("join", Data(metadata)) + constructor(metadata: Metadata) : this("connect", Data(metadata)) } data class SdpOffer(val type: String, val data: Payload) : SendableEvent() { @@ -92,10 +92,10 @@ data class SelectEncoding(val type: String, val data: Payload) : SendableEvent() ) } -data class UpdatePeerMetadata(val type: String, val data: Data) : SendableEvent() { +data class UpdateEndpointMetadata(val type: String, val data: Data) : SendableEvent() { data class Data(val metadata: Metadata) - constructor(metadata: Metadata) : this("updatePeerMetadata", Data(metadata)) + constructor(metadata: Metadata) : this("updateEndpointMetadata", Data(metadata)) } data class UpdateTrackMetadata(val type: String, val data: Data) : SendableEvent() { @@ -104,24 +104,22 @@ data class UpdateTrackMetadata(val type: String, val data: Data) : SendableEvent constructor(trackId: String, trackMetadata: Metadata) : this("updateTrackMetadata", Data(trackId, trackMetadata)) } -enum class ReceivableEventType { - @SerializedName("peerAccepted") - PeerAccepted, - - @SerializedName("peerDenied") - PeerDenied, +data class Disconnect(val type: String) : SendableEvent() { + constructor() : this("disconnect") +} - @SerializedName("peerJoined") - PeerJoined, +enum class ReceivableEventType { + @SerializedName("connected") + Connected, - @SerializedName("peerLeft") - PeerLeft, + @SerializedName("endpointAdded") + EndpointAdded, - @SerializedName("peerUpdated") - PeerUpdated, + @SerializedName("endpointUpdated") + EndpointUpdated, - @SerializedName("peerRemoved") - PeerRemoved, + @SerializedName("endpointRemoved") + EndpointRemoved, @SerializedName("custom") Custom, @@ -164,23 +162,17 @@ sealed class ReceivableEvent { val eventBase: BaseReceivableEvent = payload.toDataClass() return when (eventBase.type) { - ReceivableEventType.PeerAccepted -> - payload.toDataClass() - - ReceivableEventType.PeerDenied -> - payload.toDataClass() + ReceivableEventType.Connected -> + payload.toDataClass() - ReceivableEventType.PeerJoined -> - payload.toDataClass() + ReceivableEventType.EndpointAdded -> + payload.toDataClass() - ReceivableEventType.PeerLeft -> - payload.toDataClass() + ReceivableEventType.EndpointRemoved -> + payload.toDataClass() - ReceivableEventType.PeerUpdated -> - payload.toDataClass() - - ReceivableEventType.PeerRemoved -> - payload.toDataClass() + ReceivableEventType.EndpointUpdated -> + payload.toDataClass() ReceivableEventType.TracksAdded -> payload.toDataClass() @@ -229,26 +221,25 @@ sealed class ReceivableEvent { } } -data class PeerAccepted(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val id: String, val peersInRoom: List) +data class Connected(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { + data class Data(val id: String, val otherEndpoints: List) } -data class PeerDenied(val type: ReceivableEventType) : ReceivableEvent() - -data class PeerJoined(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peer: Peer) -} - -data class PeerLeft(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String) +data class EndpointAdded(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { + data class Data( + val id: String, + val type: String, + val metadata: Metadata, + val trackIdToMetadata: Map + ) } -data class PeerUpdated(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val metadata: Metadata) +data class EndpointUpdated(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { + data class Data(val id: String, val metadata: Metadata) } -data class PeerRemoved(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val reason: String) +data class EndpointRemoved(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { + data class Data(val id: String, val reason: String) } data class OfferData(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { @@ -264,15 +255,15 @@ data class OfferData(val type: ReceivableEventType, val data: Data) : Receivable } data class TracksAdded(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val trackIdToMetadata: Map) + data class Data(val endpointId: String, val trackIdToMetadata: Map) } data class TracksRemoved(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val trackIds: List) + data class Data(val endpointId: String, val trackIds: List) } data class TrackUpdated(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val trackId: String, val metadata: Metadata) + data class Data(val endpointId: String, val trackId: String, val metadata: Metadata) } data class SdpAnswer(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { @@ -284,7 +275,7 @@ data class RemoteCandidate(val type: ReceivableEventType, val data: Data) : Rece } data class EncodingSwitched(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val trackId: String, val encoding: String, val reason: String) + data class Data(val endpointId: String, val trackId: String, val encoding: String, val reason: String) } data class VadNotification(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/models/Peer.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/models/Endpoint.kt similarity index 64% rename from MembraneRTC/src/main/java/org/membraneframework/rtc/models/Peer.kt rename to MembraneRTC/src/main/java/org/membraneframework/rtc/models/Endpoint.kt index b88422e..5cd6455 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/models/Peer.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/models/Endpoint.kt @@ -2,18 +2,21 @@ package org.membraneframework.rtc.models import org.membraneframework.rtc.utils.Metadata -data class Peer(val id: String, val metadata: Metadata, val trackIdToMetadata: Map) { - fun withTrack(trackId: String, metadata: Metadata): Peer { +data class Endpoint( + val id: String, + val type: String, + val metadata: Metadata, + val trackIdToMetadata: Map +) { + fun withTrack(trackId: String, metadata: Metadata): Endpoint { val newTrackIdToMetadata = this.trackIdToMetadata.toMutableMap() newTrackIdToMetadata[trackId] = metadata - return this.copy(trackIdToMetadata = newTrackIdToMetadata) } - fun withoutTrack(trackId: String): Peer { + fun withoutTrack(trackId: String): Endpoint { val newTrackIdToMetadata = this.trackIdToMetadata.toMutableMap() newTrackIdToMetadata.remove(trackId) - return this.copy(trackIdToMetadata = newTrackIdToMetadata) } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/models/TrackContext.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/models/TrackContext.kt index 8ab0a99..0cad3bb 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/models/TrackContext.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/models/TrackContext.kt @@ -15,11 +15,11 @@ fun interface OnVoiceActivityChangedListener { /** * Track's context i.e. all data that can be useful when operating on track. * - * @property peer Peer this track comes from. - * @property trackId Track id. It is generated by RTC engine and takes form `peer_id:`. + * @property endpoint Endpoint this track comes from. + * @property trackId Track id. It is generated by RTC engine and takes form `endpoint_id:`. * @property metadata Any info that was passed in MembraneWebRTC.createVideoTrack/MembraneWebRTC.createAudioTrack */ -class TrackContext(track: RemoteTrack?, val peer: Peer, val trackId: String, metadata: Metadata) { +class TrackContext(track: RemoteTrack?, val endpoint: Endpoint, val trackId: String, metadata: Metadata) { private var onTrackEncodingChangeListener: (OnEncodingChangedListener)? = null private var onVadNotificationListener: (OnVoiceActivityChangedListener)? = null diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/EventTransport.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/EventTransport.kt deleted file mode 100644 index 22ede11..0000000 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/EventTransport.kt +++ /dev/null @@ -1,45 +0,0 @@ -package org.membraneframework.rtc.transport - -import org.membraneframework.rtc.events.* - -/** - * A base class of exceptions that can be emitted by the EventTransport implementations. - */ -sealed class EventTransportError : Exception() { - data class Unauthorized(val reason: String) : EventTransportError() - data class ConnectionError(val reason: String) : EventTransportError() - data class Unexpected(val reason: String) : EventTransportError() - - override fun toString(): String { - return when (this) { - is Unauthorized -> - "User is unauthorized to use the transport: ${this.reason}" - is ConnectionError -> - "Failed to connect with the remote side: ${this.reason}" - is Unexpected -> - "Encountered unexpected error: ${this.reason}" - } - } -} - -/** - * An interface defining a listener to a EventTransport. - */ -interface EventTransportListener { - fun onEvent(event: ReceivableEvent) - fun onError(error: EventTransportError) - fun onClose() -} - -/** - * Interface defining an event transport that the MembraneRTC uses for - * relaying media events from/to the Membrane RTC Engine. - *

- * An implementation of the transport should parse and forward received events to the listener - * passed with connect method. - */ -interface EventTransport { - suspend fun connect(listener: EventTransportListener) - suspend fun disconnect() - suspend fun send(event: SendableEvent) -} diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt index 931565d..75ae814 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt @@ -1,36 +1,55 @@ package org.membraneframework.rtc.transport -import com.google.gson.GsonBuilder -import com.google.gson.reflect.TypeToken import kotlinx.coroutines.* -import org.membraneframework.rtc.events.ReceivableEvent -import org.membraneframework.rtc.events.SendableEvent -import org.membraneframework.rtc.events.serializeToMap import org.membraneframework.rtc.utils.ClosableCoroutineScope +import org.membraneframework.rtc.utils.SerializedMediaEvent import org.membraneframework.rtc.utils.SocketChannelParams import org.membraneframework.rtc.utils.SocketConnectionParams import org.phoenixframework.Channel import org.phoenixframework.Socket import timber.log.Timber +sealed class PhoenixTransportError : Exception() { + data class Unauthorized(val reason: String) : PhoenixTransportError() + data class ConnectionError(val reason: String) : PhoenixTransportError() + data class Unexpected(val reason: String) : PhoenixTransportError() + + override fun toString(): String { + return when (this) { + is Unauthorized -> + "User is unauthorized to use the transport: ${this.reason}" + is ConnectionError -> + "Failed to connect with the remote side: ${this.reason}" + is Unexpected -> + "Encountered unexpected error: ${this.reason}" + } + } +} + +/** + * An interface defining a listener to a PhoenixTransport. + */ +interface PhoenixTransportListener { + fun onEvent(event: SerializedMediaEvent) + fun onError(error: PhoenixTransportError) + fun onClose() +} + class PhoenixTransport constructor( private val url: String, private val topic: String, private val ioDispatcher: CoroutineDispatcher, private val params: SocketConnectionParams? = emptyMap(), private val socketChannelParams: SocketChannelParams = emptyMap() -) : EventTransport { - +) { private lateinit var coroutineScope: CoroutineScope private var socket: Socket? = null private var channel: Channel? = null - private var listener: EventTransportListener? = null - - private val gson = GsonBuilder().create() + private var listener: PhoenixTransportListener? = null private var joinContinuation: CancellableContinuation? = null - override suspend fun connect(listener: EventTransportListener) { + suspend fun connect(listener: PhoenixTransportListener) { Timber.i("Starting connection...") this.listener = listener @@ -47,11 +66,11 @@ class PhoenixTransport constructor( } val errorRef = socket!!.onError { error, _ -> - continuation.cancel(EventTransportError.ConnectionError(error.toString())) + continuation.cancel(PhoenixTransportError.ConnectionError(error.toString())) } val closeRef = socket!!.onClose { - continuation.cancel(EventTransportError.ConnectionError("closed")) + continuation.cancel(PhoenixTransportError.ConnectionError("closed")) } socketRefs += openRef @@ -62,7 +81,7 @@ class PhoenixTransport constructor( socket!!.off(socketRefs.toList()) socket!!.onError { error, _ -> - this.listener?.onError(EventTransportError.ConnectionError(error.toString())) + this.listener?.onError(PhoenixTransportError.ConnectionError(error.toString())) } socket!!.onClose { @@ -77,25 +96,13 @@ class PhoenixTransport constructor( } ?.receive("error") { _ -> joinContinuation?.resumeWith( - Result.failure(EventTransportError.Unauthorized("couldn't join phoenix channel")) + Result.failure(PhoenixTransportError.Unauthorized("couldn't join phoenix channel")) ) } channel?.on("mediaEvent") { message -> - try { - val data = message.payload["data"] as String - val type = object : TypeToken>() {}.type - - val rawMessage: Map = gson.fromJson(data, type) - - ReceivableEvent.decode(rawMessage)?.let { - listener.onEvent(it) - } ?: run { - Timber.d("Failed to decode event $rawMessage") - } - } catch (e: Exception) { - Timber.e(e) - } + val data = message.payload["data"] as String + listener.onEvent(data) } return suspendCancellableCoroutine { @@ -103,7 +110,7 @@ class PhoenixTransport constructor( } } - override suspend fun disconnect() { + fun disconnect() { if (channel != null) { channel ?.leave() @@ -115,13 +122,10 @@ class PhoenixTransport constructor( } } - override suspend fun send(event: SendableEvent) { - coroutineScope.async { - val payload = mapOf( - "data" to gson.toJson(event.serializeToMap()) - ) - - channel ?.push("mediaEvent", payload) - } + fun send(event: SerializedMediaEvent) { + val payload = mapOf( + "data" to event + ) + channel?.push("mediaEvent", payload) } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/PeerConnectionUtils.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/EndpointConnectionUtils.kt similarity index 100% rename from MembraneRTC/src/main/java/org/membraneframework/rtc/utils/PeerConnectionUtils.kt rename to MembraneRTC/src/main/java/org/membraneframework/rtc/utils/EndpointConnectionUtils.kt diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/types.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/types.kt index 1e92442..0621404 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/types.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/types.kt @@ -4,3 +4,4 @@ typealias Payload = Map typealias Metadata = Map typealias SocketConnectionParams = Map typealias SocketChannelParams = Map +typealias SerializedMediaEvent = String diff --git a/MembraneRTC/src/test/java/org/membraneframework/rtc/test/PeerConnectionManagerTest.kt b/MembraneRTC/src/test/java/org/membraneframework/rtc/test/EndpointConnectionManagerTest.kt similarity index 85% rename from MembraneRTC/src/test/java/org/membraneframework/rtc/test/PeerConnectionManagerTest.kt rename to MembraneRTC/src/test/java/org/membraneframework/rtc/test/EndpointConnectionManagerTest.kt index b0d36d5..2e98549 100644 --- a/MembraneRTC/src/test/java/org/membraneframework/rtc/test/PeerConnectionManagerTest.kt +++ b/MembraneRTC/src/test/java/org/membraneframework/rtc/test/EndpointConnectionManagerTest.kt @@ -18,30 +18,30 @@ import org.membraneframework.rtc.utils.setLocalDescription import org.webrtc.* import org.webrtc.RtpParameters.Encoding -class PeerConnectionManagerTest { +class EndpointConnectionManagerTest { private lateinit var manager: PeerConnectionManager - private lateinit var peerConnectionMock: PeerConnection + private lateinit var endpointConnectionMock: PeerConnection @Before fun createMocks() { - val peerConnectionListenerMock = mockk(relaxed = true) - val peerConnectionFactoryMock = mockk(relaxed = true) + val endpointConnectionListenerMock = mockk(relaxed = true) + val endpointConnectionFactoryMock = mockk(relaxed = true) mockkStatic("org.membraneframework.rtc.utils.SuspendableSdpObserverKt") - mockkStatic("org.membraneframework.rtc.utils.PeerConnectionUtilsKt") + mockkStatic("org.membraneframework.rtc.utils.EndpointConnectionUtilsKt") - peerConnectionMock = mockk(relaxed = true) + endpointConnectionMock = mockk(relaxed = true) coEvery { - peerConnectionMock.createOffer(any()) + endpointConnectionMock.createOffer(any()) } returns Result.success(SessionDescription(SessionDescription.Type.OFFER, "test_description")) coEvery { - peerConnectionMock.setLocalDescription(any()) + endpointConnectionMock.setLocalDescription(any()) } returns Result.success(Unit) - every { peerConnectionFactoryMock.createPeerConnection(any(), any()) } returns peerConnectionMock + every { endpointConnectionFactoryMock.createPeerConnection(any(), any()) } returns endpointConnectionMock - manager = PeerConnectionManager(peerConnectionListenerMock, peerConnectionFactoryMock) + manager = PeerConnectionManager(endpointConnectionListenerMock, endpointConnectionFactoryMock) } @OptIn(ExperimentalCoroutinesApi::class) @@ -60,7 +60,7 @@ class PeerConnectionManagerTest { manager.getSdpOffer(emptyList(), emptyMap(), listOf(audioTrack)) verify(exactly = 1) { - peerConnectionMock.addTransceiver( + endpointConnectionMock.addTransceiver( audioTrack.mediaTrack, eq(RtpTransceiver.RtpTransceiverDirection.SEND_ONLY), match { it.size == 1 }, @@ -89,7 +89,7 @@ class PeerConnectionManagerTest { manager.getSdpOffer(emptyList(), emptyMap(), listOf(videoTrack)) verify(exactly = 1) { - peerConnectionMock.addTransceiver( + endpointConnectionMock.addTransceiver( videoTrack.rtcTrack(), eq(RtpTransceiver.RtpTransceiverDirection.SEND_ONLY), match { it.size == 1 }, @@ -125,7 +125,7 @@ class PeerConnectionManagerTest { manager.getSdpOffer(emptyList(), emptyMap(), listOf(videoTrack)) verify(exactly = 1) { - peerConnectionMock.addTransceiver( + endpointConnectionMock.addTransceiver( videoTrack.rtcTrack(), eq(RtpTransceiver.RtpTransceiverDirection.SEND_ONLY), any(), @@ -155,7 +155,7 @@ class PeerConnectionManagerTest { val m = Encoding("m", true, 2.0) val l = Encoding("l", true, 4.0) - every { peerConnectionMock.senders } returns listOf( + every { endpointConnectionMock.senders } returns listOf( mockk(relaxed = true) { every { parameters } returns mockk(relaxed = true) { every { track()?.id() } returns "dummy_track" diff --git a/app/build.gradle b/app/build.gradle index 12911f4..6a2b351 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -20,7 +20,7 @@ android { vectorDrawables { useSupportLibrary true } - buildConfigField "String" , "VIDEOROOM_URL" , "\"https://videoroom.membrane.work/socket\"" + buildConfigField "String" , "VIDEOROOM_URL" , "\"https://videoroom.membrane.work/socket\"" } buildTypes { diff --git a/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt b/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt index 90c1ced..1af6593 100644 --- a/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt +++ b/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt @@ -10,21 +10,24 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.launch import org.membraneframework.rtc.* import org.membraneframework.rtc.media.* -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.models.TrackContext import org.membraneframework.rtc.transport.PhoenixTransport +import org.membraneframework.rtc.transport.PhoenixTransportError +import org.membraneframework.rtc.transport.PhoenixTransportListener +import org.membraneframework.rtc.utils.SerializedMediaEvent import timber.log.Timber import java.util.* class RoomViewModel( val url: String, application: Application -) : AndroidViewModel(application), MembraneRTCListener { +) : AndroidViewModel(application), MembraneRTCListener, PhoenixTransportListener { // media tracks var localAudioTrack: LocalAudioTrack? = null var localVideoTrack: LocalVideoTrack? = null var localScreencastTrack: LocalScreencastTrack? = null - private val localPeerId: String = UUID.randomUUID().toString() + private val localEndpointId: String = UUID.randomUUID().toString() var localDisplayName: String? = null @@ -45,6 +48,8 @@ class RoomViewModel( private val globalToLocalTrackId = HashMap() private val params = mapOf("token" to "mocktoken") + private lateinit var transport: PhoenixTransport + val videoSimulcastConfig = MutableStateFlow( SimulcastConfig( enabled = true, @@ -67,30 +72,43 @@ class RoomViewModel( // disconnect from the current view room.value?.disconnect() - room.value = MembraneRTC.connect( + val transport = PhoenixTransport( + url, + "room:$roomName", + Dispatchers.IO, + params, + mapOf("isSimulcastOn" to true) + ) + + try { + transport.connect(this@RoomViewModel) + } catch (e: Exception) { + Timber.i(e, "Failed to connect") + + errorMessage.value = "Encountered an error, go back and try again..." + return@launch + } + + this@RoomViewModel.transport = transport + + room.value = MembraneRTC.create( appContext = getApplication(), - options = ConnectOptions( - transport = PhoenixTransport( - url, - "room:$roomName", - Dispatchers.IO, - params, - mapOf("isSimulcastOn" to true) - ), - config = mapOf("displayName" to displayName), + options = CreateOptions( encoderOptions = EncoderOptions( encoderType = EncoderType.SOFTWARE ) ), listener = this@RoomViewModel ) + + room.value?.connect(mapOf("displayName" to (localDisplayName ?: ""))) } } fun disconnect() { room.value?.disconnect() - room.value = null + transport.disconnect() } fun focusVideo(participantId: String) { @@ -148,9 +166,9 @@ class RoomViewModel( room.value?.updateTrackMetadata(it.id(), mapOf("active" to enabled, "type" to "audio")) } - val p = mutableParticipants[localPeerId] + val p = mutableParticipants[localEndpointId] if (p != null) { - mutableParticipants[localPeerId] = p.updateTrackMetadata( + mutableParticipants[localEndpointId] = p.updateTrackMetadata( p.audioTrack?.id(), mapOf("active" to isMicrophoneOn.value) ) @@ -167,9 +185,9 @@ class RoomViewModel( room.value?.updateTrackMetadata(it.id(), mapOf("active" to enabled, "type" to "camera")) } - val p = mutableParticipants[localPeerId] + val p = mutableParticipants[localEndpointId] if (p != null) { - mutableParticipants[localPeerId] = p.updateTrackMetadata( + mutableParticipants[localEndpointId] = p.updateTrackMetadata( p.videoTrack?.id(), mapOf("active" to isCameraOn.value) ) @@ -182,8 +200,13 @@ class RoomViewModel( localVideoTrack?.flipCamera() } - // MembraneRTCListener callbacks - override fun onConnected() { + override fun onSendMediaEvent(event: SerializedMediaEvent) { + viewModelScope.launch { + this@RoomViewModel.transport.send(event) + } + } + + private fun setupTracks() { room.value?.let { localAudioTrack = it.createAudioTrack( mapOf( @@ -215,29 +238,26 @@ class RoomViewModel( ) ) - it.join() - isCameraOn.value = localVideoTrack?.enabled() ?: false isMicrophoneOn.value = localAudioTrack?.enabled() ?: false - val participant = Participant(localPeerId, "Me", localVideoTrack, localAudioTrack) + val participant = Participant(localEndpointId, "Me", localVideoTrack, localAudioTrack) - mutableParticipants[localPeerId] = participant.updateTrackMetadata( + mutableParticipants[localEndpointId] = participant.updateTrackMetadata( participant.audioTrack?.id(), mapOf("active" to isMicrophoneOn.value) ).updateTrackMetadata( participant.videoTrack?.id(), mapOf("active" to isCameraOn.value) ) - - emitParticipants() } } - override fun onJoinSuccess(peerID: String, peersInRoom: List) { + // MembraneRTCListener callbacks + override fun onConnected(endpointID: String, otherEndpoints: List) { Timber.i("Successfully join the room") - peersInRoom.forEach { + otherEndpoints.forEach { mutableParticipants[it.id] = Participant( it.id, it.metadata["displayName"] as? String ?: "UNKNOWN", @@ -246,15 +266,21 @@ class RoomViewModel( ) } + setupTracks() emitParticipants() } - override fun onJoinError(metadata: Any) { - Timber.e("User has been denied to join the room") + override fun onDisconnected() { + room.value = null + transport.disconnect() + } + + override fun onConnectError(metadata: Any) { + Timber.e("User has been denied to connect to the room") } override fun onTrackReady(ctx: TrackContext) { - val participant = mutableParticipants[ctx.peer.id] ?: return + val participant = mutableParticipants[ctx.endpoint.id] ?: return val (id, newParticipant) = when (ctx.track) { is RemoteVideoTrack -> { @@ -272,7 +298,7 @@ class RoomViewModel( } else { val p = participant.copy(videoTrack = ctx.track as RemoteVideoTrack) Pair( - ctx.peer.id, + ctx.endpoint.id, p.copy( tracksMetadata = p.tracksMetadata + ( ( @@ -288,7 +314,7 @@ class RoomViewModel( globalToLocalTrackId[ctx.trackId] = (ctx.track as RemoteAudioTrack).id() val p = participant.copy(audioTrack = ctx.track as RemoteAudioTrack) Pair( - ctx.peer.id, + ctx.endpoint.id, p.copy( tracksMetadata = p.tracksMetadata + ( ( @@ -308,9 +334,9 @@ class RoomViewModel( emitParticipants() ctx.setOnVoiceActivityChangedListener { - val p = mutableParticipants[it.peer.id] + val p = mutableParticipants[it.endpoint.id] if (p != null) { - mutableParticipants[it.peer.id] = p.copy(vadStatus = it.vadStatus) + mutableParticipants[it.endpoint.id] = p.copy(vadStatus = it.vadStatus) emitParticipants() } } @@ -330,8 +356,8 @@ class RoomViewModel( emitParticipants() } else { - val participant = mutableParticipants[ctx.peer.id] - ?: throw IllegalArgumentException("No participant with id ${ctx.peer.id}") + val participant = mutableParticipants[ctx.endpoint.id] + ?: throw IllegalArgumentException("No participant with id ${ctx.endpoint.id}") val localTrackId = globalToLocalTrackId[ctx.trackId] val audioTrackId = participant.audioTrack?.id() @@ -346,13 +372,13 @@ class RoomViewModel( else -> throw IllegalArgumentException( - "Track ${ctx.trackId} has not been found for given peer ${ctx.peer.id}" + "Track ${ctx.trackId} has not been found for given endpoint ${ctx.endpoint.id}" ) } globalToLocalTrackId.remove(ctx.trackId) - mutableParticipants[ctx.peer.id] = newParticipant + mutableParticipants[ctx.endpoint.id] = newParticipant emitParticipants() } @@ -361,16 +387,16 @@ class RoomViewModel( } override fun onTrackUpdated(ctx: TrackContext) { - val p = mutableParticipants[ctx.peer.id] + val p = mutableParticipants[ctx.endpoint.id] if (p != null) { // Updates metadata of given track if (ctx.metadata["type"] == "camera") { - mutableParticipants[ctx.peer.id] = p.updateTrackMetadata( + mutableParticipants[ctx.endpoint.id] = p.updateTrackMetadata( p.videoTrack?.id(), ctx.metadata ) } else { - mutableParticipants[ctx.peer.id] = p.updateTrackMetadata( + mutableParticipants[ctx.endpoint.id] = p.updateTrackMetadata( p.audioTrack?.id(), ctx.metadata ) @@ -381,30 +407,25 @@ class RoomViewModel( Timber.i("Track has been updated $ctx") } - override fun onPeerJoined(peer: Peer) { - mutableParticipants[peer.id] = Participant( - id = peer.id, - displayName = peer.metadata["displayName"] as? String ?: "UNKNOWN" + override fun onEndpointAdded(endpoint: Endpoint) { + mutableParticipants[endpoint.id] = Participant( + id = endpoint.id, + displayName = endpoint.metadata["displayName"] as? String ?: "UNKNOWN" ) emitParticipants() - Timber.i("Peer has joined the room $peer") + Timber.i("Endpoint $endpoint has been added") } - override fun onPeerLeft(peer: Peer) { - mutableParticipants.remove(peer.id) + override fun onEndpointRemoved(endpoint: Endpoint) { + mutableParticipants.remove(endpoint.id) emitParticipants() - Timber.i("Peer has left the room $peer") + Timber.i("Endpoint $endpoint has been removed") } - override fun onPeerUpdated(peer: Peer) { - Timber.i("Peer has updated $peer") - } - - override fun onError(error: MembraneRTCError) { - Timber.e("Encountered an error $error") - errorMessage.value = "Encountered an error, go back and try again..." + override fun onEndpointUpdated(endpoint: Endpoint) { + Timber.i("Endpoint $endpoint has been updated") } fun startScreencast(mediaProjectionPermission: Intent) { @@ -479,4 +500,16 @@ class RoomViewModel( fun toggleScreencastTrackEncoding(encoding: TrackEncoding) { localScreencastTrack?.id()?.let { toggleTrackEncoding(screencastSimulcastConfig, it, encoding) } } + + override fun onEvent(event: SerializedMediaEvent) { + room.value?.receiveMediaEvent(event) + } + + override fun onError(error: PhoenixTransportError) { + Timber.e("Encountered an error $error") + errorMessage.value = "Encountered an error, go back and try again..." + } + + override fun onClose() { + } }