From 6372c01258f76c720d3713113da90bf195664761 Mon Sep 17 00:00:00 2001 From: Angelika Serwa Date: Tue, 16 May 2023 12:02:08 +0200 Subject: [PATCH 1/3] [RTC-238] Refactor for Jellyfish client (#41) * [WIP] Socket refactor * Make options optional --- .../membraneframework/rtc/ConnectOptions.kt | 16 ---- .../membraneframework/rtc/CreateOptions.kt | 9 +++ .../rtc/InternalMembraneRTC.kt | 51 ++++-------- .../org/membraneframework/rtc/MembraneRTC.kt | 35 ++++++--- .../rtc/MembraneRTCListener.kt | 8 +- .../rtc/PeerConnectionFactoryWrapper.kt | 7 +- .../rtc/RTCEngineCommunication.kt | 76 +++++++++--------- .../rtc/RTCEngineListener.kt | 5 +- .../rtc/transport/EventTransport.kt | 45 ----------- .../rtc/transport/PhoenixTransport.kt | 74 ++++++++++-------- .../org/membraneframework/rtc/utils/types.kt | 1 + .../viewmodels/RoomViewModel.kt | 77 +++++++++++++------ 12 files changed, 192 insertions(+), 212 deletions(-) delete mode 100644 MembraneRTC/src/main/java/org/membraneframework/rtc/ConnectOptions.kt create mode 100644 MembraneRTC/src/main/java/org/membraneframework/rtc/CreateOptions.kt delete mode 100644 MembraneRTC/src/main/java/org/membraneframework/rtc/transport/EventTransport.kt diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/ConnectOptions.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/ConnectOptions.kt deleted file mode 100644 index d5a7abd..0000000 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/ConnectOptions.kt +++ /dev/null @@ -1,16 +0,0 @@ -package org.membraneframework.rtc - -import org.membraneframework.rtc.transport.EventTransport -import org.membraneframework.rtc.utils.Metadata - -/** - * A set of connect options used by the MembraneRTC when connecting to the media server. - * @property transport The transport implementation that will be used for media events exchanged by the client. Should be ready for connect method invocation - * @property config The metadata that will be used by the transport to connect - * @property encoderOptions The encoder options used to encode video - */ -data class ConnectOptions( - val transport: EventTransport, - val config: Metadata, - val encoderOptions: EncoderOptions = EncoderOptions() -) diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/CreateOptions.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/CreateOptions.kt new file mode 100644 index 0000000..85ac841 --- /dev/null +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/CreateOptions.kt @@ -0,0 +1,9 @@ +package org.membraneframework.rtc + +/** + * A set of options used by the MembraneRTC when creating the client. + * @property encoderOptions The encoder options used to encode video + */ +data class CreateOptions( + val encoderOptions: EncoderOptions = EncoderOptions() +) diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt index f79500f..812ca97 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt @@ -15,9 +15,9 @@ import org.membraneframework.rtc.models.Peer import org.membraneframework.rtc.models.RTCStats import org.membraneframework.rtc.models.TrackContext import org.membraneframework.rtc.models.VadStatus -import org.membraneframework.rtc.transport.EventTransportError import org.membraneframework.rtc.utils.ClosableCoroutineScope import org.membraneframework.rtc.utils.Metadata +import org.membraneframework.rtc.utils.SerializedMediaEvent import org.membraneframework.rtc.utils.TimberDebugTree import org.webrtc.* import org.webrtc.AudioTrack @@ -29,7 +29,7 @@ internal class InternalMembraneRTC @AssistedInject constructor( @Assisted - private val connectOptions: ConnectOptions, + private val createOptions: CreateOptions, @Assisted private val listener: MembraneRTCListener, @Assisted @@ -40,12 +40,12 @@ constructor( peerConnectionManagerFactory: PeerConnectionManager.PeerConnectionManagerFactory, peerConnectionFactoryWrapperFactory: PeerConnectionFactoryWrapper.PeerConnectionFactoryWrapperFactory ) : RTCEngineListener, PeerConnectionListener { - private val rtcEngineCommunication = rtcEngineCommunicationFactory.create(connectOptions.transport, this) - private val peerConnectionFactoryWrapper = peerConnectionFactoryWrapperFactory.create(connectOptions) + private val rtcEngineCommunication = rtcEngineCommunicationFactory.create(this) + private val peerConnectionFactoryWrapper = peerConnectionFactoryWrapperFactory.create(createOptions) private val peerConnectionManager = peerConnectionManagerFactory.create(this, peerConnectionFactoryWrapper) private var localPeer: Peer = - Peer(id = "", metadata = connectOptions.config, trackIdToMetadata = mapOf()) + Peer(id = "", metadata = mapOf(), trackIdToMetadata = mapOf()) // mapping from peer's id to the peer himself private val remotePeers = HashMap() @@ -68,28 +68,14 @@ constructor( @AssistedFactory interface Factory { fun create( - connectOptions: ConnectOptions, + createOptions: CreateOptions, listener: MembraneRTCListener, defaultDispatcher: CoroutineDispatcher ): InternalMembraneRTC } - fun connect() { - coroutineScope.launch { - try { - rtcEngineCommunication.connect() - listener.onConnected() - } catch (e: Exception) { - Timber.i(e, "Failed to connect") - - listener.onError(MembraneRTCError.Transport("Failed to connect")) - } - } - } - fun disconnect() { coroutineScope.launch { - rtcEngineCommunication.disconnect() localTracksMutex.withLock { localTracks.forEach { it.stop() } } @@ -97,9 +83,14 @@ constructor( } } - fun join() { + fun receiveMediaEvent(event: SerializedMediaEvent) { + rtcEngineCommunication.onEvent(event) + } + + fun join(peerMetadata: Metadata) { coroutineScope.launch { - rtcEngineCommunication.join(localPeer.metadata) + localPeer = localPeer.copy(metadata = peerMetadata) + rtcEngineCommunication.join(peerMetadata) } } @@ -216,6 +207,10 @@ constructor( } } + override fun onSendMediaEvent(event: SerializedMediaEvent) { + listener.onSendMediaEvent(event) + } + override fun onPeerAccepted(peerId: String, peersInRoom: List) { this.localPeer = localPeer.copy(id = peerId) @@ -440,18 +435,6 @@ constructor( listener.onBandwidthEstimationChanged(estimation) } - override fun onError(error: EventTransportError) { - if (error is EventTransportError.ConnectionError) { - listener.onError(MembraneRTCError.Transport(error.reason)) - } else { - listener.onError(MembraneRTCError.Transport(error.message ?: "unknown transport message")) - } - } - - override fun onClose() { - Timber.i("Transport has been closed") - } - fun setTargetTrackEncoding(trackId: String, encoding: TrackEncoding) { coroutineScope.launch { rtcEngineCommunication.setTargetTrackEncoding(trackId, encoding) diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt index c7b56b2..bda8269 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt @@ -7,6 +7,7 @@ import org.membraneframework.rtc.dagger.DaggerMembraneRTCComponent import org.membraneframework.rtc.media.* import org.membraneframework.rtc.models.RTCStats import org.membraneframework.rtc.utils.Metadata +import org.membraneframework.rtc.utils.SerializedMediaEvent import org.webrtc.Logging /** @@ -37,14 +38,15 @@ class MembraneRTC private constructor( private var client: InternalMembraneRTC ) { - /** - * Starts the join process. + * Tries to join the RTC Engine. If user is accepted then onJoinSuccess will be called. + * In other case {@link Callbacks.onJoinError} is invoked. *

- * Should be called only when a listener received onConnected message. + * @param peerMetadata - Any information that other peers will receive in onPeerJoined + * after accepting this peer */ - fun join() { - client.join() + fun join(peerMetadata: Metadata) { + client.join(peerMetadata) } /** @@ -56,6 +58,17 @@ private constructor( client.disconnect() } + /** + * Feeds media event received from RTC Engine to MembraneWebRTC. + * This function should be called whenever some media event from RTC Engine + * was received and can result in MembraneWebRTC generating some other + * media events. + * @param mediaEvent - String data received over custom signalling layer. + */ + fun receiveMediaEvent(mediaEvent: SerializedMediaEvent) { + client.receiveMediaEvent(mediaEvent) + } + /** * Creates a video track utilizing device's camera. *

@@ -217,14 +230,18 @@ private constructor( companion object { /** - * Creates an instance of MembraneRTC client and starts the connecting process. + * Creates an instance of MembraneRTC client. * * @param appContext the context of the current application - * @param options a set of options defining parameters such as event transport or connect metadata * @param listener a listener that will receive all notifications emitted by the MembraneRTC + * @param options a set of options defining parameters such as encoder parameters * @return an instance of the client in connecting state */ - fun connect(appContext: Context, options: ConnectOptions, listener: MembraneRTCListener): MembraneRTC { + fun create( + appContext: Context, + listener: MembraneRTCListener, + options: CreateOptions = CreateOptions() + ): MembraneRTC { val ctx = appContext.applicationContext val component = DaggerMembraneRTCComponent @@ -235,8 +252,6 @@ private constructor( .membraneRTCFactory() .create(options, listener, Dispatchers.Default) - client.connect() - return MembraneRTC(client) } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt index 549e9d0..af15659 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt @@ -2,11 +2,12 @@ package org.membraneframework.rtc import org.membraneframework.rtc.models.Peer import org.membraneframework.rtc.models.TrackContext +import org.membraneframework.rtc.utils.SerializedMediaEvent import timber.log.Timber interface MembraneRTCListener { - // / Callback invoked when client has successfully connected via transport layer. - fun onConnected() + // Called each time MembraneWebRTC need to send some data to the server. + fun onSendMediaEvent(event: SerializedMediaEvent) // /Callback invoked when the client has been approved to participate in media exchange. fun onJoinSuccess(peerID: String, peersInRoom: List) @@ -43,9 +44,6 @@ interface MembraneRTCListener { ) } - // /Callback invoked when an errors happens. - fun onError(error: MembraneRTCError) - // Callback invoked every time a local peer is removed by the server fun onRemoved(reason: String) { Timber.e("Peer removed") } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionFactoryWrapper.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionFactoryWrapper.kt index 0b2932c..678e47f 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionFactoryWrapper.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionFactoryWrapper.kt @@ -7,11 +7,10 @@ import dagger.assisted.AssistedInject import org.membraneframework.rtc.media.SimulcastVideoEncoderFactoryWrapper import org.webrtc.* import org.webrtc.audio.AudioDeviceModule -import java.util.* internal class PeerConnectionFactoryWrapper @AssistedInject constructor( - @Assisted private val connectOptions: ConnectOptions, + @Assisted private val createOptions: CreateOptions, audioDeviceModule: AudioDeviceModule, eglBase: EglBase, appContext: Context @@ -19,7 +18,7 @@ internal class PeerConnectionFactoryWrapper @AssistedFactory interface PeerConnectionFactoryWrapperFactory { fun create( - connectOptions: ConnectOptions + createOptions: CreateOptions ): PeerConnectionFactoryWrapper } @@ -34,7 +33,7 @@ internal class PeerConnectionFactoryWrapper PeerConnectionFactory.builder().setAudioDeviceModule(audioDeviceModule).setVideoEncoderFactory( SimulcastVideoEncoderFactoryWrapper( eglBase.eglBaseContext, - connectOptions.encoderOptions + createOptions.encoderOptions ) ).setVideoDecoderFactory(DefaultVideoDecoderFactory(eglBase.eglBaseContext)).createPeerConnectionFactory() } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt index 3c8a1e7..e64ed08 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt @@ -1,54 +1,42 @@ package org.membraneframework.rtc +import com.google.gson.reflect.TypeToken import dagger.assisted.Assisted import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject import org.membraneframework.rtc.events.* -import org.membraneframework.rtc.transport.EventTransport -import org.membraneframework.rtc.transport.EventTransportError -import org.membraneframework.rtc.transport.EventTransportListener import org.membraneframework.rtc.utils.Metadata +import org.membraneframework.rtc.utils.SerializedMediaEvent import timber.log.Timber import kotlin.math.roundToLong internal class RTCEngineCommunication @AssistedInject constructor( - @Assisted - private val transport: EventTransport, @Assisted private val engineListener: RTCEngineListener -) : EventTransportListener { +) { @AssistedFactory interface RTCEngineCommunicationFactory { fun create( - transport: EventTransport, listener: RTCEngineListener ): RTCEngineCommunication } - suspend fun connect() { - transport.connect(this@RTCEngineCommunication) - } - - suspend fun disconnect() { - transport.disconnect() - } - - suspend fun join(peerMetadata: Metadata) { - transport.send(Join(peerMetadata)) + fun join(peerMetadata: Metadata) { + sendEvent(Join(peerMetadata)) } - suspend fun updatePeerMetadata(peerMetadata: Metadata) { - transport.send(UpdatePeerMetadata(peerMetadata)) + fun updatePeerMetadata(peerMetadata: Metadata) { + sendEvent(UpdatePeerMetadata(peerMetadata)) } - suspend fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { - transport.send(UpdateTrackMetadata(trackId, trackMetadata)) + fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { + sendEvent(UpdateTrackMetadata(trackId, trackMetadata)) } - suspend fun setTargetTrackEncoding(trackId: String, encoding: TrackEncoding) { - transport.send( + fun setTargetTrackEncoding(trackId: String, encoding: TrackEncoding) { + sendEvent( SelectEncoding( trackId, encoding.rid @@ -56,12 +44,12 @@ constructor( ) } - suspend fun renegotiateTracks() { - transport.send(RenegotiateTracks()) + fun renegotiateTracks() { + sendEvent(RenegotiateTracks()) } - suspend fun localCandidate(sdp: String, sdpMLineIndex: Int) { - transport.send( + fun localCandidate(sdp: String, sdpMLineIndex: Int) { + sendEvent( LocalCandidate( sdp, sdpMLineIndex @@ -69,12 +57,12 @@ constructor( ) } - suspend fun sdpOffer( + fun sdpOffer( sdp: String, trackIdToTrackMetadata: Map, midToTrackId: Map ) { - transport.send( + sendEvent( SdpOffer( sdp, trackIdToTrackMetadata, @@ -83,8 +71,26 @@ constructor( ) } - override fun onEvent(event: ReceivableEvent) { - when (event) { + private fun sendEvent(event: SendableEvent) { + val serializedMediaEvent = gson.toJson(event.serializeToMap()) + engineListener.onSendMediaEvent(serializedMediaEvent) + } + + private fun decodeEvent(event: SerializedMediaEvent): ReceivableEvent? { + val type = object : TypeToken>() {}.type + + val rawMessage: Map = gson.fromJson(event, type) + + ReceivableEvent.decode(rawMessage)?.let { + return it + } ?: run { + Timber.d("Failed to decode event $rawMessage") + return null + } + } + + fun onEvent(serializedEvent: SerializedMediaEvent) { + when (val event = decodeEvent(serializedEvent)) { is OfferData -> engineListener.onOfferData(event.data.integratedTurnServers, event.data.tracksTypes) is PeerAccepted -> engineListener.onPeerAccepted(event.data.id, event.data.peersInRoom) is PeerRemoved -> engineListener.onRemoved(event.data.peerId, event.data.reason) @@ -112,12 +118,4 @@ constructor( else -> Timber.e("Failed to process unknown event: $event") } } - - override fun onError(error: EventTransportError) { - engineListener.onError(error) - } - - override fun onClose() { - engineListener.onClose() - } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt index 967381a..5e05187 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt @@ -2,10 +2,11 @@ package org.membraneframework.rtc import org.membraneframework.rtc.events.OfferData import org.membraneframework.rtc.models.Peer -import org.membraneframework.rtc.transport.EventTransportError import org.membraneframework.rtc.utils.Metadata +import org.membraneframework.rtc.utils.SerializedMediaEvent internal interface RTCEngineListener { + fun onSendMediaEvent(event: SerializedMediaEvent) fun onPeerAccepted(peerId: String, peersInRoom: List) fun onPeerDenied() fun onPeerJoined(peer: Peer) @@ -21,6 +22,4 @@ internal interface RTCEngineListener { fun onRemoved(peerId: String, reason: String) fun onVadNotification(trackId: String, status: String) fun onBandwidthEstimation(estimation: Long) - fun onError(error: EventTransportError) - fun onClose() } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/EventTransport.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/EventTransport.kt deleted file mode 100644 index 22ede11..0000000 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/EventTransport.kt +++ /dev/null @@ -1,45 +0,0 @@ -package org.membraneframework.rtc.transport - -import org.membraneframework.rtc.events.* - -/** - * A base class of exceptions that can be emitted by the EventTransport implementations. - */ -sealed class EventTransportError : Exception() { - data class Unauthorized(val reason: String) : EventTransportError() - data class ConnectionError(val reason: String) : EventTransportError() - data class Unexpected(val reason: String) : EventTransportError() - - override fun toString(): String { - return when (this) { - is Unauthorized -> - "User is unauthorized to use the transport: ${this.reason}" - is ConnectionError -> - "Failed to connect with the remote side: ${this.reason}" - is Unexpected -> - "Encountered unexpected error: ${this.reason}" - } - } -} - -/** - * An interface defining a listener to a EventTransport. - */ -interface EventTransportListener { - fun onEvent(event: ReceivableEvent) - fun onError(error: EventTransportError) - fun onClose() -} - -/** - * Interface defining an event transport that the MembraneRTC uses for - * relaying media events from/to the Membrane RTC Engine. - *

- * An implementation of the transport should parse and forward received events to the listener - * passed with connect method. - */ -interface EventTransport { - suspend fun connect(listener: EventTransportListener) - suspend fun disconnect() - suspend fun send(event: SendableEvent) -} diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt index 931565d..a73d202 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt @@ -1,36 +1,55 @@ package org.membraneframework.rtc.transport -import com.google.gson.GsonBuilder -import com.google.gson.reflect.TypeToken import kotlinx.coroutines.* -import org.membraneframework.rtc.events.ReceivableEvent -import org.membraneframework.rtc.events.SendableEvent -import org.membraneframework.rtc.events.serializeToMap import org.membraneframework.rtc.utils.ClosableCoroutineScope +import org.membraneframework.rtc.utils.SerializedMediaEvent import org.membraneframework.rtc.utils.SocketChannelParams import org.membraneframework.rtc.utils.SocketConnectionParams import org.phoenixframework.Channel import org.phoenixframework.Socket import timber.log.Timber +sealed class PhoenixTransportError : Exception() { + data class Unauthorized(val reason: String) : PhoenixTransportError() + data class ConnectionError(val reason: String) : PhoenixTransportError() + data class Unexpected(val reason: String) : PhoenixTransportError() + + override fun toString(): String { + return when (this) { + is Unauthorized -> + "User is unauthorized to use the transport: ${this.reason}" + is ConnectionError -> + "Failed to connect with the remote side: ${this.reason}" + is Unexpected -> + "Encountered unexpected error: ${this.reason}" + } + } +} + +/** + * An interface defining a listener to a PhoenixTransport. + */ +interface PhoenixTransportListener { + fun onEvent(event: SerializedMediaEvent) + fun onError(error: PhoenixTransportError) + fun onClose() +} + class PhoenixTransport constructor( private val url: String, private val topic: String, private val ioDispatcher: CoroutineDispatcher, private val params: SocketConnectionParams? = emptyMap(), private val socketChannelParams: SocketChannelParams = emptyMap() -) : EventTransport { - +) { private lateinit var coroutineScope: CoroutineScope private var socket: Socket? = null private var channel: Channel? = null - private var listener: EventTransportListener? = null - - private val gson = GsonBuilder().create() + private var listener: PhoenixTransportListener? = null private var joinContinuation: CancellableContinuation? = null - override suspend fun connect(listener: EventTransportListener) { + suspend fun connect(listener: PhoenixTransportListener) { Timber.i("Starting connection...") this.listener = listener @@ -47,11 +66,11 @@ class PhoenixTransport constructor( } val errorRef = socket!!.onError { error, _ -> - continuation.cancel(EventTransportError.ConnectionError(error.toString())) + continuation.cancel(PhoenixTransportError.ConnectionError(error.toString())) } val closeRef = socket!!.onClose { - continuation.cancel(EventTransportError.ConnectionError("closed")) + continuation.cancel(PhoenixTransportError.ConnectionError("closed")) } socketRefs += openRef @@ -62,7 +81,7 @@ class PhoenixTransport constructor( socket!!.off(socketRefs.toList()) socket!!.onError { error, _ -> - this.listener?.onError(EventTransportError.ConnectionError(error.toString())) + this.listener?.onError(PhoenixTransportError.ConnectionError(error.toString())) } socket!!.onClose { @@ -77,25 +96,13 @@ class PhoenixTransport constructor( } ?.receive("error") { _ -> joinContinuation?.resumeWith( - Result.failure(EventTransportError.Unauthorized("couldn't join phoenix channel")) + Result.failure(PhoenixTransportError.Unauthorized("couldn't join phoenix channel")) ) } channel?.on("mediaEvent") { message -> - try { - val data = message.payload["data"] as String - val type = object : TypeToken>() {}.type - - val rawMessage: Map = gson.fromJson(data, type) - - ReceivableEvent.decode(rawMessage)?.let { - listener.onEvent(it) - } ?: run { - Timber.d("Failed to decode event $rawMessage") - } - } catch (e: Exception) { - Timber.e(e) - } + val data = message.payload["data"] as String + listener.onEvent(data) } return suspendCancellableCoroutine { @@ -103,7 +110,7 @@ class PhoenixTransport constructor( } } - override suspend fun disconnect() { + suspend fun disconnect() { if (channel != null) { channel ?.leave() @@ -115,13 +122,12 @@ class PhoenixTransport constructor( } } - override suspend fun send(event: SendableEvent) { + suspend fun send(event: SerializedMediaEvent) { coroutineScope.async { val payload = mapOf( - "data" to gson.toJson(event.serializeToMap()) + "data" to event ) - - channel ?.push("mediaEvent", payload) + channel?.push("mediaEvent", payload) } } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/types.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/types.kt index 1e92442..0621404 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/types.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/types.kt @@ -4,3 +4,4 @@ typealias Payload = Map typealias Metadata = Map typealias SocketConnectionParams = Map typealias SocketChannelParams = Map +typealias SerializedMediaEvent = String diff --git a/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt b/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt index 90c1ced..73cf0ea 100644 --- a/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt +++ b/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt @@ -13,13 +13,16 @@ import org.membraneframework.rtc.media.* import org.membraneframework.rtc.models.Peer import org.membraneframework.rtc.models.TrackContext import org.membraneframework.rtc.transport.PhoenixTransport +import org.membraneframework.rtc.transport.PhoenixTransportError +import org.membraneframework.rtc.transport.PhoenixTransportListener +import org.membraneframework.rtc.utils.SerializedMediaEvent import timber.log.Timber import java.util.* class RoomViewModel( val url: String, application: Application -) : AndroidViewModel(application), MembraneRTCListener { +) : AndroidViewModel(application), MembraneRTCListener, PhoenixTransportListener { // media tracks var localAudioTrack: LocalAudioTrack? = null var localVideoTrack: LocalVideoTrack? = null @@ -45,6 +48,8 @@ class RoomViewModel( private val globalToLocalTrackId = HashMap() private val params = mapOf("token" to "mocktoken") + private lateinit var transport: PhoenixTransport + val videoSimulcastConfig = MutableStateFlow( SimulcastConfig( enabled = true, @@ -67,30 +72,45 @@ class RoomViewModel( // disconnect from the current view room.value?.disconnect() - room.value = MembraneRTC.connect( + val transport = PhoenixTransport( + url, + "room:$roomName", + Dispatchers.IO, + params, + mapOf("isSimulcastOn" to true) + ) + + try { + transport.connect(this@RoomViewModel) + } catch (e: Exception) { + Timber.i(e, "Failed to connect") + + errorMessage.value = "Encountered an error, go back and try again..." + return@launch + } + + this@RoomViewModel.transport = transport + + room.value = MembraneRTC.create( appContext = getApplication(), - options = ConnectOptions( - transport = PhoenixTransport( - url, - "room:$roomName", - Dispatchers.IO, - params, - mapOf("isSimulcastOn" to true) - ), - config = mapOf("displayName" to displayName), + options = CreateOptions( encoderOptions = EncoderOptions( encoderType = EncoderType.SOFTWARE ) ), listener = this@RoomViewModel ) + + setupTracksAndJoinRoom() } } fun disconnect() { - room.value?.disconnect() - - room.value = null + viewModelScope.launch { + room.value?.disconnect() + room.value = null + transport.disconnect() + } } fun focusVideo(participantId: String) { @@ -182,8 +202,13 @@ class RoomViewModel( localVideoTrack?.flipCamera() } - // MembraneRTCListener callbacks - override fun onConnected() { + override fun onSendMediaEvent(event: SerializedMediaEvent) { + viewModelScope.launch { + this@RoomViewModel.transport.send(event) + } + } + + private fun setupTracksAndJoinRoom() { room.value?.let { localAudioTrack = it.createAudioTrack( mapOf( @@ -215,7 +240,7 @@ class RoomViewModel( ) ) - it.join() + it.join(mapOf("displayName" to (localDisplayName ?: ""))) isCameraOn.value = localVideoTrack?.enabled() ?: false isMicrophoneOn.value = localAudioTrack?.enabled() ?: false @@ -234,6 +259,7 @@ class RoomViewModel( } } + // MembraneRTCListener callbacks override fun onJoinSuccess(peerID: String, peersInRoom: List) { Timber.i("Successfully join the room") @@ -402,11 +428,6 @@ class RoomViewModel( Timber.i("Peer has updated $peer") } - override fun onError(error: MembraneRTCError) { - Timber.e("Encountered an error $error") - errorMessage.value = "Encountered an error, go back and try again..." - } - fun startScreencast(mediaProjectionPermission: Intent) { if (localScreencastTrack != null) return @@ -479,4 +500,16 @@ class RoomViewModel( fun toggleScreencastTrackEncoding(encoding: TrackEncoding) { localScreencastTrack?.id()?.let { toggleTrackEncoding(screencastSimulcastConfig, it, encoding) } } + + override fun onEvent(event: SerializedMediaEvent) { + room.value?.receiveMediaEvent(event) + } + + override fun onError(error: PhoenixTransportError) { + Timber.e("Encountered an error $error") + errorMessage.value = "Encountered an error, go back and try again..." + } + + override fun onClose() { + } } From 00374040061ed3eabae8cd7b9a9e343b1eacf497 Mon Sep 17 00:00:00 2001 From: Angelika Serwa Date: Wed, 7 Jun 2023 13:36:20 +0200 Subject: [PATCH 2/3] [RTC-271] Remove peers (#45) * peer => endpoint Update events fix lint Fix tests fix tests fix filename Add manual docs dispach Change webrtc names to be aligned with the original one not working lots of logs fix connecting issues update Some fixes * Review comments --------- Co-authored-by: skyman503 --- .github/workflows/publish_docs.yaml | 4 +- .../rtc/InternalMembraneRTC.kt | 170 +++++++++--------- .../org/membraneframework/rtc/MembraneRTC.kt | 38 ++-- .../rtc/MembraneRTCListener.kt | 44 ++--- .../rtc/PeerConnectionManager.kt | 5 +- .../rtc/RTCEngineCommunication.kt | 37 ++-- .../rtc/RTCEngineListener.kt | 20 +-- .../org/membraneframework/rtc/events/Event.kt | 93 +++++----- .../rtc/models/{Peer.kt => Endpoint.kt} | 13 +- .../rtc/models/TrackContext.kt | 6 +- .../rtc/transport/PhoenixTransport.kt | 14 +- ...ionUtils.kt => EndpointConnectionUtils.kt} | 0 ...st.kt => EndpointConnectionManagerTest.kt} | 28 +-- app/build.gradle | 2 +- .../viewmodels/RoomViewModel.kt | 83 ++++----- 15 files changed, 275 insertions(+), 282 deletions(-) rename MembraneRTC/src/main/java/org/membraneframework/rtc/models/{Peer.kt => Endpoint.kt} (64%) rename MembraneRTC/src/main/java/org/membraneframework/rtc/utils/{PeerConnectionUtils.kt => EndpointConnectionUtils.kt} (100%) rename MembraneRTC/src/test/java/org/membraneframework/rtc/test/{PeerConnectionManagerTest.kt => EndpointConnectionManagerTest.kt} (85%) diff --git a/.github/workflows/publish_docs.yaml b/.github/workflows/publish_docs.yaml index f3d91b9..3a19ce9 100644 --- a/.github/workflows/publish_docs.yaml +++ b/.github/workflows/publish_docs.yaml @@ -1,6 +1,6 @@ - name: Deploy Docs on: + workflow_dispatch: push: tags: - "*" @@ -20,4 +20,4 @@ jobs: REPO: self BRANCH: gh-pages FOLDER: MembraneRTC/build/dokka/html - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt index 812ca97..74fa830 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.sync.withLock import org.membraneframework.rtc.events.OfferData import org.membraneframework.rtc.media.* import org.membraneframework.rtc.models.EncodingReason -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.models.RTCStats import org.membraneframework.rtc.models.TrackContext import org.membraneframework.rtc.models.VadStatus @@ -42,13 +42,16 @@ constructor( ) : RTCEngineListener, PeerConnectionListener { private val rtcEngineCommunication = rtcEngineCommunicationFactory.create(this) private val peerConnectionFactoryWrapper = peerConnectionFactoryWrapperFactory.create(createOptions) - private val peerConnectionManager = peerConnectionManagerFactory.create(this, peerConnectionFactoryWrapper) + private val endpointConnectionManager = peerConnectionManagerFactory.create( + this, + peerConnectionFactoryWrapper + ) - private var localPeer: Peer = - Peer(id = "", metadata = mapOf(), trackIdToMetadata = mapOf()) + private var localEndpoint: Endpoint = + Endpoint(id = "", type = "webrtc", metadata = mapOf(), trackIdToMetadata = mapOf()) - // mapping from peer's id to the peer himself - private val remotePeers = HashMap() + // mapping from endpoint's id to the endpoint himself + private val remoteEndpoints = HashMap() // mapping from remote track's id to its context private val trackContexts = HashMap() @@ -76,10 +79,11 @@ constructor( fun disconnect() { coroutineScope.launch { + rtcEngineCommunication.disconnect() localTracksMutex.withLock { localTracks.forEach { it.stop() } } - peerConnectionManager.close() + endpointConnectionManager.close() } } @@ -87,10 +91,10 @@ constructor( rtcEngineCommunication.onEvent(event) } - fun join(peerMetadata: Metadata) { + fun connect(endpointMetadata: Metadata) { coroutineScope.launch { - localPeer = localPeer.copy(metadata = peerMetadata) - rtcEngineCommunication.join(peerMetadata) + localEndpoint = localEndpoint.copy(metadata = endpointMetadata) + rtcEngineCommunication.connect(endpointMetadata) } } @@ -110,31 +114,33 @@ constructor( } localTracks.add(videoTrack) - localPeer = localPeer.withTrack(videoTrack.id(), metadata) - + localEndpoint = localEndpoint.withTrack(videoTrack.id(), metadata) return videoTrack } fun createLocalAudioTrack(metadata: Metadata = mapOf()): LocalAudioTrack { - val audioTrack = LocalAudioTrack.create(context, peerConnectionFactoryWrapper.peerConnectionFactory).also { + val audioTrack = LocalAudioTrack.create( + context, + peerConnectionFactoryWrapper.peerConnectionFactory + ).also { it.start() } localTracks.add(audioTrack) - localPeer = localPeer.withTrack(audioTrack.id(), metadata) + localEndpoint = localEndpoint.withTrack(audioTrack.id(), metadata) return audioTrack } fun setTrackBandwidth(trackId: String, bandwidthLimit: TrackBandwidthLimit.BandwidthLimit) { coroutineScope.launch { - peerConnectionManager.setTrackBandwidth(trackId, bandwidthLimit) + endpointConnectionManager.setTrackBandwidth(trackId, bandwidthLimit) } } fun setEncodingBandwidth(trackId: String, encoding: String, bandwidthLimit: TrackBandwidthLimit.BandwidthLimit) { coroutineScope.launch { - peerConnectionManager.setEncodingBandwidth(trackId, encoding, bandwidthLimit) + endpointConnectionManager.setEncodingBandwidth(trackId, encoding, bandwidthLimit) } } @@ -157,7 +163,7 @@ constructor( } localTracks.add(screencastTrack) - localPeer = localPeer.withTrack(screencastTrack.id(), metadata) + localEndpoint = localEndpoint.withTrack(screencastTrack.id(), metadata) coroutineScope.launch { screencastTrack.startForegroundService(null, null) @@ -167,7 +173,7 @@ constructor( val streamIds = listOf(UUID.randomUUID().toString()) coroutineScope.launch { - peerConnectionManager.addTrack(screencastTrack, streamIds) + endpointConnectionManager.addTrack(screencastTrack, streamIds) rtcEngineCommunication.renegotiateTracks() } @@ -182,10 +188,10 @@ constructor( return@runBlocking false } - peerConnectionManager.removeTrack(track.id()) + endpointConnectionManager.removeTrack(track.id()) localTracks.remove(track) - localPeer = localPeer.withoutTrack(trackId) + localEndpoint = localEndpoint.withoutTrack(trackId) track.stop() } rtcEngineCommunication.renegotiateTracks() @@ -193,67 +199,66 @@ constructor( } } - fun updatePeerMetadata(peerMetadata: Metadata) { + fun updateEndpointMetadata(endpointMetadata: Metadata) { coroutineScope.launch { - rtcEngineCommunication.updatePeerMetadata(peerMetadata) - localPeer = localPeer.copy(metadata = peerMetadata) + rtcEngineCommunication.updateEndpointMetadata(endpointMetadata) + localEndpoint = localEndpoint.copy(metadata = endpointMetadata) } } fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { coroutineScope.launch { rtcEngineCommunication.updateTrackMetadata(trackId, trackMetadata) - localPeer = localPeer.withTrack(trackId, trackMetadata) + localEndpoint = localEndpoint.withTrack(trackId, trackMetadata) } } - override fun onSendMediaEvent(event: SerializedMediaEvent) { - listener.onSendMediaEvent(event) - } - - override fun onPeerAccepted(peerId: String, peersInRoom: List) { - this.localPeer = localPeer.copy(id = peerId) + override fun onConnected(endpointID: String, otherEndpoints: List) { + this.localEndpoint = localEndpoint.copy(id = endpointID) + listener.onConnected(endpointID, otherEndpoints) - listener.onJoinSuccess(localPeer.id, peersInRoom = peersInRoom) - - peersInRoom.forEach { - this.remotePeers[it.id] = it + otherEndpoints.forEach { + this.remoteEndpoints[it.id] = it for ((trackId, metadata) in it.trackIdToMetadata) { - val context = TrackContext(track = null, peer = it, trackId = trackId, metadata = metadata) + val context = TrackContext(track = null, endpoint = it, trackId = trackId, metadata = metadata) this.trackContexts[trackId] = context this.listener.onTrackAdded(context) } } + coroutineScope.launch { rtcEngineCommunication.renegotiateTracks() } } - override fun onPeerDenied() { - // TODO: return meaningful data - listener.onJoinError(mapOf()) + override fun onSendMediaEvent(event: SerializedMediaEvent) { + listener.onSendMediaEvent(event) } - override fun onPeerJoined(peer: Peer) { - if (peer.id == this.localPeer.id) { + override fun onEndpointAdded(endpoint: Endpoint) { + if (endpoint.id == this.localEndpoint.id) { return } - remotePeers[peer.id] = peer + remoteEndpoints[endpoint.id] = endpoint - listener.onPeerJoined(peer) + listener.onEndpointAdded(endpoint) } - override fun onPeerLeft(peerId: String) { - val peer = remotePeers.remove(peerId) ?: run { - Timber.e("Failed to process PeerLeft event: Peer not found: $peerId") + override fun onEndpointRemoved(endpointId: String) { + if (endpointId == localEndpoint.id) { + listener.onDisconnected() + return + } + val endpoint = remoteEndpoints.remove(endpointId) ?: run { + Timber.e("Failed to process EndpointLeft event: Endpoint not found: $endpointId") return } - val trackIds: List = peer.trackIdToMetadata.keys.toList() + val trackIds: List = endpoint.trackIdToMetadata.keys.toList() trackIds.forEach { trackContexts.remove(it)?.let { ctx -> @@ -261,16 +266,16 @@ constructor( } } - listener.onPeerLeft(peer) + listener.onEndpointRemoved(endpoint) } - override fun onPeerUpdated(peerId: String, peerMetadata: Metadata) { - val peer = remotePeers.remove(peerId) ?: run { - Timber.e("Failed to process PeerUpdated event: Peer not found: $peerId") + override fun onEndpointUpdated(endpointId: String, endpointMetadata: Metadata) { + val endpoint = remoteEndpoints.remove(endpointId) ?: run { + Timber.e("Failed to process EndpointUpdated event: Endpoint not found: $endpointId") return } - remotePeers[peer.id] = peer.copy(metadata = peerMetadata) + remoteEndpoints[endpoint.id] = endpoint.copy(metadata = endpointMetadata) } override fun onOfferData(integratedTurnServers: List, tracksTypes: Map) { @@ -278,11 +283,11 @@ constructor( try { val offer = localTracksMutex.withLock { - peerConnectionManager.getSdpOffer(integratedTurnServers, tracksTypes, localTracks) + endpointConnectionManager.getSdpOffer(integratedTurnServers, tracksTypes, localTracks) } rtcEngineCommunication.sdpOffer( offer.description, - localPeer.trackIdToMetadata, + localEndpoint.trackIdToMetadata, offer.midToTrackIdMapping ) } catch (e: Exception) { @@ -293,7 +298,7 @@ constructor( override fun onSdpAnswer(type: String, sdp: String, midToTrackId: Map) { coroutineScope.launch { - peerConnectionManager.onSdpAnswer(sdp, midToTrackId) + endpointConnectionManager.onSdpAnswer(sdp, midToTrackId) localTracksMutex.withLock { // temporary workaround, the backend doesn't add ~ in sdp answer @@ -307,7 +312,7 @@ constructor( } listOf(TrackEncoding.L, TrackEncoding.M, TrackEncoding.H).forEach { if (config?.activeEncodings?.contains(it) == false) { - peerConnectionManager.setTrackEncoding(localTrack.id(), it, false) + endpointConnectionManager.setTrackEncoding(localTrack.id(), it, false) } } } @@ -323,24 +328,24 @@ constructor( candidate ) - peerConnectionManager.onRemoteCandidate(iceCandidate) + endpointConnectionManager.onRemoteCandidate(iceCandidate) } } - override fun onTracksAdded(peerId: String, trackIdToMetadata: Map) { - if (localPeer.id == peerId) return + override fun onTracksAdded(endpointId: String, trackIdToMetadata: Map) { + if (localEndpoint.id == endpointId) return - val peer = remotePeers.remove(peerId) ?: run { - Timber.e("Failed to process TracksAdded event: Peer not found: $peerId") + val endpoint = remoteEndpoints.remove(endpointId) ?: run { + Timber.e("Failed to process TracksAdded event: Endpoint not found: $endpointId") return } - val updatedPeer = peer.copy(trackIdToMetadata = trackIdToMetadata) + val updatedEndpoint = endpoint.copy(trackIdToMetadata = trackIdToMetadata) - remotePeers[updatedPeer.id] = updatedPeer + remoteEndpoints[updatedEndpoint.id] = updatedEndpoint - for ((trackId, metadata) in updatedPeer.trackIdToMetadata) { - val context = TrackContext(track = null, peer = peer, trackId = trackId, metadata = metadata) + for ((trackId, metadata) in updatedEndpoint.trackIdToMetadata) { + val context = TrackContext(track = null, endpoint = endpoint, trackId = trackId, metadata = metadata) this.trackContexts[trackId] = context @@ -348,9 +353,9 @@ constructor( } } - override fun onTracksRemoved(peerId: String, trackIds: List) { - val peer = remotePeers[peerId] ?: run { - Timber.e("Failed to process TracksRemoved event: Peer not found: $peerId") + override fun onTracksRemoved(endpointId: String, trackIds: List) { + val endpoint = remoteEndpoints[endpointId] ?: run { + Timber.e("Failed to process TracksRemoved event: Endpoint not found: $endpointId") return } @@ -360,16 +365,16 @@ constructor( this.listener.onTrackRemoved(context) } - val updatedPeer = trackIds.fold(peer) { acc, trackId -> + val updatedEndpoint = trackIds.fold(endpoint) { acc, trackId -> acc.withoutTrack(trackId) } - remotePeers[peerId] = updatedPeer + remoteEndpoints[endpointId] = updatedEndpoint } - override fun onTrackUpdated(peerId: String, trackId: String, metadata: Metadata) { - val peer = remotePeers[peerId] ?: run { - Timber.e("Failed to process TrackUpdated event: Peer not found: $peerId") + override fun onTrackUpdated(endpointId: String, trackId: String, metadata: Metadata) { + val endpoint = remoteEndpoints[endpointId] ?: run { + Timber.e("Failed to process TrackUpdated event: Endpoint not found: $endpointId") return } @@ -380,16 +385,16 @@ constructor( context.metadata = metadata - val updatedPeer = peer + val updatedEndpoint = endpoint .withoutTrack(trackId) .withTrack(trackId, metadata) - remotePeers[peerId] = updatedPeer + remoteEndpoints[endpointId] = updatedEndpoint this.listener.onTrackUpdated(context) } - override fun onTrackEncodingChanged(peerId: String, trackId: String, encoding: String, encodingReason: String) { + override fun onTrackEncodingChanged(endpointId: String, trackId: String, encoding: String, encodingReason: String) { val encodingReasonEnum = EncodingReason.fromString(encodingReason) if (encodingReasonEnum == null) { Timber.e("Invalid encoding reason: $encodingReason") @@ -406,15 +411,6 @@ constructor( return } trackContext.setEncoding(encodingEnum, encodingReasonEnum) - this.listener.onTrackEncodingChanged(peerId, trackId, encoding) - } - - override fun onRemoved(peerId: String, reason: String) { - if (peerId != localPeer.id) { - Timber.e("Received onRemoved media event, but it does not refer to the local peer") - return - } - listener.onRemoved(reason) } override fun onVadNotification(trackId: String, status: String) { @@ -443,13 +439,13 @@ constructor( fun enableTrackEncoding(trackId: String, encoding: TrackEncoding) { coroutineScope.launch { - peerConnectionManager.setTrackEncoding(trackId, encoding, true) + endpointConnectionManager.setTrackEncoding(trackId, encoding, true) } } fun disableTrackEncoding(trackId: String, encoding: TrackEncoding) { coroutineScope.launch { - peerConnectionManager.setTrackEncoding(trackId, encoding, false) + endpointConnectionManager.setTrackEncoding(trackId, encoding, false) } } @@ -480,6 +476,6 @@ constructor( } fun getStats(): Map { - return peerConnectionManager.getStats() + return endpointConnectionManager.getStats() } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt index bda8269..d47a64b 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTC.kt @@ -15,11 +15,11 @@ import org.webrtc.Logging *

* The client is responsible for relaying MembraneRTC Engine specific messages through given reliable transport layer. * Once initialized, the client is responsible for exchanging necessary messages via provided EventTransport passed via `ConnectOptions` and managing underlying - * `PeerConnection`. The goal of the client is to be as lean as possible, meaning that all activities regarding the session such as moderating + * `EndpointConnection`. The goal of the client is to be as lean as possible, meaning that all activities regarding the session such as moderating * should be implemented by the user himself on top of the MembraneRTC. *

- * The user's ability of interacting with the client is greatly limited to the essential actions such as joining/leaving the session, - * adding/removing local tracks and receiving information about remote peers and their tracks that can be played by the user. + * The user's ability of interacting with the client is greatly limited to the essential actions such as connecting to/leaving the session, + * adding/removing local tracks and receiving information about remote endpoints and their tracks that can be played by the user. *

* User can request 3 different types of local tracks that will get forwarded to the server by the client: *

    @@ -28,10 +28,10 @@ import org.webrtc.Logging *
  • `LocalScreencast` - a screencast track capturing a device's screen using MediaProjection mechanism
  • *
*

- * It is recommended to request necessary audio and video tracks before joining the room but it does not mean it can't be done afterwards (in case of screencast) + * It is recommended to request necessary audio and video tracks before connecting to the room but it does not mean it can't be done afterwards (in case of screencast) *

- * Once the user received onConnected notification they can call the join method to initialize joining the session. - * After receiving `onJoinSuccess` a user will receive notification about various peers joining/leaving the session, new tracks being published and ready for playback + * Once the user created MembraneRTC client, they can call the connect method to initialize connecting to the session. + * After receiving `onConnected` a user will receive notification about various endpoints connecting to/leaving the session, new tracks being published and ready for playback * or going inactive. */ class MembraneRTC @@ -39,14 +39,14 @@ private constructor( private var client: InternalMembraneRTC ) { /** - * Tries to join the RTC Engine. If user is accepted then onJoinSuccess will be called. - * In other case {@link Callbacks.onJoinError} is invoked. + * Tries to connect the RTC Engine. If user is accepted then onConnected will be called. + * In other case {@link Callbacks.onConnectError} is invoked. *

- * @param peerMetadata - Any information that other peers will receive in onPeerJoined - * after accepting this peer + * @param endpointMetadata - Any information that other endpoints will receive in onEndpointAdded + * after accepting this endpoint */ - fun join(peerMetadata: Metadata) { - client.join(peerMetadata) + fun connect(endpointMetadata: Metadata) { + client.connect(endpointMetadata) } /** @@ -165,23 +165,23 @@ private constructor( } /** - * Updates the metadata for the current peer. - * @param peerMetadata Data about this peer that other peers will receive upon joining. + * Updates the metadata for the current endpoint. + * @param endpointMetadata Data about this endpoint that other endpoints will receive upon connecting. * * If the metadata is different from what is already tracked in the room, the optional - * callback `onPeerUpdated` will be triggered for other peers in the room. + * callback `onEndpointUpdated` will be triggered for other endpoints in the room. */ - fun updatePeerMetadata(peerMetadata: Metadata) { - client.updatePeerMetadata(peerMetadata) + fun updateEndpointMetadata(endpointMetadata: Metadata) { + client.updateEndpointMetadata(endpointMetadata) } /** * Updates the metadata for a specific track. * @param trackId local track id of audio or video track. - * @param trackMetadata Data about this track that other peers will receive upon joining. + * @param trackMetadata Data about this track that other endpoints will receive upon connecting. * * If the metadata is different from what is already tracked in the room, the optional - * callback `onTrackUpdated` will be triggered for other peers in the room. + * callback `onTrackUpdated` will be triggered for other endpoints in the room. */ fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { client.updateTrackMetadata(trackId, trackMetadata) diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt index af15659..ff148d6 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/MembraneRTCListener.kt @@ -1,6 +1,6 @@ package org.membraneframework.rtc -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.models.TrackContext import org.membraneframework.rtc.utils.SerializedMediaEvent import timber.log.Timber @@ -9,43 +9,35 @@ interface MembraneRTCListener { // Called each time MembraneWebRTC need to send some data to the server. fun onSendMediaEvent(event: SerializedMediaEvent) - // /Callback invoked when the client has been approved to participate in media exchange. - fun onJoinSuccess(peerID: String, peersInRoom: List) + // Callback invoked when the client has been approved to participate in media exchange. + fun onConnected(endpointID: String, otherEndpoints: List) - // /Callback invoked when client has been denied access to enter the room. - fun onJoinError(metadata: Any) + // Called when endpoint of this MembraneRTC instance was removed + fun onDisconnected() - // /Callback invoked a track is ready to be played. + // Called in case of errors related to multimedia session e.g. ICE connection. + fun onConnectError(metadata: Any) + + // Callback invoked a track is ready to be played. fun onTrackReady(ctx: TrackContext) - // /Callback invoked a peer already present in a room adds a new track. + // Callback invoked a endpoint already present in a room adds a new track. fun onTrackAdded(ctx: TrackContext) - // /Callback invoked when a track will no longer receive any data. + // Callback invoked when a track will no longer receive any data. fun onTrackRemoved(ctx: TrackContext) - // /Callback invoked when track's metadata gets updated + // Callback invoked when track's metadata gets updated fun onTrackUpdated(ctx: TrackContext) - // /Callback invoked when a new peer joins the room. - fun onPeerJoined(peer: Peer) - - // /Callback invoked when a peer leaves the room. - fun onPeerLeft(peer: Peer) + // Callback invoked when a new endpoint joins the room. + fun onEndpointAdded(endpoint: Endpoint) - // /Callback invoked when peer's metadata gets updated. - fun onPeerUpdated(peer: Peer) - - // Callback invoked when received track encoding has changed - @Deprecated("Deprecated, use TrackContext::setOnEncodingChangedListener") - fun onTrackEncodingChanged(peerId: String, trackId: String, encoding: String) { - Timber.i( - "Track encoding changed $trackId -> $encoding" - ) - } + // Called each time endpoint is removed, called only for other endpoints. + fun onEndpointRemoved(endpoint: Endpoint) - // Callback invoked every time a local peer is removed by the server - fun onRemoved(reason: String) { Timber.e("Peer removed") } + // Called each time endpoint has its metadata updated. + fun onEndpointUpdated(endpoint: Endpoint) // Called every time the server estimates client's bandwidth. // estimation - client's available incoming bitrate estimated diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt index cbabcf0..88b8ef9 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt @@ -379,7 +379,10 @@ internal class PeerConnectionManager } val params = sender.parameters val encoding = params?.encodings?.find { it.rid == trackEncoding.rid } ?: run { - Timber.e("setTrackEncoding: Invalid encoding $trackEncoding, no such encoding found in peer connection") + Timber.e( + "setTrackEncoding: Invalid encoding $trackEncoding," + + "no such encoding found in peer connection" + ) return } encoding.active = enabled diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt index e64ed08..10f18d7 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineCommunication.kt @@ -5,6 +5,7 @@ import dagger.assisted.Assisted import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject import org.membraneframework.rtc.events.* +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.utils.Metadata import org.membraneframework.rtc.utils.SerializedMediaEvent import timber.log.Timber @@ -23,12 +24,12 @@ constructor( ): RTCEngineCommunication } - fun join(peerMetadata: Metadata) { - sendEvent(Join(peerMetadata)) + fun connect(endpointMetadata: Metadata) { + sendEvent(Connect(endpointMetadata)) } - fun updatePeerMetadata(peerMetadata: Metadata) { - sendEvent(UpdatePeerMetadata(peerMetadata)) + fun updateEndpointMetadata(endpointMetadata: Metadata) { + sendEvent(UpdateEndpointMetadata(endpointMetadata)) } fun updateTrackMetadata(trackId: String, trackMetadata: Metadata) { @@ -71,6 +72,10 @@ constructor( ) } + fun disconnect() { + sendEvent(Disconnect()) + } + private fun sendEvent(event: SendableEvent) { val serializedMediaEvent = gson.toJson(event.serializeToMap()) engineListener.onSendMediaEvent(serializedMediaEvent) @@ -91,24 +96,28 @@ constructor( fun onEvent(serializedEvent: SerializedMediaEvent) { when (val event = decodeEvent(serializedEvent)) { + is Connected -> engineListener.onConnected(event.data.id, event.data.otherEndpoints) is OfferData -> engineListener.onOfferData(event.data.integratedTurnServers, event.data.tracksTypes) - is PeerAccepted -> engineListener.onPeerAccepted(event.data.id, event.data.peersInRoom) - is PeerRemoved -> engineListener.onRemoved(event.data.peerId, event.data.reason) - is PeerDenied -> engineListener.onPeerDenied() - is PeerJoined -> engineListener.onPeerJoined(event.data.peer) - is PeerLeft -> engineListener.onPeerLeft(event.data.peerId) - is PeerUpdated -> engineListener.onPeerUpdated(event.data.peerId, event.data.metadata) + is EndpointRemoved -> engineListener.onEndpointRemoved(event.data.id) + is EndpointAdded -> engineListener.onEndpointAdded( + Endpoint(event.data.id, event.data.type, event.data.metadata, mapOf()) + ) + is EndpointUpdated -> engineListener.onEndpointUpdated(event.data.id, event.data.metadata) is RemoteCandidate -> engineListener.onRemoteCandidate( event.data.candidate, event.data.sdpMLineIndex, event.data.sdpMid ) is SdpAnswer -> engineListener.onSdpAnswer(event.data.type, event.data.sdp, event.data.midToTrackId) - is TrackUpdated -> engineListener.onTrackUpdated(event.data.peerId, event.data.trackId, event.data.metadata) - is TracksAdded -> engineListener.onTracksAdded(event.data.peerId, event.data.trackIdToMetadata) - is TracksRemoved -> engineListener.onTracksRemoved(event.data.peerId, event.data.trackIds) + is TrackUpdated -> engineListener.onTrackUpdated( + event.data.endpointId, + event.data.trackId, + event.data.metadata + ) + is TracksAdded -> engineListener.onTracksAdded(event.data.endpointId, event.data.trackIdToMetadata) + is TracksRemoved -> engineListener.onTracksRemoved(event.data.endpointId, event.data.trackIds) is EncodingSwitched -> engineListener.onTrackEncodingChanged( - event.data.peerId, + event.data.endpointId, event.data.trackId, event.data.encoding, event.data.reason diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt index 5e05187..b51f9ab 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/RTCEngineListener.kt @@ -1,25 +1,23 @@ package org.membraneframework.rtc import org.membraneframework.rtc.events.OfferData -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.utils.Metadata import org.membraneframework.rtc.utils.SerializedMediaEvent internal interface RTCEngineListener { + fun onConnected(endpointID: String, otherEndpoints: List) fun onSendMediaEvent(event: SerializedMediaEvent) - fun onPeerAccepted(peerId: String, peersInRoom: List) - fun onPeerDenied() - fun onPeerJoined(peer: Peer) - fun onPeerLeft(peerId: String) - fun onPeerUpdated(peerId: String, peerMetadata: Metadata) + fun onEndpointAdded(endpoint: Endpoint) + fun onEndpointRemoved(endpointId: String) + fun onEndpointUpdated(endpointId: String, endpointMetadata: Metadata) fun onOfferData(integratedTurnServers: List, tracksTypes: Map) fun onSdpAnswer(type: String, sdp: String, midToTrackId: Map) fun onRemoteCandidate(candidate: String, sdpMLineIndex: Int, sdpMid: String?) - fun onTracksAdded(peerId: String, trackIdToMetadata: Map) - fun onTracksRemoved(peerId: String, trackIds: List) - fun onTrackUpdated(peerId: String, trackId: String, metadata: Metadata) - fun onTrackEncodingChanged(peerId: String, trackId: String, encoding: String, encodingReason: String) - fun onRemoved(peerId: String, reason: String) + fun onTracksAdded(endpointId: String, trackIdToMetadata: Map) + fun onTracksRemoved(endpointId: String, trackIds: List) + fun onTrackUpdated(endpointId: String, trackId: String, metadata: Metadata) + fun onTrackEncodingChanged(endpointId: String, trackId: String, encoding: String, encodingReason: String) fun onVadNotification(trackId: String, status: String) fun onBandwidthEstimation(estimation: Long) } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/events/Event.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/events/Event.kt index 5374972..7d1baa3 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/events/Event.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/events/Event.kt @@ -4,7 +4,7 @@ import com.google.gson.Gson import com.google.gson.JsonParseException import com.google.gson.annotations.SerializedName import com.google.gson.reflect.TypeToken -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.utils.Metadata import org.membraneframework.rtc.utils.Payload import timber.log.Timber @@ -29,10 +29,10 @@ internal inline fun I.convert(): O { sealed class SendableEvent -data class Join(val type: String, val data: Data) : SendableEvent() { +data class Connect(val type: String, val data: Data) : SendableEvent() { data class Data(val metadata: Metadata) - constructor(metadata: Metadata) : this("join", Data(metadata)) + constructor(metadata: Metadata) : this("connect", Data(metadata)) } data class SdpOffer(val type: String, val data: Payload) : SendableEvent() { @@ -92,10 +92,10 @@ data class SelectEncoding(val type: String, val data: Payload) : SendableEvent() ) } -data class UpdatePeerMetadata(val type: String, val data: Data) : SendableEvent() { +data class UpdateEndpointMetadata(val type: String, val data: Data) : SendableEvent() { data class Data(val metadata: Metadata) - constructor(metadata: Metadata) : this("updatePeerMetadata", Data(metadata)) + constructor(metadata: Metadata) : this("updateEndpointMetadata", Data(metadata)) } data class UpdateTrackMetadata(val type: String, val data: Data) : SendableEvent() { @@ -104,24 +104,22 @@ data class UpdateTrackMetadata(val type: String, val data: Data) : SendableEvent constructor(trackId: String, trackMetadata: Metadata) : this("updateTrackMetadata", Data(trackId, trackMetadata)) } -enum class ReceivableEventType { - @SerializedName("peerAccepted") - PeerAccepted, - - @SerializedName("peerDenied") - PeerDenied, +data class Disconnect(val type: String) : SendableEvent() { + constructor() : this("disconnect") +} - @SerializedName("peerJoined") - PeerJoined, +enum class ReceivableEventType { + @SerializedName("connected") + Connected, - @SerializedName("peerLeft") - PeerLeft, + @SerializedName("endpointAdded") + EndpointAdded, - @SerializedName("peerUpdated") - PeerUpdated, + @SerializedName("endpointUpdated") + EndpointUpdated, - @SerializedName("peerRemoved") - PeerRemoved, + @SerializedName("endpointRemoved") + EndpointRemoved, @SerializedName("custom") Custom, @@ -164,23 +162,17 @@ sealed class ReceivableEvent { val eventBase: BaseReceivableEvent = payload.toDataClass() return when (eventBase.type) { - ReceivableEventType.PeerAccepted -> - payload.toDataClass() - - ReceivableEventType.PeerDenied -> - payload.toDataClass() + ReceivableEventType.Connected -> + payload.toDataClass() - ReceivableEventType.PeerJoined -> - payload.toDataClass() + ReceivableEventType.EndpointAdded -> + payload.toDataClass() - ReceivableEventType.PeerLeft -> - payload.toDataClass() + ReceivableEventType.EndpointRemoved -> + payload.toDataClass() - ReceivableEventType.PeerUpdated -> - payload.toDataClass() - - ReceivableEventType.PeerRemoved -> - payload.toDataClass() + ReceivableEventType.EndpointUpdated -> + payload.toDataClass() ReceivableEventType.TracksAdded -> payload.toDataClass() @@ -229,26 +221,25 @@ sealed class ReceivableEvent { } } -data class PeerAccepted(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val id: String, val peersInRoom: List) +data class Connected(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { + data class Data(val id: String, val otherEndpoints: List) } -data class PeerDenied(val type: ReceivableEventType) : ReceivableEvent() - -data class PeerJoined(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peer: Peer) -} - -data class PeerLeft(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String) +data class EndpointAdded(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { + data class Data( + val id: String, + val type: String, + val metadata: Metadata, + val trackIdToMetadata: Map + ) } -data class PeerUpdated(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val metadata: Metadata) +data class EndpointUpdated(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { + data class Data(val id: String, val metadata: Metadata) } -data class PeerRemoved(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val reason: String) +data class EndpointRemoved(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { + data class Data(val id: String, val reason: String) } data class OfferData(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { @@ -264,15 +255,15 @@ data class OfferData(val type: ReceivableEventType, val data: Data) : Receivable } data class TracksAdded(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val trackIdToMetadata: Map) + data class Data(val endpointId: String, val trackIdToMetadata: Map) } data class TracksRemoved(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val trackIds: List) + data class Data(val endpointId: String, val trackIds: List) } data class TrackUpdated(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val trackId: String, val metadata: Metadata) + data class Data(val endpointId: String, val trackId: String, val metadata: Metadata) } data class SdpAnswer(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { @@ -284,7 +275,7 @@ data class RemoteCandidate(val type: ReceivableEventType, val data: Data) : Rece } data class EncodingSwitched(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { - data class Data(val peerId: String, val trackId: String, val encoding: String, val reason: String) + data class Data(val endpointId: String, val trackId: String, val encoding: String, val reason: String) } data class VadNotification(val type: ReceivableEventType, val data: Data) : ReceivableEvent() { diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/models/Peer.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/models/Endpoint.kt similarity index 64% rename from MembraneRTC/src/main/java/org/membraneframework/rtc/models/Peer.kt rename to MembraneRTC/src/main/java/org/membraneframework/rtc/models/Endpoint.kt index b88422e..5cd6455 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/models/Peer.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/models/Endpoint.kt @@ -2,18 +2,21 @@ package org.membraneframework.rtc.models import org.membraneframework.rtc.utils.Metadata -data class Peer(val id: String, val metadata: Metadata, val trackIdToMetadata: Map) { - fun withTrack(trackId: String, metadata: Metadata): Peer { +data class Endpoint( + val id: String, + val type: String, + val metadata: Metadata, + val trackIdToMetadata: Map +) { + fun withTrack(trackId: String, metadata: Metadata): Endpoint { val newTrackIdToMetadata = this.trackIdToMetadata.toMutableMap() newTrackIdToMetadata[trackId] = metadata - return this.copy(trackIdToMetadata = newTrackIdToMetadata) } - fun withoutTrack(trackId: String): Peer { + fun withoutTrack(trackId: String): Endpoint { val newTrackIdToMetadata = this.trackIdToMetadata.toMutableMap() newTrackIdToMetadata.remove(trackId) - return this.copy(trackIdToMetadata = newTrackIdToMetadata) } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/models/TrackContext.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/models/TrackContext.kt index 8ab0a99..0cad3bb 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/models/TrackContext.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/models/TrackContext.kt @@ -15,11 +15,11 @@ fun interface OnVoiceActivityChangedListener { /** * Track's context i.e. all data that can be useful when operating on track. * - * @property peer Peer this track comes from. - * @property trackId Track id. It is generated by RTC engine and takes form `peer_id:`. + * @property endpoint Endpoint this track comes from. + * @property trackId Track id. It is generated by RTC engine and takes form `endpoint_id:`. * @property metadata Any info that was passed in MembraneWebRTC.createVideoTrack/MembraneWebRTC.createAudioTrack */ -class TrackContext(track: RemoteTrack?, val peer: Peer, val trackId: String, metadata: Metadata) { +class TrackContext(track: RemoteTrack?, val endpoint: Endpoint, val trackId: String, metadata: Metadata) { private var onTrackEncodingChangeListener: (OnEncodingChangedListener)? = null private var onVadNotificationListener: (OnVoiceActivityChangedListener)? = null diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt index a73d202..75ae814 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/transport/PhoenixTransport.kt @@ -110,7 +110,7 @@ class PhoenixTransport constructor( } } - suspend fun disconnect() { + fun disconnect() { if (channel != null) { channel ?.leave() @@ -122,12 +122,10 @@ class PhoenixTransport constructor( } } - suspend fun send(event: SerializedMediaEvent) { - coroutineScope.async { - val payload = mapOf( - "data" to event - ) - channel?.push("mediaEvent", payload) - } + fun send(event: SerializedMediaEvent) { + val payload = mapOf( + "data" to event + ) + channel?.push("mediaEvent", payload) } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/PeerConnectionUtils.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/utils/EndpointConnectionUtils.kt similarity index 100% rename from MembraneRTC/src/main/java/org/membraneframework/rtc/utils/PeerConnectionUtils.kt rename to MembraneRTC/src/main/java/org/membraneframework/rtc/utils/EndpointConnectionUtils.kt diff --git a/MembraneRTC/src/test/java/org/membraneframework/rtc/test/PeerConnectionManagerTest.kt b/MembraneRTC/src/test/java/org/membraneframework/rtc/test/EndpointConnectionManagerTest.kt similarity index 85% rename from MembraneRTC/src/test/java/org/membraneframework/rtc/test/PeerConnectionManagerTest.kt rename to MembraneRTC/src/test/java/org/membraneframework/rtc/test/EndpointConnectionManagerTest.kt index b0d36d5..2e98549 100644 --- a/MembraneRTC/src/test/java/org/membraneframework/rtc/test/PeerConnectionManagerTest.kt +++ b/MembraneRTC/src/test/java/org/membraneframework/rtc/test/EndpointConnectionManagerTest.kt @@ -18,30 +18,30 @@ import org.membraneframework.rtc.utils.setLocalDescription import org.webrtc.* import org.webrtc.RtpParameters.Encoding -class PeerConnectionManagerTest { +class EndpointConnectionManagerTest { private lateinit var manager: PeerConnectionManager - private lateinit var peerConnectionMock: PeerConnection + private lateinit var endpointConnectionMock: PeerConnection @Before fun createMocks() { - val peerConnectionListenerMock = mockk(relaxed = true) - val peerConnectionFactoryMock = mockk(relaxed = true) + val endpointConnectionListenerMock = mockk(relaxed = true) + val endpointConnectionFactoryMock = mockk(relaxed = true) mockkStatic("org.membraneframework.rtc.utils.SuspendableSdpObserverKt") - mockkStatic("org.membraneframework.rtc.utils.PeerConnectionUtilsKt") + mockkStatic("org.membraneframework.rtc.utils.EndpointConnectionUtilsKt") - peerConnectionMock = mockk(relaxed = true) + endpointConnectionMock = mockk(relaxed = true) coEvery { - peerConnectionMock.createOffer(any()) + endpointConnectionMock.createOffer(any()) } returns Result.success(SessionDescription(SessionDescription.Type.OFFER, "test_description")) coEvery { - peerConnectionMock.setLocalDescription(any()) + endpointConnectionMock.setLocalDescription(any()) } returns Result.success(Unit) - every { peerConnectionFactoryMock.createPeerConnection(any(), any()) } returns peerConnectionMock + every { endpointConnectionFactoryMock.createPeerConnection(any(), any()) } returns endpointConnectionMock - manager = PeerConnectionManager(peerConnectionListenerMock, peerConnectionFactoryMock) + manager = PeerConnectionManager(endpointConnectionListenerMock, endpointConnectionFactoryMock) } @OptIn(ExperimentalCoroutinesApi::class) @@ -60,7 +60,7 @@ class PeerConnectionManagerTest { manager.getSdpOffer(emptyList(), emptyMap(), listOf(audioTrack)) verify(exactly = 1) { - peerConnectionMock.addTransceiver( + endpointConnectionMock.addTransceiver( audioTrack.mediaTrack, eq(RtpTransceiver.RtpTransceiverDirection.SEND_ONLY), match { it.size == 1 }, @@ -89,7 +89,7 @@ class PeerConnectionManagerTest { manager.getSdpOffer(emptyList(), emptyMap(), listOf(videoTrack)) verify(exactly = 1) { - peerConnectionMock.addTransceiver( + endpointConnectionMock.addTransceiver( videoTrack.rtcTrack(), eq(RtpTransceiver.RtpTransceiverDirection.SEND_ONLY), match { it.size == 1 }, @@ -125,7 +125,7 @@ class PeerConnectionManagerTest { manager.getSdpOffer(emptyList(), emptyMap(), listOf(videoTrack)) verify(exactly = 1) { - peerConnectionMock.addTransceiver( + endpointConnectionMock.addTransceiver( videoTrack.rtcTrack(), eq(RtpTransceiver.RtpTransceiverDirection.SEND_ONLY), any(), @@ -155,7 +155,7 @@ class PeerConnectionManagerTest { val m = Encoding("m", true, 2.0) val l = Encoding("l", true, 4.0) - every { peerConnectionMock.senders } returns listOf( + every { endpointConnectionMock.senders } returns listOf( mockk(relaxed = true) { every { parameters } returns mockk(relaxed = true) { every { track()?.id() } returns "dummy_track" diff --git a/app/build.gradle b/app/build.gradle index 12911f4..6a2b351 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -20,7 +20,7 @@ android { vectorDrawables { useSupportLibrary true } - buildConfigField "String" , "VIDEOROOM_URL" , "\"https://videoroom.membrane.work/socket\"" + buildConfigField "String" , "VIDEOROOM_URL" , "\"https://videoroom.membrane.work/socket\"" } buildTypes { diff --git a/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt b/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt index 73cf0ea..62a6608 100644 --- a/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt +++ b/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt @@ -10,7 +10,7 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.launch import org.membraneframework.rtc.* import org.membraneframework.rtc.media.* -import org.membraneframework.rtc.models.Peer +import org.membraneframework.rtc.models.Endpoint import org.membraneframework.rtc.models.TrackContext import org.membraneframework.rtc.transport.PhoenixTransport import org.membraneframework.rtc.transport.PhoenixTransportError @@ -27,7 +27,7 @@ class RoomViewModel( var localAudioTrack: LocalAudioTrack? = null var localVideoTrack: LocalVideoTrack? = null var localScreencastTrack: LocalScreencastTrack? = null - private val localPeerId: String = UUID.randomUUID().toString() + private val localEndpointId: String = UUID.randomUUID().toString() var localDisplayName: String? = null @@ -106,11 +106,9 @@ class RoomViewModel( } fun disconnect() { - viewModelScope.launch { - room.value?.disconnect() - room.value = null - transport.disconnect() - } + room.value?.disconnect() + room.value = null + transport.disconnect() } fun focusVideo(participantId: String) { @@ -168,9 +166,9 @@ class RoomViewModel( room.value?.updateTrackMetadata(it.id(), mapOf("active" to enabled, "type" to "audio")) } - val p = mutableParticipants[localPeerId] + val p = mutableParticipants[localEndpointId] if (p != null) { - mutableParticipants[localPeerId] = p.updateTrackMetadata( + mutableParticipants[localEndpointId] = p.updateTrackMetadata( p.audioTrack?.id(), mapOf("active" to isMicrophoneOn.value) ) @@ -187,9 +185,9 @@ class RoomViewModel( room.value?.updateTrackMetadata(it.id(), mapOf("active" to enabled, "type" to "camera")) } - val p = mutableParticipants[localPeerId] + val p = mutableParticipants[localEndpointId] if (p != null) { - mutableParticipants[localPeerId] = p.updateTrackMetadata( + mutableParticipants[localEndpointId] = p.updateTrackMetadata( p.videoTrack?.id(), mapOf("active" to isCameraOn.value) ) @@ -240,14 +238,14 @@ class RoomViewModel( ) ) - it.join(mapOf("displayName" to (localDisplayName ?: ""))) + it.connect(mapOf("displayName" to (localDisplayName ?: ""))) isCameraOn.value = localVideoTrack?.enabled() ?: false isMicrophoneOn.value = localAudioTrack?.enabled() ?: false - val participant = Participant(localPeerId, "Me", localVideoTrack, localAudioTrack) + val participant = Participant(localEndpointId, "Me", localVideoTrack, localAudioTrack) - mutableParticipants[localPeerId] = participant.updateTrackMetadata( + mutableParticipants[localEndpointId] = participant.updateTrackMetadata( participant.audioTrack?.id(), mapOf("active" to isMicrophoneOn.value) ).updateTrackMetadata( @@ -260,10 +258,10 @@ class RoomViewModel( } // MembraneRTCListener callbacks - override fun onJoinSuccess(peerID: String, peersInRoom: List) { + override fun onConnected(endpointID: String, otherEndpoints: List) { Timber.i("Successfully join the room") - peersInRoom.forEach { + otherEndpoints.forEach { mutableParticipants[it.id] = Participant( it.id, it.metadata["displayName"] as? String ?: "UNKNOWN", @@ -275,12 +273,17 @@ class RoomViewModel( emitParticipants() } - override fun onJoinError(metadata: Any) { - Timber.e("User has been denied to join the room") + override fun onDisconnected() { + room.value = null + transport.disconnect() + } + + override fun onConnectError(metadata: Any) { + Timber.e("User has been denied to connect to the room") } override fun onTrackReady(ctx: TrackContext) { - val participant = mutableParticipants[ctx.peer.id] ?: return + val participant = mutableParticipants[ctx.endpoint.id] ?: return val (id, newParticipant) = when (ctx.track) { is RemoteVideoTrack -> { @@ -298,7 +301,7 @@ class RoomViewModel( } else { val p = participant.copy(videoTrack = ctx.track as RemoteVideoTrack) Pair( - ctx.peer.id, + ctx.endpoint.id, p.copy( tracksMetadata = p.tracksMetadata + ( ( @@ -314,7 +317,7 @@ class RoomViewModel( globalToLocalTrackId[ctx.trackId] = (ctx.track as RemoteAudioTrack).id() val p = participant.copy(audioTrack = ctx.track as RemoteAudioTrack) Pair( - ctx.peer.id, + ctx.endpoint.id, p.copy( tracksMetadata = p.tracksMetadata + ( ( @@ -334,9 +337,9 @@ class RoomViewModel( emitParticipants() ctx.setOnVoiceActivityChangedListener { - val p = mutableParticipants[it.peer.id] + val p = mutableParticipants[it.endpoint.id] if (p != null) { - mutableParticipants[it.peer.id] = p.copy(vadStatus = it.vadStatus) + mutableParticipants[it.endpoint.id] = p.copy(vadStatus = it.vadStatus) emitParticipants() } } @@ -356,8 +359,8 @@ class RoomViewModel( emitParticipants() } else { - val participant = mutableParticipants[ctx.peer.id] - ?: throw IllegalArgumentException("No participant with id ${ctx.peer.id}") + val participant = mutableParticipants[ctx.endpoint.id] + ?: throw IllegalArgumentException("No participant with id ${ctx.endpoint.id}") val localTrackId = globalToLocalTrackId[ctx.trackId] val audioTrackId = participant.audioTrack?.id() @@ -372,13 +375,13 @@ class RoomViewModel( else -> throw IllegalArgumentException( - "Track ${ctx.trackId} has not been found for given peer ${ctx.peer.id}" + "Track ${ctx.trackId} has not been found for given endpoint ${ctx.endpoint.id}" ) } globalToLocalTrackId.remove(ctx.trackId) - mutableParticipants[ctx.peer.id] = newParticipant + mutableParticipants[ctx.endpoint.id] = newParticipant emitParticipants() } @@ -387,16 +390,16 @@ class RoomViewModel( } override fun onTrackUpdated(ctx: TrackContext) { - val p = mutableParticipants[ctx.peer.id] + val p = mutableParticipants[ctx.endpoint.id] if (p != null) { // Updates metadata of given track if (ctx.metadata["type"] == "camera") { - mutableParticipants[ctx.peer.id] = p.updateTrackMetadata( + mutableParticipants[ctx.endpoint.id] = p.updateTrackMetadata( p.videoTrack?.id(), ctx.metadata ) } else { - mutableParticipants[ctx.peer.id] = p.updateTrackMetadata( + mutableParticipants[ctx.endpoint.id] = p.updateTrackMetadata( p.audioTrack?.id(), ctx.metadata ) @@ -407,25 +410,25 @@ class RoomViewModel( Timber.i("Track has been updated $ctx") } - override fun onPeerJoined(peer: Peer) { - mutableParticipants[peer.id] = Participant( - id = peer.id, - displayName = peer.metadata["displayName"] as? String ?: "UNKNOWN" + override fun onEndpointAdded(endpoint: Endpoint) { + mutableParticipants[endpoint.id] = Participant( + id = endpoint.id, + displayName = endpoint.metadata["displayName"] as? String ?: "UNKNOWN" ) emitParticipants() - Timber.i("Peer has joined the room $peer") + Timber.i("Endpoint $endpoint has been added") } - override fun onPeerLeft(peer: Peer) { - mutableParticipants.remove(peer.id) + override fun onEndpointRemoved(endpoint: Endpoint) { + mutableParticipants.remove(endpoint.id) emitParticipants() - Timber.i("Peer has left the room $peer") + Timber.i("Endpoint $endpoint has been removed") } - override fun onPeerUpdated(peer: Peer) { - Timber.i("Peer has updated $peer") + override fun onEndpointUpdated(endpoint: Endpoint) { + Timber.i("Endpoint $endpoint has been updated") } fun startScreencast(mediaProjectionPermission: Intent) { From a049a8b19d9cbd387d20c233d0eab1206915fe8b Mon Sep 17 00:00:00 2001 From: Angelika Serwa Date: Wed, 14 Jun 2023 16:40:49 +0200 Subject: [PATCH 3/3] Allow to create tracks immediately after onConnect callback (#46) --- .../rtc/InternalMembraneRTC.kt | 43 +++++++++++-------- .../rtc/PeerConnectionManager.kt | 10 +++-- .../viewmodels/RoomViewModel.kt | 9 ++-- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt index 74fa830..7c2f8e5 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/InternalMembraneRTC.kt @@ -42,7 +42,7 @@ constructor( ) : RTCEngineListener, PeerConnectionListener { private val rtcEngineCommunication = rtcEngineCommunicationFactory.create(this) private val peerConnectionFactoryWrapper = peerConnectionFactoryWrapperFactory.create(createOptions) - private val endpointConnectionManager = peerConnectionManagerFactory.create( + private val peerConnectionManager = peerConnectionManagerFactory.create( this, peerConnectionFactoryWrapper ) @@ -83,7 +83,7 @@ constructor( localTracksMutex.withLock { localTracks.forEach { it.stop() } } - endpointConnectionManager.close() + peerConnectionManager.close() } } @@ -115,6 +115,12 @@ constructor( localTracks.add(videoTrack) localEndpoint = localEndpoint.withTrack(videoTrack.id(), metadata) + + coroutineScope.launch { + peerConnectionManager.addTrack(videoTrack) + rtcEngineCommunication.renegotiateTracks() + } + return videoTrack } @@ -129,18 +135,23 @@ constructor( localTracks.add(audioTrack) localEndpoint = localEndpoint.withTrack(audioTrack.id(), metadata) + coroutineScope.launch { + peerConnectionManager.addTrack(audioTrack) + rtcEngineCommunication.renegotiateTracks() + } + return audioTrack } fun setTrackBandwidth(trackId: String, bandwidthLimit: TrackBandwidthLimit.BandwidthLimit) { coroutineScope.launch { - endpointConnectionManager.setTrackBandwidth(trackId, bandwidthLimit) + peerConnectionManager.setTrackBandwidth(trackId, bandwidthLimit) } } fun setEncodingBandwidth(trackId: String, encoding: String, bandwidthLimit: TrackBandwidthLimit.BandwidthLimit) { coroutineScope.launch { - endpointConnectionManager.setEncodingBandwidth(trackId, encoding, bandwidthLimit) + peerConnectionManager.setEncodingBandwidth(trackId, encoding, bandwidthLimit) } } @@ -170,10 +181,8 @@ constructor( screencastTrack.start() } - val streamIds = listOf(UUID.randomUUID().toString()) - coroutineScope.launch { - endpointConnectionManager.addTrack(screencastTrack, streamIds) + peerConnectionManager.addTrack(screencastTrack) rtcEngineCommunication.renegotiateTracks() } @@ -188,7 +197,7 @@ constructor( return@runBlocking false } - endpointConnectionManager.removeTrack(track.id()) + peerConnectionManager.removeTrack(track.id()) localTracks.remove(track) localEndpoint = localEndpoint.withoutTrack(trackId) @@ -228,10 +237,6 @@ constructor( this.listener.onTrackAdded(context) } } - - coroutineScope.launch { - rtcEngineCommunication.renegotiateTracks() - } } override fun onSendMediaEvent(event: SerializedMediaEvent) { @@ -283,7 +288,7 @@ constructor( try { val offer = localTracksMutex.withLock { - endpointConnectionManager.getSdpOffer(integratedTurnServers, tracksTypes, localTracks) + peerConnectionManager.getSdpOffer(integratedTurnServers, tracksTypes, localTracks) } rtcEngineCommunication.sdpOffer( offer.description, @@ -298,7 +303,7 @@ constructor( override fun onSdpAnswer(type: String, sdp: String, midToTrackId: Map) { coroutineScope.launch { - endpointConnectionManager.onSdpAnswer(sdp, midToTrackId) + peerConnectionManager.onSdpAnswer(sdp, midToTrackId) localTracksMutex.withLock { // temporary workaround, the backend doesn't add ~ in sdp answer @@ -312,7 +317,7 @@ constructor( } listOf(TrackEncoding.L, TrackEncoding.M, TrackEncoding.H).forEach { if (config?.activeEncodings?.contains(it) == false) { - endpointConnectionManager.setTrackEncoding(localTrack.id(), it, false) + peerConnectionManager.setTrackEncoding(localTrack.id(), it, false) } } } @@ -328,7 +333,7 @@ constructor( candidate ) - endpointConnectionManager.onRemoteCandidate(iceCandidate) + peerConnectionManager.onRemoteCandidate(iceCandidate) } } @@ -439,13 +444,13 @@ constructor( fun enableTrackEncoding(trackId: String, encoding: TrackEncoding) { coroutineScope.launch { - endpointConnectionManager.setTrackEncoding(trackId, encoding, true) + peerConnectionManager.setTrackEncoding(trackId, encoding, true) } } fun disableTrackEncoding(trackId: String, encoding: TrackEncoding) { coroutineScope.launch { - endpointConnectionManager.setTrackEncoding(trackId, encoding, false) + peerConnectionManager.setTrackEncoding(trackId, encoding, false) } } @@ -476,6 +481,6 @@ constructor( } fun getStats(): Map { - return endpointConnectionManager.getStats() + return peerConnectionManager.getStats() } } diff --git a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt index 88b8ef9..a6970d2 100644 --- a/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt +++ b/MembraneRTC/src/main/java/org/membraneframework/rtc/PeerConnectionManager.kt @@ -51,6 +51,8 @@ internal class PeerConnectionManager private val coroutineScope: CoroutineScope = ClosableCoroutineScope(SupervisorJob()) + private var streamIds: List = listOf(UUID.randomUUID().toString()) + private fun getSendEncodingsFromConfig(simulcastConfig: SimulcastConfig): List { val sendEncodings = Constants.simulcastEncodings() simulcastConfig.activeEncodings.forEach { @@ -59,7 +61,11 @@ internal class PeerConnectionManager return sendEncodings } - suspend fun addTrack(track: LocalTrack, streamIds: List) { + suspend fun addTrack(track: LocalTrack) { + addTrack(track, streamIds) + } + + private suspend fun addTrack(track: LocalTrack, streamIds: List) { val videoParameters = (track as? LocalVideoTrack)?.videoParameters ?: (track as? LocalScreencastTrack)?.videoParameters @@ -208,8 +214,6 @@ internal class PeerConnectionManager this@PeerConnectionManager.peerConnection = pc } - val streamIds = listOf(UUID.randomUUID().toString()) - localTracks.forEach { addTrack(it, streamIds) } diff --git a/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt b/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt index 62a6608..1af6593 100644 --- a/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt +++ b/app/src/main/java/com/dscout/membranevideoroomdemo/viewmodels/RoomViewModel.kt @@ -101,7 +101,7 @@ class RoomViewModel( listener = this@RoomViewModel ) - setupTracksAndJoinRoom() + room.value?.connect(mapOf("displayName" to (localDisplayName ?: ""))) } } @@ -206,7 +206,7 @@ class RoomViewModel( } } - private fun setupTracksAndJoinRoom() { + private fun setupTracks() { room.value?.let { localAudioTrack = it.createAudioTrack( mapOf( @@ -238,8 +238,6 @@ class RoomViewModel( ) ) - it.connect(mapOf("displayName" to (localDisplayName ?: ""))) - isCameraOn.value = localVideoTrack?.enabled() ?: false isMicrophoneOn.value = localAudioTrack?.enabled() ?: false @@ -252,8 +250,6 @@ class RoomViewModel( participant.videoTrack?.id(), mapOf("active" to isCameraOn.value) ) - - emitParticipants() } } @@ -270,6 +266,7 @@ class RoomViewModel( ) } + setupTracks() emitParticipants() }