diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 0ff9cf5..8d539f7 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -59,6 +59,8 @@ dependencies { implementation(libs.androidx.camera.view) implementation(libs.androidx.camera.mlkit.vision) implementation(libs.androidx.camera.extensions) + implementation(libs.stream.webrtc.android) + implementation(libs.stream.log) testImplementation(libs.junit) androidTestImplementation(libs.androidx.junit) androidTestImplementation(libs.androidx.espresso.core) diff --git a/app/src/main/java/com/example/tvcontroller/services/WebRtcService.kt b/app/src/main/java/com/example/tvcontroller/services/WebRtcService.kt new file mode 100644 index 0000000..ea35ee4 --- /dev/null +++ b/app/src/main/java/com/example/tvcontroller/services/WebRtcService.kt @@ -0,0 +1,16 @@ +package com.example.tvcontroller.services + +import android.util.Log +import androidx.camera.core.ImageAnalysis +import androidx.camera.core.ImageProxy + +class WebRtcService : ImageAnalysis.Analyzer { + val sessionManager = LocalWebRtcSessionManager.current + val localVideoTrackState by sessionManager.localVideoTrackFlow.collectAsState(null) + val localVideoTrack = localVideoTrackState + + override fun analyze(image: ImageProxy) { + Log.i("WebRtcService", "Image received") + image.close() + } +} \ No newline at end of file diff --git a/app/src/main/java/com/example/tvcontroller/services/webrtc/SignalingClient.kt b/app/src/main/java/com/example/tvcontroller/services/webrtc/SignalingClient.kt new file mode 100644 index 0000000..3cf05a8 --- /dev/null +++ b/app/src/main/java/com/example/tvcontroller/services/webrtc/SignalingClient.kt @@ -0,0 +1,129 @@ +package com.example.tvcontroller.services.webrtc +/* + * Copyright 2023 Stream.IO, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import io.getstream.log.taggedLogger +import io.ktor.client.* +import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.plugins.websocket.webSocketSession +import io.ktor.client.request.* +import io.ktor.http.* +import io.ktor.util.logging.Logger +import io.ktor.websocket.CloseReason +import io.ktor.websocket.DefaultWebSocketSession +import io.ktor.websocket.close +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch +import org.webrtc.Logging + +const val SIGNALING_SERVER_IP_ADDRESS = "wss://signaling.stream.io" + +class SignalingClient { + private val logger by taggedLogger("Call:SignalingClient") + private val signalingScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + private val client = HttpClient { + install(WebSockets) + } + + // opening web socket with signaling server + private lateinit var ws: DefaultWebSocketSession + + init { + signalingScope.launch { + try { + ws = client.webSocketSession { + url(SIGNALING_SERVER_IP_ADDRESS) + } + } catch (e: Exception) { + logger.e { "Error connecting to signaling server: ${e.message}" } + _sessionStateFlow.value = WebRTCSessionState.Offline + } + } + } + + // session flow to send information about the session state to the subscribers + private val _sessionStateFlow = MutableStateFlow(WebRTCSessionState.Offline) + val sessionStateFlow: StateFlow = _sessionStateFlow + + // signaling commands to send commands to value pairs to the subscribers + private val _signalingCommandFlow = MutableSharedFlow>() + val signalingCommandFlow: SharedFlow> = _signalingCommandFlow + + fun sendCommand(signalingCommand: SignalingCommand, message: String) { + logger.d { "[sendCommand] $signalingCommand $message" } + signalingScope.launch { + ws.send("$signalingCommand $message") + } + } + + private inner class SignalingWebSocketListener : WebSocketListener() { + override fun onMessage(webSocket: WebSocket, text: String) { + when { + text.startsWith(SignalingCommand.STATE.toString(), true) -> + handleStateMessage(text) + text.startsWith(SignalingCommand.OFFER.toString(), true) -> + handleSignalingCommand(SignalingCommand.OFFER, text) + text.startsWith(SignalingCommand.ANSWER.toString(), true) -> + handleSignalingCommand(SignalingCommand.ANSWER, text) + text.startsWith(SignalingCommand.ICE.toString(), true) -> + handleSignalingCommand(SignalingCommand.ICE, text) + } + } + } + + private fun handleStateMessage(message: String) { + val state = getSeparatedMessage(message) + _sessionStateFlow.value = WebRTCSessionState.valueOf(state) + } + + private fun handleSignalingCommand(command: SignalingCommand, text: String) { + val value = getSeparatedMessage(text) + logger.d { "received signaling: $command $value" } + signalingScope.launch { + _signalingCommandFlow.emit(command to value) + } + } + + private fun getSeparatedMessage(text: String) = text.substringAfter(' ') + + fun dispose() { + _sessionStateFlow.value = WebRTCSessionState.Offline + signalingScope.cancel() + ws.close(CloseReason(CloseReason.Codes.NORMAL, "Client is shutting down")) + } +} + +enum class WebRTCSessionState { + Active, // Offer and Answer messages has been sent + Creating, // Creating session, offer has been sent + Ready, // Both clients available and ready to initiate session + Impossible, // We have less than two clients connected to the server + Offline // unable to connect signaling server +} + +enum class SignalingCommand { + STATE, // Command for WebRTCSessionState + OFFER, // to send or receive offer + ANSWER, // to send or receive answer + ICE // to send and receive ice candidates +} diff --git a/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerConnection.kt b/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerConnection.kt new file mode 100644 index 0000000..79dd221 --- /dev/null +++ b/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerConnection.kt @@ -0,0 +1,338 @@ +package com.example.tvcontroller.services.webrtc.peer +/* + * Copyright 2023 Stream.IO, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import io.getstream.log.taggedLogger +import com.example.tvcontroller.services.webrtc.utils.addRtcIceCandidate +import com.example.tvcontroller.services.webrtc.utils.createValue +import com.example.tvcontroller.services.webrtc.utils.setValue +import com.example.tvcontroller.services.webrtc.utils.stringify +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import org.webrtc.CandidatePairChangeEvent +import org.webrtc.DataChannel +import org.webrtc.IceCandidate +import org.webrtc.IceCandidateErrorEvent +import org.webrtc.MediaConstraints +import org.webrtc.MediaStream +import org.webrtc.MediaStreamTrack +import org.webrtc.PeerConnection +import org.webrtc.RTCStatsReport +import org.webrtc.RtpReceiver +import org.webrtc.RtpTransceiver +import org.webrtc.SessionDescription + +/** + * Wrapper around the WebRTC connection that contains tracks. + * + * @param coroutineScope The scope used to listen to stats events. + * @param type The internal type of the PeerConnection. Check [StreamPeerType]. + * @param mediaConstraints Constraints used for the connections. + * @param onStreamAdded Handler when a new [MediaStream] gets added. + * @param onNegotiationNeeded Handler when there's a new negotiation. + * @param onIceCandidate Handler whenever we receive [IceCandidate]s. + */ +class StreamPeerConnection( + private val coroutineScope: CoroutineScope, + private val type: StreamPeerType, + private val mediaConstraints: MediaConstraints, + private val onStreamAdded: ((MediaStream) -> Unit)?, + private val onNegotiationNeeded: ((StreamPeerConnection, StreamPeerType) -> Unit)?, + private val onIceCandidate: ((IceCandidate, StreamPeerType) -> Unit)?, + private val onVideoTrack: ((RtpTransceiver?) -> Unit)? +) : PeerConnection.Observer { + + private val typeTag = type.stringify() + + private val logger by taggedLogger("Call:PeerConnection") + + /** + * The wrapped connection for all the WebRTC communication. + */ + lateinit var connection: PeerConnection + private set + + /** + * Used to manage the stats observation lifecycle. + */ + private var statsJob: Job? = null + + /** + * Used to pool together and store [IceCandidate]s before consuming them. + */ + private val pendingIceMutex = Mutex() + private val pendingIceCandidates = mutableListOf() + + /** + * Contains stats events for observation. + */ + private val statsFlow: MutableStateFlow = MutableStateFlow(null) + + init { + logger.i { " #sfu; #$typeTag; mediaConstraints: $mediaConstraints" } + } + + /** + * Initialize a [StreamPeerConnection] using a WebRTC [PeerConnection]. + * + * @param peerConnection The connection that holds audio and video tracks. + */ + fun initialize(peerConnection: PeerConnection) { + logger.d { "[initialize] #sfu; #$typeTag; peerConnection: $peerConnection" } + this.connection = peerConnection + } + + /** + * Used to create an offer whenever there's a negotiation that we need to process on the + * publisher side. + * + * @return [Result] wrapper of the [SessionDescription] for the publisher. + */ + suspend fun createOffer(): Result { + logger.d { "[createOffer] #sfu; #$typeTag; no args" } + return createValue { connection.createOffer(it, mediaConstraints) } + } + + /** + * Used to create an answer whenever there's a subscriber offer. + * + * @return [Result] wrapper of the [SessionDescription] for the subscriber. + */ + suspend fun createAnswer(): Result { + logger.d { "[createAnswer] #sfu; #$typeTag; no args" } + return createValue { connection.createAnswer(it, mediaConstraints) } + } + + /** + * Used to set up the SDP on underlying connections and to add [pendingIceCandidates] to the + * connection for listening. + * + * @param sessionDescription That contains the remote SDP. + * @return An empty [Result], if the operation has been successful or not. + */ + suspend fun setRemoteDescription(sessionDescription: SessionDescription): Result { + logger.d { "[setRemoteDescription] #sfu; #$typeTag; answerSdp: ${sessionDescription.stringify()}" } + return setValue { + connection.setRemoteDescription( + it, + SessionDescription( + sessionDescription.type, + sessionDescription.description.mungeCodecs() + ) + ) + }.also { + pendingIceMutex.withLock { + pendingIceCandidates.forEach { iceCandidate -> + logger.i { "[setRemoteDescription] #sfu; #subscriber; pendingRtcIceCandidate: $iceCandidate" } + connection.addRtcIceCandidate(iceCandidate) + } + pendingIceCandidates.clear() + } + } + } + + /** + * Sets the local description for a connection either for the subscriber or publisher based on + * the flow. + * + * @param sessionDescription That contains the subscriber or publisher SDP. + * @return An empty [Result], if the operation has been successful or not. + */ + suspend fun setLocalDescription(sessionDescription: SessionDescription): Result { + val sdp = SessionDescription( + sessionDescription.type, + sessionDescription.description.mungeCodecs() + ) + logger.d { "[setLocalDescription] #sfu; #$typeTag; offerSdp: ${sessionDescription.stringify()}" } + return setValue { connection.setLocalDescription(it, sdp) } + } + + /** + * Adds an [IceCandidate] to the underlying [connection] if it's already been set up, or stores + * it for later consumption. + * + * @param iceCandidate To process and add to the connection. + * @return An empty [Result], if the operation has been successful or not. + */ + suspend fun addIceCandidate(iceCandidate: IceCandidate): Result { + if (connection.remoteDescription == null) { + logger.w { "[addIceCandidate] #sfu; #$typeTag; postponed (no remoteDescription): $iceCandidate" } + pendingIceMutex.withLock { + pendingIceCandidates.add(iceCandidate) + } + return Result.failure(RuntimeException("RemoteDescription is not set")) + } + logger.d { "[addIceCandidate] #sfu; #$typeTag; rtcIceCandidate: $iceCandidate" } + return connection.addRtcIceCandidate(iceCandidate).also { + logger.v { "[addIceCandidate] #sfu; #$typeTag; completed: $it" } + } + } + + /** + * Peer connection listeners. + */ + + /** + * Triggered whenever there's a new [RtcIceCandidate] for the call. Used to update our tracks + * and subscriptions. + * + * @param candidate The new candidate. + */ + override fun onIceCandidate(candidate: IceCandidate?) { + logger.i { "[onIceCandidate] #sfu; #$typeTag; candidate: $candidate" } + if (candidate == null) return + + onIceCandidate?.invoke(candidate, type) + } + + /** + * Triggered whenever there's a new [MediaStream] that was added to the connection. + * + * @param stream The stream that contains audio or video. + */ + override fun onAddStream(stream: MediaStream?) { + logger.i { "[onAddStream] #sfu; #$typeTag; stream: $stream" } + if (stream != null) { + onStreamAdded?.invoke(stream) + } + } + + /** + * Triggered whenever there's a new [MediaStream] or [MediaStreamTrack] that's been added + * to the call. It contains all audio and video tracks for a given session. + * + * @param receiver The receiver of tracks. + * @param mediaStreams The streams that were added containing their appropriate tracks. + */ + override fun onAddTrack(receiver: RtpReceiver?, mediaStreams: Array?) { + logger.i { "[onAddTrack] #sfu; #$typeTag; receiver: $receiver, mediaStreams: $mediaStreams" } + mediaStreams?.forEach { mediaStream -> + logger.v { "[onAddTrack] #sfu; #$typeTag; mediaStream: $mediaStream" } + mediaStream.audioTracks?.forEach { remoteAudioTrack -> + logger.v { "[onAddTrack] #sfu; #$typeTag; remoteAudioTrack: ${remoteAudioTrack.stringify()}" } + remoteAudioTrack.setEnabled(true) + } + onStreamAdded?.invoke(mediaStream) + } + } + + /** + * Triggered whenever there's a new negotiation needed for the active [PeerConnection]. + */ + override fun onRenegotiationNeeded() { + logger.i { "[onRenegotiationNeeded] #sfu; #$typeTag; no args" } + onNegotiationNeeded?.invoke(this, type) + } + + /** + * Triggered whenever a [MediaStream] was removed. + * + * @param stream The stream that was removed from the connection. + */ + override fun onRemoveStream(stream: MediaStream?) {} + + /** + * Triggered when the connection state changes. Used to start and stop the stats observing. + * + * @param newState The new state of the [PeerConnection]. + */ + override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) { + logger.i { "[onIceConnectionChange] #sfu; #$typeTag; newState: $newState" } + when (newState) { + PeerConnection.IceConnectionState.CLOSED, + PeerConnection.IceConnectionState.FAILED, + PeerConnection.IceConnectionState.DISCONNECTED -> statsJob?.cancel() + PeerConnection.IceConnectionState.CONNECTED -> statsJob = observeStats() + else -> Unit + } + } + + /** + * @return The [RTCStatsReport] for the active connection. + */ + fun getStats(): StateFlow { + return statsFlow + } + + /** + * Observes the local connection stats and emits it to [statsFlow] that users can consume. + */ + private fun observeStats() = coroutineScope.launch { + while (isActive) { + delay(10_000L) + connection.getStats { + logger.v { "[observeStats] #sfu; #$typeTag; stats: $it" } + statsFlow.value = it + } + } + } + + override fun onTrack(transceiver: RtpTransceiver?) { + logger.i { "[onTrack] #sfu; #$typeTag; transceiver: $transceiver" } + onVideoTrack?.invoke(transceiver) + } + + /** + * Domain - [PeerConnection] and [PeerConnection.Observer] related callbacks. + */ + override fun onRemoveTrack(receiver: RtpReceiver?) { + logger.i { "[onRemoveTrack] #sfu; #$typeTag; receiver: $receiver" } + } + + override fun onSignalingChange(newState: PeerConnection.SignalingState?) { + logger.d { "[onSignalingChange] #sfu; #$typeTag; newState: $newState" } + } + + override fun onIceConnectionReceivingChange(receiving: Boolean) { + logger.i { "[onIceConnectionReceivingChange] #sfu; #$typeTag; receiving: $receiving" } + } + + override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) { + logger.i { "[onIceGatheringChange] #sfu; #$typeTag; newState: $newState" } + } + + override fun onIceCandidatesRemoved(iceCandidates: Array?) { + logger.i { "[onIceCandidatesRemoved] #sfu; #$typeTag; iceCandidates: $iceCandidates" } + } + + override fun onIceCandidateError(event: IceCandidateErrorEvent?) { + logger.e { "[onIceCandidateError] #sfu; #$typeTag; event: ${event?.stringify()}" } + } + + override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) { + logger.i { "[onConnectionChange] #sfu; #$typeTag; newState: $newState" } + } + + override fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent?) { + logger.i { "[onSelectedCandidatePairChanged] #sfu; #$typeTag; event: $event" } + } + + override fun onDataChannel(channel: DataChannel?): Unit = Unit + + override fun toString(): String = + "StreamPeerConnection(type='$typeTag', constraints=$mediaConstraints)" + + private fun String.mungeCodecs(): String { + return this.replace("vp9", "VP9").replace("vp8", "VP8").replace("h264", "H264") + } +} diff --git a/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerConnectionFactory.kt b/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerConnectionFactory.kt new file mode 100644 index 0000000..609bc66 --- /dev/null +++ b/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerConnectionFactory.kt @@ -0,0 +1,287 @@ +package com.example.tvcontroller.services.webrtc.peer +/* + * Copyright 2023 Stream.IO, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import android.content.Context +import android.os.Build +import io.getstream.log.taggedLogger +import kotlinx.coroutines.CoroutineScope +import org.webrtc.AudioSource +import org.webrtc.AudioTrack +import org.webrtc.DefaultVideoDecoderFactory +import org.webrtc.EglBase +import org.webrtc.HardwareVideoEncoderFactory +import org.webrtc.IceCandidate +import org.webrtc.Logging +import org.webrtc.MediaConstraints +import org.webrtc.MediaStream +import org.webrtc.PeerConnection +import org.webrtc.PeerConnectionFactory +import org.webrtc.RtpTransceiver +import org.webrtc.SimulcastVideoEncoderFactory +import org.webrtc.SoftwareVideoEncoderFactory +import org.webrtc.VideoSource +import org.webrtc.VideoTrack +import org.webrtc.audio.JavaAudioDeviceModule + +class StreamPeerConnectionFactory constructor( + private val context: Context +) { + private val webRtcLogger by taggedLogger("Call:WebRTC") + private val audioLogger by taggedLogger("Call:AudioTrackCallback") + + val eglBaseContext: EglBase.Context by lazy { + EglBase.create().eglBaseContext + } + + /** + * Default video decoder factory used to unpack video from the remote tracks. + */ + private val videoDecoderFactory by lazy { + DefaultVideoDecoderFactory( + eglBaseContext + ) + } + + // rtcConfig contains STUN and TURN servers list + val rtcConfig = PeerConnection.RTCConfiguration( + arrayListOf( + // adding google's standard server + PeerConnection.IceServer.builder("stun:stun.l.google.com:19302").createIceServer() + ) + ).apply { + // it's very important to use new unified sdp semantics PLAN_B is deprecated + sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN + } + + /** + * Default encoder factory that supports Simulcast, used to send video tracks to the server. + */ + private val videoEncoderFactory by lazy { + val hardwareEncoder = HardwareVideoEncoderFactory(eglBaseContext, true, true) + SimulcastVideoEncoderFactory(hardwareEncoder, SoftwareVideoEncoderFactory()) + } + + /** + * Factory that builds all the connections based on the extensive configuration provided under + * the hood. + */ + private val factory by lazy { + PeerConnectionFactory.initialize( + PeerConnectionFactory.InitializationOptions.builder(context) + .setInjectableLogger({ message, severity, label -> + when (severity) { + Logging.Severity.LS_VERBOSE -> { + webRtcLogger.v { "[onLogMessage] label: $label, message: $message" } + } + + Logging.Severity.LS_INFO -> { + webRtcLogger.i { "[onLogMessage] label: $label, message: $message" } + } + + Logging.Severity.LS_WARNING -> { + webRtcLogger.w { "[onLogMessage] label: $label, message: $message" } + } + + Logging.Severity.LS_ERROR -> { + webRtcLogger.e { "[onLogMessage] label: $label, message: $message" } + } + + Logging.Severity.LS_NONE -> { + webRtcLogger.d { "[onLogMessage] label: $label, message: $message" } + } + + else -> {} + } + }, Logging.Severity.LS_VERBOSE) + .createInitializationOptions() + ) + + PeerConnectionFactory.builder() + .setVideoDecoderFactory(videoDecoderFactory) + .setVideoEncoderFactory(videoEncoderFactory) + .setAudioDeviceModule( + JavaAudioDeviceModule + .builder(context) + .setUseHardwareAcousticEchoCanceler(Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) + .setUseHardwareNoiseSuppressor(Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) + .setAudioRecordErrorCallback(object : + JavaAudioDeviceModule.AudioRecordErrorCallback { + override fun onWebRtcAudioRecordInitError(p0: String?) { + audioLogger.w { "[onWebRtcAudioRecordInitError] $p0" } + } + + override fun onWebRtcAudioRecordStartError( + p0: JavaAudioDeviceModule.AudioRecordStartErrorCode?, + p1: String? + ) { + audioLogger.w { "[onWebRtcAudioRecordInitError] $p1" } + } + + override fun onWebRtcAudioRecordError(p0: String?) { + audioLogger.w { "[onWebRtcAudioRecordError] $p0" } + } + }) + .setAudioTrackErrorCallback(object : + JavaAudioDeviceModule.AudioTrackErrorCallback { + override fun onWebRtcAudioTrackInitError(p0: String?) { + audioLogger.w { "[onWebRtcAudioTrackInitError] $p0" } + } + + override fun onWebRtcAudioTrackStartError( + p0: JavaAudioDeviceModule.AudioTrackStartErrorCode?, + p1: String? + ) { + audioLogger.w { "[onWebRtcAudioTrackStartError] $p0" } + } + + override fun onWebRtcAudioTrackError(p0: String?) { + audioLogger.w { "[onWebRtcAudioTrackError] $p0" } + } + }) + .setAudioRecordStateCallback(object : + JavaAudioDeviceModule.AudioRecordStateCallback { + override fun onWebRtcAudioRecordStart() { + audioLogger.d { "[onWebRtcAudioRecordStart] no args" } + } + + override fun onWebRtcAudioRecordStop() { + audioLogger.d { "[onWebRtcAudioRecordStop] no args" } + } + }) + .setAudioTrackStateCallback(object : + JavaAudioDeviceModule.AudioTrackStateCallback { + override fun onWebRtcAudioTrackStart() { + audioLogger.d { "[onWebRtcAudioTrackStart] no args" } + } + + override fun onWebRtcAudioTrackStop() { + audioLogger.d { "[onWebRtcAudioTrackStop] no args" } + } + }) + .createAudioDeviceModule().also { + it.setMicrophoneMute(false) + it.setSpeakerMute(false) + } + ) + .createPeerConnectionFactory() + } + + /** + * Builds a [StreamPeerConnection] that wraps the WebRTC [PeerConnection] and exposes several + * helpful handlers. + * + * @param coroutineScope Scope used for asynchronous operations. + * @param configuration The [PeerConnection.RTCConfiguration] used to set up the connection. + * @param type The type of connection, either a subscriber of a publisher. + * @param mediaConstraints Constraints used for audio and video tracks in the connection. + * @param onStreamAdded Handler when a new [MediaStream] gets added. + * @param onNegotiationNeeded Handler when there's a new negotiation. + * @param onIceCandidateRequest Handler whenever we receive [IceCandidate]s. + * @return [StreamPeerConnection] That's fully set up and can be observed and used to send and + * receive tracks. + */ + fun makePeerConnection( + coroutineScope: CoroutineScope, + configuration: PeerConnection.RTCConfiguration, + type: StreamPeerType, + mediaConstraints: MediaConstraints, + onStreamAdded: ((MediaStream) -> Unit)? = null, + onNegotiationNeeded: ((StreamPeerConnection, StreamPeerType) -> Unit)? = null, + onIceCandidateRequest: ((IceCandidate, StreamPeerType) -> Unit)? = null, + onVideoTrack: ((RtpTransceiver?) -> Unit)? = null + ): StreamPeerConnection { + val peerConnection = StreamPeerConnection( + coroutineScope = coroutineScope, + type = type, + mediaConstraints = mediaConstraints, + onStreamAdded = onStreamAdded, + onNegotiationNeeded = onNegotiationNeeded, + onIceCandidate = onIceCandidateRequest, + onVideoTrack = onVideoTrack + ) + val connection = makePeerConnectionInternal( + configuration = configuration, + observer = peerConnection + ) + return peerConnection.apply { initialize(connection) } + } + + /** + * Builds a [PeerConnection] internally that connects to the server and is able to send and + * receive tracks. + * + * @param configuration The [PeerConnection.RTCConfiguration] used to set up the connection. + * @param observer Handler used to observe different states of the connection. + * @return [PeerConnection] that's fully set up. + */ + private fun makePeerConnectionInternal( + configuration: PeerConnection.RTCConfiguration, + observer: PeerConnection.Observer? + ): PeerConnection { + return requireNotNull( + factory.createPeerConnection( + configuration, + observer + ) + ) + } + + /** + * Builds a [VideoSource] from the [factory] that can be used for regular video share (camera) + * or screen sharing. + * + * @param isScreencast If we're screen sharing using this source. + * @return [VideoSource] that can be used to build tracks. + */ + fun makeVideoSource(isScreencast: Boolean): VideoSource = + factory.createVideoSource(isScreencast) + + /** + * Builds a [VideoTrack] from the [factory] that can be used for regular video share (camera) + * or screen sharing. + * + * @param source The [VideoSource] used for the track. + * @param trackId The unique ID for this track. + * @return [VideoTrack] That represents a video feed. + */ + fun makeVideoTrack( + source: VideoSource, + trackId: String + ): VideoTrack = factory.createVideoTrack(trackId, source) + + /** + * Builds an [AudioSource] from the [factory] that can be used for audio sharing. + * + * @param constraints The constraints used to change the way the audio behaves. + * @return [AudioSource] that can be used to build tracks. + */ + fun makeAudioSource(constraints: MediaConstraints = MediaConstraints()): AudioSource = + factory.createAudioSource(constraints) + + /** + * Builds an [AudioTrack] from the [factory] that can be used for regular video share (camera) + * or screen sharing. + * + * @param source The [AudioSource] used for the track. + * @param trackId The unique ID for this track. + * @return [AudioTrack] That represents an audio feed. + */ + fun makeAudioTrack( + source: AudioSource, + trackId: String + ): AudioTrack = factory.createAudioTrack(trackId, source) +} diff --git a/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerType.kt b/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerType.kt new file mode 100644 index 0000000..0940d87 --- /dev/null +++ b/app/src/main/java/com/example/tvcontroller/services/webrtc/peer/StreamPeerType.kt @@ -0,0 +1,25 @@ +package com.example.tvcontroller.services.webrtc.peer +/* + * Copyright 2023 Stream.IO, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The type of peer connections, either a [PUBLISHER] that sends data to the call or a [SUBSCRIBER] + * that receives and decodes the data from the server. + */ +enum class StreamPeerType { + PUBLISHER, + SUBSCRIBER +} diff --git a/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/PeerConnectionUtils.kt b/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/PeerConnectionUtils.kt new file mode 100644 index 0000000..c80f6c2 --- /dev/null +++ b/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/PeerConnectionUtils.kt @@ -0,0 +1,39 @@ +package com.example.tvcontroller.services.webrtc.utils +/* + * Copyright 2023 Stream.IO, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.webrtc.AddIceObserver +import org.webrtc.IceCandidate +import org.webrtc.PeerConnection +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +suspend fun PeerConnection.addRtcIceCandidate(iceCandidate: IceCandidate): Result { + return suspendCoroutine { cont -> + addIceCandidate( + iceCandidate, + object : AddIceObserver { + override fun onAddSuccess() { + cont.resume(Result.success(Unit)) + } + + override fun onAddFailure(error: String?) { + cont.resume(Result.failure(RuntimeException(error))) + } + } + ) + } +} diff --git a/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/SDPUtils.kt b/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/SDPUtils.kt new file mode 100644 index 0000000..4a6470b --- /dev/null +++ b/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/SDPUtils.kt @@ -0,0 +1,71 @@ +package com.example.tvcontroller.services.webrtc.utils +/* + * Copyright 2023 Stream.IO, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.webrtc.SdpObserver +import org.webrtc.SessionDescription +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine + +suspend inline fun createValue( + crossinline call: (SdpObserver) -> Unit +): Result = suspendCoroutine { + val observer = object : SdpObserver { + + /** + * Handling of create values. + */ + override fun onCreateSuccess(description: SessionDescription?) { + if (description != null) { + it.resume(Result.success(description)) + } else { + it.resume(Result.failure(RuntimeException("SessionDescription is null!"))) + } + } + + override fun onCreateFailure(message: String?) = + it.resume(Result.failure(RuntimeException(message))) + + /** + * We ignore set results. + */ + override fun onSetSuccess() = Unit + override fun onSetFailure(p0: String?) = Unit + } + + call(observer) +} + +suspend inline fun setValue( + crossinline call: (SdpObserver) -> Unit +): Result = suspendCoroutine { + val observer = object : SdpObserver { + /** + * We ignore create results. + */ + override fun onCreateFailure(p0: String?) = Unit + override fun onCreateSuccess(p0: SessionDescription?) = Unit + + /** + * Handling of set values. + */ + override fun onSetSuccess() = it.resume(Result.success(Unit)) + override fun onSetFailure(message: String?) = + it.resume(Result.failure(RuntimeException(message))) + } + + call(observer) +} diff --git a/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/stringify.kt b/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/stringify.kt new file mode 100644 index 0000000..9e299cf --- /dev/null +++ b/app/src/main/java/com/example/tvcontroller/services/webrtc/utils/stringify.kt @@ -0,0 +1,43 @@ +package com.example.tvcontroller.services.webrtc.utils +/* + * Copyright 2023 Stream.IO, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.example.tvcontroller.services.webrtc.peer.StreamPeerType +import org.webrtc.IceCandidateErrorEvent +import org.webrtc.MediaStreamTrack +import org.webrtc.SessionDescription +import org.webrtc.audio.JavaAudioDeviceModule + +fun SessionDescription.stringify(): String = + "SessionDescription(type=$type, description=$description)" + +fun MediaStreamTrack.stringify(): String { + return "MediaStreamTrack(id=${id()}, kind=${kind()}, enabled: ${enabled()}, state=${state()})" +} + +fun IceCandidateErrorEvent.stringify(): String { + return "IceCandidateErrorEvent(errorCode=$errorCode, $errorText, address=$address, port=$port, url=$url)" +} + +fun JavaAudioDeviceModule.AudioSamples.stringify(): String { + return "AudioSamples(audioFormat=$audioFormat, channelCount=$channelCount" + + ", sampleRate=$sampleRate, data.size=${data.size})" +} + +fun StreamPeerType.stringify() = when (this) { + StreamPeerType.PUBLISHER -> "publisher" + StreamPeerType.SUBSCRIBER -> "subscriber" +} diff --git a/app/src/main/java/com/example/tvcontroller/ui/views/CameraView.kt b/app/src/main/java/com/example/tvcontroller/ui/views/CameraView.kt index 02ab23f..7f791af 100644 --- a/app/src/main/java/com/example/tvcontroller/ui/views/CameraView.kt +++ b/app/src/main/java/com/example/tvcontroller/ui/views/CameraView.kt @@ -2,26 +2,26 @@ package com.example.tvcontroller.ui.views import androidx.camera.view.CameraController import androidx.camera.view.LifecycleCameraController -import androidx.compose.foundation.layout.Arrangement import androidx.compose.foundation.layout.Box -import androidx.compose.foundation.layout.Column import androidx.compose.foundation.layout.fillMaxSize import androidx.compose.foundation.layout.padding -import androidx.compose.material3.Text import androidx.compose.runtime.Composable import androidx.compose.runtime.remember -import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.platform.LocalContext import androidx.compose.ui.unit.dp +import androidx.core.content.ContextCompat +import com.example.tvcontroller.services.WebRtcService import com.example.tvcontroller.ui.components.CameraPreview @Composable fun CameraView() { + val webRtcService = remember { WebRtcService() } val context = LocalContext.current val controller = remember { LifecycleCameraController(context).apply { - setEnabledUseCases(CameraController.VIDEO_CAPTURE) + setEnabledUseCases(CameraController.IMAGE_ANALYSIS) + setImageAnalysisAnalyzer(ContextCompat.getMainExecutor(context), webRtcService) } } Box( diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 460bb85..a806d4d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,4 +1,5 @@ [versions] +streamLog = "1.1.4" agp = "8.9.0" cameraCore = "1.4.1" kotlin = "2.0.0" @@ -11,6 +12,7 @@ lifecycleRuntimeKtx = "2.6.1" activityCompose = "1.8.0" composeBom = "2024.04.01" navigationCompose = "2.8.4" +streamWebrtcAndroid = "1.3.8" [libraries] androidx-camera-camera2 = { module = "androidx.camera:camera-camera2", version.ref = "cameraCore" } @@ -37,6 +39,9 @@ androidx-ui-test-junit4 = { group = "androidx.compose.ui", name = "ui-test-junit androidx-material3 = { group = "androidx.compose.material3", name = "material3" } ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" } ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" } +stream-webrtc-android = { module = "io.getstream:stream-webrtc-android", version.ref = "streamWebrtcAndroid" } +stream-log = { group = "io.getstream", name = "stream-log-android", version.ref = "streamLog" } + [plugins] android-application = { id = "com.android.application", version.ref = "agp" }