diff --git a/go.mod b/go.mod index 71cce32..6246a3b 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,18 @@ module playback-device-server go 1.24.1 require ( - github.com/google/uuid v1.6.0 // indirect - github.com/gorilla/websocket v1.5.3 // indirect - github.com/labstack/echo/v4 v4.13.3 // indirect + github.com/gorilla/websocket v1.5.3 + github.com/labstack/echo/v4 v4.13.3 github.com/labstack/gommon v0.4.2 // indirect - github.com/matoous/go-nanoid v1.5.1 // indirect + github.com/matoous/go-nanoid v1.5.1 github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mattn/go-sqlite3 v1.14.24 // indirect - github.com/rs/zerolog v1.33.0 // indirect + github.com/mattn/go-sqlite3 v1.14.24 + github.com/rs/zerolog v1.33.0 github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect - github.com/ziflex/lecho/v3 v3.7.0 // indirect - golang.org/x/crypto v0.36.0 // indirect + github.com/ziflex/lecho/v3 v3.7.0 + golang.org/x/crypto v0.36.0 golang.org/x/net v0.33.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/text v0.23.0 // indirect diff --git a/main/main.go b/main/main.go index 7e56730..1f609bb 100644 --- a/main/main.go +++ b/main/main.go @@ -75,6 +75,9 @@ func main() { webSocketServer.SetRouter(webServer.Router()) webSocketServer.Initialize(&authenticator) + webRTCWSHandler := server.WebRTCWSHandler{} + webSocketServer.AddHandler(&webRTCWSHandler) + var wg sync.WaitGroup wg.Add(1) go func() { diff --git a/server/authenticator.go b/server/authenticator.go index 551ac1f..0740a77 100644 --- a/server/authenticator.go +++ b/server/authenticator.go @@ -43,7 +43,6 @@ func (r *Authenticator) Authenticate(path string, exceptions []string) func(next } } cookie, err := context.Cookie("token") - fmt.Println(context.Cookies()) if err != nil { SendError(401, context, "no cookie for session token found") return err @@ -69,8 +68,6 @@ func (r *Authenticator) Authenticate(path string, exceptions []string) func(next return fmt.Errorf("no integration or user found for given token") } - fmt.Println("user:", user, "session:", session, "integration:", integration) - authContext := AuthContext{Context: context, User: user, Session: session, Integration: integration} return next(authContext) diff --git a/server/webrtc_ws_handler.go b/server/webrtc_ws_handler.go new file mode 100644 index 0000000..25fb13a --- /dev/null +++ b/server/webrtc_ws_handler.go @@ -0,0 +1,50 @@ +package server + +import ( + "encoding/json" + + "github.com/gorilla/websocket" + "github.com/rs/zerolog/log" +) + +const TYPE_SIGNALING = "signaling" + +type WebRTCWSHandler struct { +} + +func (h *WebRTCWSHandler) Handle(messageObject map[string]any, senderId string, sockets map[string]*websocket.Conn) { + if _, ok := messageObject["type"]; !ok { + return + } + + messageType := messageObject["type"] + switch messageType { + case TYPE_SIGNALING: + target, ok := messageObject["target"].(string) + if !ok { + log.Error().Msgf("Invalid target in signaling message") + return + } + message := messageObject["message"] + h.handleSignaling(target, message, senderId, sockets) + } +} + +func (h *WebRTCWSHandler) handleSignaling(target string, message any, senderId string, sockets map[string]*websocket.Conn) { + ws, ok := sockets[target] + if !ok { + log.Error().Msgf("No connection found for target %s", target) + return + } + byteArray, err := json.Marshal(map[string]any{"sender": senderId, "message": message}) + if err != nil { + log.Error().Msgf("Error marshaling signaling message: %s", err) + return + } + log.Info().Str("target", target).Str("sender", senderId).Msg("sending signaling message to target") + err = ws.WriteMessage(websocket.TextMessage, byteArray) + if err != nil { + log.Error().Msgf("Error writing signaling message to target: %s", err) + return + } +} diff --git a/server/websocket_server.go b/server/websocket_server.go index 023263b..93bbc79 100644 --- a/server/websocket_server.go +++ b/server/websocket_server.go @@ -2,7 +2,6 @@ package server import ( "encoding/json" - "fmt" "github.com/gorilla/websocket" "github.com/labstack/echo/v4" @@ -10,9 +9,14 @@ import ( var upgrader = websocket.Upgrader{} +type WebsocketHandler interface { + Handle(message map[string]any, senderId string, sockets map[string]*websocket.Conn) +} + type WebsocketServer struct { - router *echo.Echo - sockets map[string]*websocket.Conn + router *echo.Echo + sockets map[string]*websocket.Conn + handlers []WebsocketHandler } func (s *WebsocketServer) Initialize(authenticator *Authenticator) { @@ -42,8 +46,9 @@ func (s *WebsocketServer) handle(context echo.Context) error { if messageType == websocket.TextMessage { var messageObject map[string]any json.Unmarshal(messageBytes, &messageObject) - fmt.Println("Received message from authenticated user", senderId) - fmt.Println(messageObject) + for _, handler := range s.handlers { + handler.Handle(messageObject, senderId, s.sockets) + } } } } @@ -58,6 +63,10 @@ func getAuthenticatedId(authContext AuthContext) string { return "" } +func (s *WebsocketServer) AddHandler(handler WebsocketHandler) { + s.handlers = append(s.handlers, handler) +} + func (s *WebsocketServer) SetRouter(router *echo.Echo) { s.router = router } diff --git a/www/src/services/webrtc-service.js b/www/src/services/webrtc-service.js new file mode 100644 index 0000000..124b6ba --- /dev/null +++ b/www/src/services/webrtc-service.js @@ -0,0 +1,134 @@ +import WebsocketService from "./websocket-service"; + +function WebRTCService() { + const TYPE_SIGNALING = "signaling"; + + let videoElement; + let peerConnection; + let peerId; + + WebsocketService.onMessage((data) => { + let dataObject = JSON.parse(data); + let sender = dataObject.sender; + if (sender !== peerId) return; + let message = dataObject.message; + handleSignalingMessage(peerId, message); + }); + + async function connect(targetId) { + peerId = targetId; + let configuration = getConfiguration(); + peerConnection = new RTCPeerConnection(configuration); + console.log("ICE connection state:" + peerConnection.iceConnectionState) + peerConnection.oniceconnectionstatechange = (event) => { + console.log("ICE connection state changed to:", peerConnection.iceConnectionState); + }; + peerConnection.addEventListener("signalingstatechange", (event) => { + console.log("Signaling state changed to:", event.target.signalingState); + }); + peerConnection.onicecandidate = (event) => { + if (event.candidate) { + sendIceCandidate(targetId, event.candidate); + } + }; + peerConnection.ontrack = (event) => { + console.log("Received track:", event.track); + if (videoElement) { + videoElement.srcObject.addTrack(event.track); + } + }; + peerConnection.onicecandidateerror = (event) => { + console.error("ICE candidate error:", event); + }; + peerConnection.onnegotiationneeded = () => { + console.log("Negotiation needed"); + negotiate(targetId); + } + negotiate(targetId); + } + + async function negotiate(targetId) { + try { + let offer = await peerConnection.createOffer({ + offerToReceiveAudio: true, + offerToReceiveVideo: true, + }); + console.log("Created offer:", offer) + await peerConnection.setLocalDescription(offer); + sendOffer(targetId, offer); + } catch (error) { + console.error("Error creating offer:", error); + } + } + + function sendOffer(integrationId, offer) { + WebsocketService.sendJson({ + type: TYPE_SIGNALING, + target: integrationId, + message: offer, + }); + } + + function sendIceCandidate(integrationId, candidate) { + console.log("Sending ICE candidate:", candidate); + WebsocketService.sendJson({ + type: TYPE_SIGNALING, + target: integrationId, + message: { + type: "ice_candidate", + candidate: candidate, + }, + }); + } + + function handleSignalingMessage(targetId, message) { + //console.log("Received message:", message); + switch (message.type) { + case "answer": + handleAnswer(message); + break; + case "ice_candidate": + handleIceCandidate(message); + break; + default: + return; + } + } + + function handleAnswer(answer) { + console.log("Remote answer:", answer); + if (peerConnection.signalingState === "stable") return + peerConnection.setRemoteDescription(new RTCSessionDescription(answer)); + } + + function handleIceCandidate(message) { + let iceCandidate = new RTCIceCandidate(message.candidate); + console.log("Received ICE candidate:", iceCandidate); + peerConnection.addIceCandidate(iceCandidate); + } + + function getConfiguration() { + return { + iceServers: [{ urls: "stun:stun.l.google.com:19302" }], + }; + } + + function setVideoElement(element) { + videoElement = element; + videoElement.srcObject = new MediaStream(); + } + + function getVideoElement() { + return videoElement; + } + + return { + connect, + setVideoElement, + getVideoElement, + }; +} + +WebRTCService = new WebRTCService(); + +export default WebRTCService; diff --git a/www/src/services/websocket-service.js b/www/src/services/websocket-service.js new file mode 100644 index 0000000..2eed4a0 --- /dev/null +++ b/www/src/services/websocket-service.js @@ -0,0 +1,40 @@ +function WebsocketService() { + let websocket; + + initialize(); + function initialize() { + let url = "ws://" + location.host + "/ws"; + websocket = new WebSocket(url); + } + + function onMessage(callback) { + if (!websocket) throw new Error("Websocket is not open"); + websocket.addEventListener("message", event => { + callback(event.data); + }); + } + + function sendMessage(message) { + if (!isWebsocketOpen()) throw new Error("Websocket is not open"); + websocket.send(message); + } + + function sendJson(jsonObject) { + let jsonString = JSON.stringify(jsonObject); + sendMessage(jsonString); + } + + function isWebsocketOpen() { + return websocket.readyState === WebSocket.OPEN; + } + + return { + onMessage, + sendJson, + sendMessage, + }; +} + +WebsocketService = new WebsocketService(); + +export default WebsocketService; diff --git a/www/src/views/integration-view.jsx b/www/src/views/integration-view.jsx index 908cace..54bfbae 100644 --- a/www/src/views/integration-view.jsx +++ b/www/src/views/integration-view.jsx @@ -1,5 +1,6 @@ import { createMemo, createSignal } from "solid-js"; import Integration from "../data/integration"; +import WebRTCService from "../services/webrtc-service"; const [integration, setIntegration] = createSignal(null); @@ -9,6 +10,14 @@ function IntegrationView(props) { ? integration().getName() : "Integration" ); + let videoElement = null; + + function handleConnectWebRTC() { + let integrationId = integration().getId(); + WebRTCService.setVideoElement(videoElement); + WebRTCService.connect(integrationId); + } + return (
Integration

{title}

+
+
+ +
+
+
+ +
); }