working: webrtc

This commit is contained in:
Fritz Heiden 2025-03-24 20:25:37 +01:00
parent 8d14f3575f
commit e140c6b32e
11 changed files with 960 additions and 5 deletions

View File

@ -59,6 +59,8 @@ dependencies {
implementation(libs.androidx.camera.view)
implementation(libs.androidx.camera.mlkit.vision)
implementation(libs.androidx.camera.extensions)
implementation(libs.stream.webrtc.android)
implementation(libs.stream.log)
testImplementation(libs.junit)
androidTestImplementation(libs.androidx.junit)
androidTestImplementation(libs.androidx.espresso.core)

View File

@ -0,0 +1,16 @@
package com.example.tvcontroller.services
import android.util.Log
import androidx.camera.core.ImageAnalysis
import androidx.camera.core.ImageProxy
/**
 * CameraX analyzer intended to feed captured frames into the WebRTC pipeline.
 *
 * NOTE(review): the original version read `LocalWebRtcSessionManager.current` and called
 * `collectAsState` directly in this class body. Both are @Composable-only APIs and cannot
 * be used inside a plain class, so that code could never compile; it has been removed.
 * Resolve the session manager in the composable that constructs this service and pass it
 * in (e.g. via the constructor) instead.
 */
class WebRtcService : ImageAnalysis.Analyzer {

    /**
     * Called by CameraX for every captured frame. Currently only logs receipt.
     *
     * @param image the frame delivered by CameraX; must be closed before the next
     * frame can be delivered.
     */
    override fun analyze(image: ImageProxy) {
        Log.i("WebRtcService", "Image received")
        // Closing the proxy releases the buffer back to CameraX; without this the
        // analysis pipeline stalls after the buffer pool is exhausted.
        image.close()
    }
}

View File

@ -0,0 +1,129 @@
package com.example.tvcontroller.services.webrtc
/*
* Copyright 2023 Stream.IO, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.getstream.log.taggedLogger
import io.ktor.client.*
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.client.plugins.websocket.webSocketSession
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.util.logging.Logger
import io.ktor.websocket.CloseReason
import io.ktor.websocket.DefaultWebSocketSession
import io.ktor.websocket.Frame
import io.ktor.websocket.close
import io.ktor.websocket.readText
import io.ktor.websocket.send
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import org.webrtc.Logging
// WebSocket endpoint of the signaling server.
const val SIGNALING_SERVER_IP_ADDRESS = "wss://signaling.stream.io"

/**
 * Thin signaling layer over a Ktor WebSocket session: publishes the session state and
 * incoming (command, payload) pairs as flows, and sends outgoing commands as
 * `"<COMMAND> <payload>"` text frames.
 */
class SignalingClient {
    private val logger by taggedLogger("Call:SignalingClient")

    // Own scope so the socket outlives any single UI scope; SupervisorJob keeps one
    // failed child coroutine from tearing down the whole client.
    private val signalingScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    private val client = HttpClient {
        install(WebSockets)
    }

    // Web socket session with the signaling server; only valid after the connect
    // coroutine launched in init succeeds.
    private lateinit var ws: DefaultWebSocketSession

    // Declared BEFORE init: Kotlin initializes properties in declaration order, and the
    // original code assigned _sessionStateFlow from init while it was declared further
    // down — a latent uninitialized-property hazard.
    // session flow to send information about the session state to the subscribers
    private val _sessionStateFlow = MutableStateFlow(WebRTCSessionState.Offline)
    val sessionStateFlow: StateFlow<WebRTCSessionState> = _sessionStateFlow

    // signaling command/payload pairs parsed from incoming server messages
    private val _signalingCommandFlow = MutableSharedFlow<Pair<SignalingCommand, String>>()
    val signalingCommandFlow: SharedFlow<Pair<SignalingCommand, String>> = _signalingCommandFlow

    init {
        signalingScope.launch {
            try {
                ws = client.webSocketSession {
                    url(SIGNALING_SERVER_IP_ADDRESS)
                }
                // Read frames for the lifetime of the session. The original code declared
                // an OkHttp WebSocketListener that was never attached to anything (and did
                // not compile against Ktor); this receive loop replaces it.
                for (frame in ws.incoming) {
                    if (frame is Frame.Text) {
                        handleMessage(frame.readText())
                    }
                }
            } catch (e: Exception) {
                logger.e { "Error connecting to signaling server: ${e.message}" }
                _sessionStateFlow.value = WebRTCSessionState.Offline
            }
        }
    }

    /** Sends a command to the signaling server as a single text frame. */
    fun sendCommand(signalingCommand: SignalingCommand, message: String) {
        logger.d { "[sendCommand] $signalingCommand $message" }
        signalingScope.launch {
            ws.send("$signalingCommand $message")
        }
    }

    /** Routes a raw server message to the matching handler based on its command prefix. */
    private fun handleMessage(text: String) {
        when {
            text.startsWith(SignalingCommand.STATE.toString(), true) ->
                handleStateMessage(text)
            text.startsWith(SignalingCommand.OFFER.toString(), true) ->
                handleSignalingCommand(SignalingCommand.OFFER, text)
            text.startsWith(SignalingCommand.ANSWER.toString(), true) ->
                handleSignalingCommand(SignalingCommand.ANSWER, text)
            text.startsWith(SignalingCommand.ICE.toString(), true) ->
                handleSignalingCommand(SignalingCommand.ICE, text)
        }
    }

    private fun handleStateMessage(message: String) {
        val state = getSeparatedMessage(message)
        _sessionStateFlow.value = WebRTCSessionState.valueOf(state)
    }

    private fun handleSignalingCommand(command: SignalingCommand, text: String) {
        val value = getSeparatedMessage(text)
        logger.d { "received signaling: $command $value" }
        signalingScope.launch {
            _signalingCommandFlow.emit(command to value)
        }
    }

    // Payload is everything after the first space: "<COMMAND> <payload>".
    private fun getSeparatedMessage(text: String) = text.substringAfter(' ')

    /**
     * Shuts the client down. The original called the suspend function
     * [DefaultWebSocketSession.close] directly from this non-suspend method (compile
     * error) after already cancelling the scope; instead we close gracefully inside the
     * scope and tear down the HttpClient/scope once the close frame has been sent.
     */
    fun dispose() {
        _sessionStateFlow.value = WebRTCSessionState.Offline
        signalingScope.launch {
            if (::ws.isInitialized) {
                ws.close(CloseReason(CloseReason.Codes.NORMAL, "Client is shutting down"))
            }
        }.invokeOnCompletion {
            client.close()
            signalingScope.cancel()
        }
    }
}
/** High-level lifecycle of a WebRTC session as reported by the signaling server. */
enum class WebRTCSessionState {
    /** Offer and answer messages have both been sent. */
    Active,

    /** Session is being created; the offer has been sent. */
    Creating,

    /** Both clients are available and ready to initiate the session. */
    Ready,

    /** Fewer than two clients are connected to the server. */
    Impossible,

    /** Unable to connect to the signaling server. */
    Offline
}
/** Commands exchanged with the signaling server, used as message prefixes. */
enum class SignalingCommand {
    /** Carries a [WebRTCSessionState] update. */
    STATE,

    /** Sends or receives an SDP offer. */
    OFFER,

    /** Sends or receives an SDP answer. */
    ANSWER,

    /** Sends or receives ICE candidates. */
    ICE
}

View File

@ -0,0 +1,338 @@
package com.example.tvcontroller.services.webrtc.peer
/*
* Copyright 2023 Stream.IO, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.getstream.log.taggedLogger
import com.example.tvcontroller.services.webrtc.utils.addRtcIceCandidate
import com.example.tvcontroller.services.webrtc.utils.createValue
import com.example.tvcontroller.services.webrtc.utils.setValue
import com.example.tvcontroller.services.webrtc.utils.stringify
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.webrtc.CandidatePairChangeEvent
import org.webrtc.DataChannel
import org.webrtc.IceCandidate
import org.webrtc.IceCandidateErrorEvent
import org.webrtc.MediaConstraints
import org.webrtc.MediaStream
import org.webrtc.MediaStreamTrack
import org.webrtc.PeerConnection
import org.webrtc.RTCStatsReport
import org.webrtc.RtpReceiver
import org.webrtc.RtpTransceiver
import org.webrtc.SessionDescription
/**
 * Wrapper around the WebRTC connection that contains tracks.
 *
 * @param coroutineScope The scope used to listen to stats events.
 * @param type The internal type of the PeerConnection. Check [StreamPeerType].
 * @param mediaConstraints Constraints used for the connections.
 * @param onStreamAdded Handler when a new [MediaStream] gets added.
 * @param onNegotiationNeeded Handler when there's a new negotiation.
 * @param onIceCandidate Handler whenever we receive [IceCandidate]s.
 * @param onVideoTrack Handler invoked when a remote [RtpTransceiver] is negotiated.
 */
class StreamPeerConnection(
    private val coroutineScope: CoroutineScope,
    private val type: StreamPeerType,
    private val mediaConstraints: MediaConstraints,
    private val onStreamAdded: ((MediaStream) -> Unit)?,
    private val onNegotiationNeeded: ((StreamPeerConnection, StreamPeerType) -> Unit)?,
    private val onIceCandidate: ((IceCandidate, StreamPeerType) -> Unit)?,
    private val onVideoTrack: ((RtpTransceiver?) -> Unit)?
) : PeerConnection.Observer {

    private val typeTag = type.stringify()
    private val logger by taggedLogger("Call:PeerConnection")

    /**
     * The wrapped connection for all the WebRTC communication.
     */
    lateinit var connection: PeerConnection
        private set

    /**
     * Used to manage the stats observation lifecycle.
     */
    private var statsJob: Job? = null

    /**
     * Used to pool together and store [IceCandidate]s before consuming them.
     */
    private val pendingIceMutex = Mutex()
    private val pendingIceCandidates = mutableListOf<IceCandidate>()

    /**
     * Contains stats events for observation.
     */
    private val statsFlow: MutableStateFlow<RTCStatsReport?> = MutableStateFlow(null)

    init {
        logger.i { "<init> #sfu; #$typeTag; mediaConstraints: $mediaConstraints" }
    }

    /**
     * Initialize a [StreamPeerConnection] using a WebRTC [PeerConnection].
     *
     * @param peerConnection The connection that holds audio and video tracks.
     */
    fun initialize(peerConnection: PeerConnection) {
        logger.d { "[initialize] #sfu; #$typeTag; peerConnection: $peerConnection" }
        this.connection = peerConnection
    }

    /**
     * Used to create an offer whenever there's a negotiation that we need to process on the
     * publisher side.
     *
     * @return [Result] wrapper of the [SessionDescription] for the publisher.
     */
    suspend fun createOffer(): Result<SessionDescription> {
        logger.d { "[createOffer] #sfu; #$typeTag; no args" }
        return createValue { connection.createOffer(it, mediaConstraints) }
    }

    /**
     * Used to create an answer whenever there's a subscriber offer.
     *
     * @return [Result] wrapper of the [SessionDescription] for the subscriber.
     */
    suspend fun createAnswer(): Result<SessionDescription> {
        logger.d { "[createAnswer] #sfu; #$typeTag; no args" }
        return createValue { connection.createAnswer(it, mediaConstraints) }
    }

    /**
     * Used to set up the SDP on underlying connections and to add [pendingIceCandidates] to the
     * connection for listening.
     *
     * @param sessionDescription That contains the remote SDP.
     * @return An empty [Result], if the operation has been successful or not.
     */
    suspend fun setRemoteDescription(sessionDescription: SessionDescription): Result<Unit> {
        logger.d { "[setRemoteDescription] #sfu; #$typeTag; answerSdp: ${sessionDescription.stringify()}" }
        return setValue {
            connection.setRemoteDescription(
                it,
                SessionDescription(
                    sessionDescription.type,
                    sessionDescription.description.mungeCodecs()
                )
            )
        }.also {
            // Any candidates that arrived before the remote description was known can
            // only be applied now; drain the queue under the mutex.
            pendingIceMutex.withLock {
                pendingIceCandidates.forEach { iceCandidate ->
                    logger.i { "[setRemoteDescription] #sfu; #subscriber; pendingRtcIceCandidate: $iceCandidate" }
                    connection.addRtcIceCandidate(iceCandidate)
                }
                pendingIceCandidates.clear()
            }
        }
    }

    /**
     * Sets the local description for a connection either for the subscriber or publisher based on
     * the flow.
     *
     * @param sessionDescription That contains the subscriber or publisher SDP.
     * @return An empty [Result], if the operation has been successful or not.
     */
    suspend fun setLocalDescription(sessionDescription: SessionDescription): Result<Unit> {
        val sdp = SessionDescription(
            sessionDescription.type,
            sessionDescription.description.mungeCodecs()
        )
        logger.d { "[setLocalDescription] #sfu; #$typeTag; offerSdp: ${sessionDescription.stringify()}" }
        return setValue { connection.setLocalDescription(it, sdp) }
    }

    /**
     * Adds an [IceCandidate] to the underlying [connection] if it's already been set up, or stores
     * it for later consumption.
     *
     * @param iceCandidate To process and add to the connection.
     * @return An empty [Result], if the operation has been successful or not.
     */
    suspend fun addIceCandidate(iceCandidate: IceCandidate): Result<Unit> {
        if (connection.remoteDescription == null) {
            logger.w { "[addIceCandidate] #sfu; #$typeTag; postponed (no remoteDescription): $iceCandidate" }
            pendingIceMutex.withLock {
                pendingIceCandidates.add(iceCandidate)
            }
            return Result.failure(RuntimeException("RemoteDescription is not set"))
        }
        logger.d { "[addIceCandidate] #sfu; #$typeTag; rtcIceCandidate: $iceCandidate" }
        return connection.addRtcIceCandidate(iceCandidate).also {
            logger.v { "[addIceCandidate] #sfu; #$typeTag; completed: $it" }
        }
    }

    /**
     * Peer connection listeners.
     */

    /**
     * Triggered whenever there's a new [IceCandidate] for the call. Used to update our tracks
     * and subscriptions.
     *
     * @param candidate The new candidate.
     */
    override fun onIceCandidate(candidate: IceCandidate?) {
        logger.i { "[onIceCandidate] #sfu; #$typeTag; candidate: $candidate" }
        if (candidate == null) return
        onIceCandidate?.invoke(candidate, type)
    }

    /**
     * Triggered whenever there's a new [MediaStream] that was added to the connection.
     *
     * @param stream The stream that contains audio or video.
     */
    override fun onAddStream(stream: MediaStream?) {
        logger.i { "[onAddStream] #sfu; #$typeTag; stream: $stream" }
        if (stream != null) {
            onStreamAdded?.invoke(stream)
        }
    }

    /**
     * Triggered whenever there's a new [MediaStream] or [MediaStreamTrack] that's been added
     * to the call. It contains all audio and video tracks for a given session.
     *
     * @param receiver The receiver of tracks.
     * @param mediaStreams The streams that were added containing their appropriate tracks.
     */
    override fun onAddTrack(receiver: RtpReceiver?, mediaStreams: Array<out MediaStream>?) {
        logger.i { "[onAddTrack] #sfu; #$typeTag; receiver: $receiver, mediaStreams: $mediaStreams" }
        mediaStreams?.forEach { mediaStream ->
            logger.v { "[onAddTrack] #sfu; #$typeTag; mediaStream: $mediaStream" }
            mediaStream.audioTracks?.forEach { remoteAudioTrack ->
                logger.v { "[onAddTrack] #sfu; #$typeTag; remoteAudioTrack: ${remoteAudioTrack.stringify()}" }
                remoteAudioTrack.setEnabled(true)
            }
            onStreamAdded?.invoke(mediaStream)
        }
    }

    /**
     * Triggered whenever there's a new negotiation needed for the active [PeerConnection].
     */
    override fun onRenegotiationNeeded() {
        logger.i { "[onRenegotiationNeeded] #sfu; #$typeTag; no args" }
        onNegotiationNeeded?.invoke(this, type)
    }

    /**
     * Triggered whenever a [MediaStream] was removed.
     *
     * @param stream The stream that was removed from the connection.
     */
    override fun onRemoveStream(stream: MediaStream?) {}

    /**
     * Triggered when the connection state changes. Used to start and stop the stats observing.
     *
     * @param newState The new state of the [PeerConnection].
     */
    override fun onIceConnectionChange(newState: PeerConnection.IceConnectionState?) {
        logger.i { "[onIceConnectionChange] #sfu; #$typeTag; newState: $newState" }
        when (newState) {
            PeerConnection.IceConnectionState.CLOSED,
            PeerConnection.IceConnectionState.FAILED,
            PeerConnection.IceConnectionState.DISCONNECTED -> statsJob?.cancel()
            PeerConnection.IceConnectionState.CONNECTED -> {
                // FIX: cancel any previous observer before starting a new one so an ICE
                // restart/reconnect does not leak a second stats-polling coroutine.
                statsJob?.cancel()
                statsJob = observeStats()
            }
            else -> Unit
        }
    }

    /**
     * @return The [RTCStatsReport] for the active connection.
     */
    fun getStats(): StateFlow<RTCStatsReport?> {
        return statsFlow
    }

    /**
     * Observes the local connection stats and emits it to [statsFlow] that users can consume.
     */
    private fun observeStats() = coroutineScope.launch {
        while (isActive) {
            delay(10_000L)
            connection.getStats {
                logger.v { "[observeStats] #sfu; #$typeTag; stats: $it" }
                statsFlow.value = it
            }
        }
    }

    /**
     * Triggered when a remote transceiver is negotiated; forwarded to [onVideoTrack].
     */
    override fun onTrack(transceiver: RtpTransceiver?) {
        logger.i { "[onTrack] #sfu; #$typeTag; transceiver: $transceiver" }
        onVideoTrack?.invoke(transceiver)
    }

    /**
     * Domain - [PeerConnection] and [PeerConnection.Observer] related callbacks.
     */
    override fun onRemoveTrack(receiver: RtpReceiver?) {
        logger.i { "[onRemoveTrack] #sfu; #$typeTag; receiver: $receiver" }
    }

    override fun onSignalingChange(newState: PeerConnection.SignalingState?) {
        logger.d { "[onSignalingChange] #sfu; #$typeTag; newState: $newState" }
    }

    override fun onIceConnectionReceivingChange(receiving: Boolean) {
        logger.i { "[onIceConnectionReceivingChange] #sfu; #$typeTag; receiving: $receiving" }
    }

    override fun onIceGatheringChange(newState: PeerConnection.IceGatheringState?) {
        logger.i { "[onIceGatheringChange] #sfu; #$typeTag; newState: $newState" }
    }

    override fun onIceCandidatesRemoved(iceCandidates: Array<out org.webrtc.IceCandidate>?) {
        logger.i { "[onIceCandidatesRemoved] #sfu; #$typeTag; iceCandidates: $iceCandidates" }
    }

    override fun onIceCandidateError(event: IceCandidateErrorEvent?) {
        logger.e { "[onIceCandidateError] #sfu; #$typeTag; event: ${event?.stringify()}" }
    }

    override fun onConnectionChange(newState: PeerConnection.PeerConnectionState?) {
        logger.i { "[onConnectionChange] #sfu; #$typeTag; newState: $newState" }
    }

    override fun onSelectedCandidatePairChanged(event: CandidatePairChangeEvent?) {
        logger.i { "[onSelectedCandidatePairChanged] #sfu; #$typeTag; event: $event" }
    }

    override fun onDataChannel(channel: DataChannel?): Unit = Unit

    override fun toString(): String =
        "StreamPeerConnection(type='$typeTag', constraints=$mediaConstraints)"

    // WebRTC SDP codec names are case-sensitive in some stacks; normalize to upper case.
    private fun String.mungeCodecs(): String {
        return this.replace("vp9", "VP9").replace("vp8", "VP8").replace("h264", "H264")
    }
}

View File

@ -0,0 +1,287 @@
package com.example.tvcontroller.services.webrtc.peer
/*
* Copyright 2023 Stream.IO, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import android.content.Context
import android.os.Build
import io.getstream.log.taggedLogger
import kotlinx.coroutines.CoroutineScope
import org.webrtc.AudioSource
import org.webrtc.AudioTrack
import org.webrtc.DefaultVideoDecoderFactory
import org.webrtc.EglBase
import org.webrtc.HardwareVideoEncoderFactory
import org.webrtc.IceCandidate
import org.webrtc.Logging
import org.webrtc.MediaConstraints
import org.webrtc.MediaStream
import org.webrtc.PeerConnection
import org.webrtc.PeerConnectionFactory
import org.webrtc.RtpTransceiver
import org.webrtc.SimulcastVideoEncoderFactory
import org.webrtc.SoftwareVideoEncoderFactory
import org.webrtc.VideoSource
import org.webrtc.VideoTrack
import org.webrtc.audio.JavaAudioDeviceModule
/**
 * Factory for everything WebRTC: the shared EGL context, encoder/decoder factories,
 * the [PeerConnectionFactory], and helpers that build peer connections, tracks and
 * sources from it.
 *
 * @param context Android context used to initialize the native WebRTC stack.
 */
class StreamPeerConnectionFactory(
    private val context: Context
) {
    private val webRtcLogger by taggedLogger("Call:WebRTC")
    private val audioLogger by taggedLogger("Call:AudioTrackCallback")

    /** Shared EGL context for rendering and hardware codecs. */
    val eglBaseContext: EglBase.Context by lazy {
        EglBase.create().eglBaseContext
    }

    /**
     * Default video decoder factory used to unpack video from the remote tracks.
     */
    private val videoDecoderFactory by lazy {
        DefaultVideoDecoderFactory(
            eglBaseContext
        )
    }

    // rtcConfig contains STUN and TURN servers list
    val rtcConfig = PeerConnection.RTCConfiguration(
        arrayListOf(
            // adding google's standard server
            PeerConnection.IceServer.builder("stun:stun.l.google.com:19302").createIceServer()
        )
    ).apply {
        // it's very important to use new unified sdp semantics PLAN_B is deprecated
        sdpSemantics = PeerConnection.SdpSemantics.UNIFIED_PLAN
    }

    /**
     * Default encoder factory that supports Simulcast, used to send video tracks to the server.
     */
    private val videoEncoderFactory by lazy {
        val hardwareEncoder = HardwareVideoEncoderFactory(eglBaseContext, true, true)
        SimulcastVideoEncoderFactory(hardwareEncoder, SoftwareVideoEncoderFactory())
    }

    /**
     * Factory that builds all the connections based on the extensive configuration provided under
     * the hood.
     */
    private val factory by lazy {
        PeerConnectionFactory.initialize(
            PeerConnectionFactory.InitializationOptions.builder(context)
                .setInjectableLogger({ message, severity, label ->
                    when (severity) {
                        Logging.Severity.LS_VERBOSE -> {
                            webRtcLogger.v { "[onLogMessage] label: $label, message: $message" }
                        }
                        Logging.Severity.LS_INFO -> {
                            webRtcLogger.i { "[onLogMessage] label: $label, message: $message" }
                        }
                        Logging.Severity.LS_WARNING -> {
                            webRtcLogger.w { "[onLogMessage] label: $label, message: $message" }
                        }
                        Logging.Severity.LS_ERROR -> {
                            webRtcLogger.e { "[onLogMessage] label: $label, message: $message" }
                        }
                        Logging.Severity.LS_NONE -> {
                            webRtcLogger.d { "[onLogMessage] label: $label, message: $message" }
                        }
                        else -> {}
                    }
                }, Logging.Severity.LS_VERBOSE)
                .createInitializationOptions()
        )
        PeerConnectionFactory.builder()
            .setVideoDecoderFactory(videoDecoderFactory)
            .setVideoEncoderFactory(videoEncoderFactory)
            .setAudioDeviceModule(
                JavaAudioDeviceModule
                    .builder(context)
                    // Hardware AEC/NS are only reliable on Q+ devices.
                    .setUseHardwareAcousticEchoCanceler(Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q)
                    .setUseHardwareNoiseSuppressor(Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q)
                    .setAudioRecordErrorCallback(object :
                        JavaAudioDeviceModule.AudioRecordErrorCallback {
                        override fun onWebRtcAudioRecordInitError(p0: String?) {
                            audioLogger.w { "[onWebRtcAudioRecordInitError] $p0" }
                        }

                        override fun onWebRtcAudioRecordStartError(
                            p0: JavaAudioDeviceModule.AudioRecordStartErrorCode?,
                            p1: String?
                        ) {
                            // FIX: the original logged the wrong label
                            // ("[onWebRtcAudioRecordInitError]") here — a copy-paste bug.
                            audioLogger.w { "[onWebRtcAudioRecordStartError] $p1" }
                        }

                        override fun onWebRtcAudioRecordError(p0: String?) {
                            audioLogger.w { "[onWebRtcAudioRecordError] $p0" }
                        }
                    })
                    .setAudioTrackErrorCallback(object :
                        JavaAudioDeviceModule.AudioTrackErrorCallback {
                        override fun onWebRtcAudioTrackInitError(p0: String?) {
                            audioLogger.w { "[onWebRtcAudioTrackInitError] $p0" }
                        }

                        override fun onWebRtcAudioTrackStartError(
                            p0: JavaAudioDeviceModule.AudioTrackStartErrorCode?,
                            p1: String?
                        ) {
                            audioLogger.w { "[onWebRtcAudioTrackStartError] $p0" }
                        }

                        override fun onWebRtcAudioTrackError(p0: String?) {
                            audioLogger.w { "[onWebRtcAudioTrackError] $p0" }
                        }
                    })
                    .setAudioRecordStateCallback(object :
                        JavaAudioDeviceModule.AudioRecordStateCallback {
                        override fun onWebRtcAudioRecordStart() {
                            audioLogger.d { "[onWebRtcAudioRecordStart] no args" }
                        }

                        override fun onWebRtcAudioRecordStop() {
                            audioLogger.d { "[onWebRtcAudioRecordStop] no args" }
                        }
                    })
                    .setAudioTrackStateCallback(object :
                        JavaAudioDeviceModule.AudioTrackStateCallback {
                        override fun onWebRtcAudioTrackStart() {
                            audioLogger.d { "[onWebRtcAudioTrackStart] no args" }
                        }

                        override fun onWebRtcAudioTrackStop() {
                            audioLogger.d { "[onWebRtcAudioTrackStop] no args" }
                        }
                    })
                    .createAudioDeviceModule().also {
                        it.setMicrophoneMute(false)
                        it.setSpeakerMute(false)
                    }
            )
            .createPeerConnectionFactory()
    }

    /**
     * Builds a [StreamPeerConnection] that wraps the WebRTC [PeerConnection] and exposes several
     * helpful handlers.
     *
     * @param coroutineScope Scope used for asynchronous operations.
     * @param configuration The [PeerConnection.RTCConfiguration] used to set up the connection.
     * @param type The type of connection, either a subscriber of a publisher.
     * @param mediaConstraints Constraints used for audio and video tracks in the connection.
     * @param onStreamAdded Handler when a new [MediaStream] gets added.
     * @param onNegotiationNeeded Handler when there's a new negotiation.
     * @param onIceCandidateRequest Handler whenever we receive [IceCandidate]s.
     * @return [StreamPeerConnection] That's fully set up and can be observed and used to send and
     * receive tracks.
     */
    fun makePeerConnection(
        coroutineScope: CoroutineScope,
        configuration: PeerConnection.RTCConfiguration,
        type: StreamPeerType,
        mediaConstraints: MediaConstraints,
        onStreamAdded: ((MediaStream) -> Unit)? = null,
        onNegotiationNeeded: ((StreamPeerConnection, StreamPeerType) -> Unit)? = null,
        onIceCandidateRequest: ((IceCandidate, StreamPeerType) -> Unit)? = null,
        onVideoTrack: ((RtpTransceiver?) -> Unit)? = null
    ): StreamPeerConnection {
        val peerConnection = StreamPeerConnection(
            coroutineScope = coroutineScope,
            type = type,
            mediaConstraints = mediaConstraints,
            onStreamAdded = onStreamAdded,
            onNegotiationNeeded = onNegotiationNeeded,
            onIceCandidate = onIceCandidateRequest,
            onVideoTrack = onVideoTrack
        )
        val connection = makePeerConnectionInternal(
            configuration = configuration,
            observer = peerConnection
        )
        return peerConnection.apply { initialize(connection) }
    }

    /**
     * Builds a [PeerConnection] internally that connects to the server and is able to send and
     * receive tracks.
     *
     * @param configuration The [PeerConnection.RTCConfiguration] used to set up the connection.
     * @param observer Handler used to observe different states of the connection.
     * @return [PeerConnection] that's fully set up.
     */
    private fun makePeerConnectionInternal(
        configuration: PeerConnection.RTCConfiguration,
        observer: PeerConnection.Observer?
    ): PeerConnection {
        return requireNotNull(
            factory.createPeerConnection(
                configuration,
                observer
            )
        )
    }

    /**
     * Builds a [VideoSource] from the [factory] that can be used for regular video share (camera)
     * or screen sharing.
     *
     * @param isScreencast If we're screen sharing using this source.
     * @return [VideoSource] that can be used to build tracks.
     */
    fun makeVideoSource(isScreencast: Boolean): VideoSource =
        factory.createVideoSource(isScreencast)

    /**
     * Builds a [VideoTrack] from the [factory] that can be used for regular video share (camera)
     * or screen sharing.
     *
     * @param source The [VideoSource] used for the track.
     * @param trackId The unique ID for this track.
     * @return [VideoTrack] That represents a video feed.
     */
    fun makeVideoTrack(
        source: VideoSource,
        trackId: String
    ): VideoTrack = factory.createVideoTrack(trackId, source)

    /**
     * Builds an [AudioSource] from the [factory] that can be used for audio sharing.
     *
     * @param constraints The constraints used to change the way the audio behaves.
     * @return [AudioSource] that can be used to build tracks.
     */
    fun makeAudioSource(constraints: MediaConstraints = MediaConstraints()): AudioSource =
        factory.createAudioSource(constraints)

    /**
     * Builds an [AudioTrack] from the [factory] that can be used for regular video share (camera)
     * or screen sharing.
     *
     * @param source The [AudioSource] used for the track.
     * @param trackId The unique ID for this track.
     * @return [AudioTrack] That represents an audio feed.
     */
    fun makeAudioTrack(
        source: AudioSource,
        trackId: String
    ): AudioTrack = factory.createAudioTrack(trackId, source)
}

View File

@ -0,0 +1,25 @@
package com.example.tvcontroller.services.webrtc.peer
/*
* Copyright 2023 Stream.IO, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * The type of peer connections, either a [PUBLISHER] that sends data to the call or a [SUBSCRIBER]
 * that receives and decodes the data from the server.
 */
enum class StreamPeerType {
    /** Connection that publishes local media to the call. */
    PUBLISHER,

    /** Connection that receives and decodes remote media. */
    SUBSCRIBER
}

View File

@ -0,0 +1,39 @@
package com.example.tvcontroller.services.webrtc.utils
/*
* Copyright 2023 Stream.IO, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.webrtc.AddIceObserver
import org.webrtc.IceCandidate
import org.webrtc.PeerConnection
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
/**
 * Suspends until the underlying [PeerConnection] has accepted or rejected [iceCandidate].
 *
 * @param iceCandidate The candidate to hand to the native connection.
 * @return [Result.success] when the candidate was added; otherwise [Result.failure]
 * wrapping a [RuntimeException] carrying the native error message.
 */
suspend fun PeerConnection.addRtcIceCandidate(iceCandidate: IceCandidate): Result<Unit> =
    suspendCoroutine { continuation ->
        val observer = object : AddIceObserver {
            override fun onAddSuccess() {
                continuation.resume(Result.success(Unit))
            }

            override fun onAddFailure(error: String?) {
                continuation.resume(Result.failure(RuntimeException(error)))
            }
        }
        addIceCandidate(iceCandidate, observer)
    }

View File

@ -0,0 +1,71 @@
package com.example.tvcontroller.services.webrtc.utils
/*
* Copyright 2023 Stream.IO, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.webrtc.SdpObserver
import org.webrtc.SessionDescription
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
/**
 * Bridges an SDP *create* operation (createOffer / createAnswer) into a suspending call.
 *
 * @param call Invoked with an [SdpObserver]; expected to start the create operation.
 * @return [Result.success] with the created [SessionDescription], or [Result.failure]
 * when creation failed or produced a null description.
 */
suspend inline fun createValue(
    crossinline call: (SdpObserver) -> Unit
): Result<SessionDescription> = suspendCoroutine { continuation ->
    call(
        object : SdpObserver {
            // Create callbacks — the ones this helper cares about.
            override fun onCreateSuccess(description: SessionDescription?) =
                if (description != null) {
                    continuation.resume(Result.success(description))
                } else {
                    continuation.resume(Result.failure(RuntimeException("SessionDescription is null!")))
                }

            override fun onCreateFailure(message: String?) =
                continuation.resume(Result.failure(RuntimeException(message)))

            // Set callbacks are irrelevant for a create operation.
            override fun onSetSuccess() = Unit
            override fun onSetFailure(p0: String?) = Unit
        }
    )
}
/**
 * Bridges an SDP *set* operation (setLocalDescription / setRemoteDescription) into a
 * suspending call.
 *
 * @param call Invoked with an [SdpObserver]; expected to start the set operation.
 * @return An empty [Result] describing whether the set succeeded.
 */
suspend inline fun setValue(
    crossinline call: (SdpObserver) -> Unit
): Result<Unit> = suspendCoroutine { continuation ->
    call(
        object : SdpObserver {
            // Create callbacks are irrelevant for a set operation.
            override fun onCreateFailure(p0: String?) = Unit
            override fun onCreateSuccess(p0: SessionDescription?) = Unit

            // Set callbacks — the ones this helper cares about.
            override fun onSetSuccess() = continuation.resume(Result.success(Unit))
            override fun onSetFailure(message: String?) =
                continuation.resume(Result.failure(RuntimeException(message)))
        }
    )
}

View File

@ -0,0 +1,43 @@
package com.example.tvcontroller.services.webrtc.utils
/*
* Copyright 2023 Stream.IO, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.example.tvcontroller.services.webrtc.peer.StreamPeerType
import org.webrtc.IceCandidateErrorEvent
import org.webrtc.MediaStreamTrack
import org.webrtc.SessionDescription
import org.webrtc.audio.JavaAudioDeviceModule
/** Compact debug representation of a [SessionDescription]. */
fun SessionDescription.stringify(): String =
    "SessionDescription(type=$type, description=$description)"
/** Compact debug representation of a [MediaStreamTrack]. */
fun MediaStreamTrack.stringify(): String =
    "MediaStreamTrack(id=${id()}, kind=${kind()}, enabled: ${enabled()}, state=${state()})"
/** Compact debug representation of an [IceCandidateErrorEvent]. */
fun IceCandidateErrorEvent.stringify(): String =
    "IceCandidateErrorEvent(errorCode=$errorCode, $errorText, address=$address, port=$port, url=$url)"
/** Compact debug representation of [JavaAudioDeviceModule.AudioSamples]. */
fun JavaAudioDeviceModule.AudioSamples.stringify(): String =
    "AudioSamples(audioFormat=$audioFormat, channelCount=$channelCount" +
        ", sampleRate=$sampleRate, data.size=${data.size})"
/** Short lowercase tag used in log lines for a [StreamPeerType]. */
fun StreamPeerType.stringify(): String = when (this) {
    StreamPeerType.PUBLISHER -> "publisher"
    StreamPeerType.SUBSCRIBER -> "subscriber"
}

View File

@ -2,26 +2,26 @@ package com.example.tvcontroller.ui.views
import androidx.camera.view.CameraController
import androidx.camera.view.LifecycleCameraController
import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Box
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.padding
import androidx.compose.material3.Text
import androidx.compose.runtime.Composable
import androidx.compose.runtime.remember
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.platform.LocalContext
import androidx.compose.ui.unit.dp
import androidx.core.content.ContextCompat
import com.example.tvcontroller.services.WebRtcService
import com.example.tvcontroller.ui.components.CameraPreview
@Composable
fun CameraView() {
val webRtcService = remember { WebRtcService() }
val context = LocalContext.current
val controller = remember {
LifecycleCameraController(context).apply {
setEnabledUseCases(CameraController.VIDEO_CAPTURE)
setEnabledUseCases(CameraController.IMAGE_ANALYSIS)
setImageAnalysisAnalyzer(ContextCompat.getMainExecutor(context), webRtcService)
}
}
Box(

View File

@ -1,4 +1,5 @@
[versions]
streamLog = "1.1.4"
agp = "8.9.0"
cameraCore = "1.4.1"
kotlin = "2.0.0"
@ -11,6 +12,7 @@ lifecycleRuntimeKtx = "2.6.1"
activityCompose = "1.8.0"
composeBom = "2024.04.01"
navigationCompose = "2.8.4"
streamWebrtcAndroid = "1.3.8"
[libraries]
androidx-camera-camera2 = { module = "androidx.camera:camera-camera2", version.ref = "cameraCore" }
@ -37,6 +39,9 @@ androidx-ui-test-junit4 = { group = "androidx.compose.ui", name = "ui-test-junit
androidx-material3 = { group = "androidx.compose.material3", name = "material3" }
ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" }
ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
stream-webrtc-android = { module = "io.getstream:stream-webrtc-android", version.ref = "streamWebrtcAndroid" }
stream-log = { group = "io.getstream", name = "stream-log-android", version.ref = "streamLog" }
[plugins]
android-application = { id = "com.android.application", version.ref = "agp" }