Skip to content

Commit

Permalink
screenshare POC.
Browse files Browse the repository at this point in the history
  • Loading branch information
m1k1o committed Jan 29, 2023
1 parent 72c0070 commit c873d4d
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 1 deletion.
34 changes: 34 additions & 0 deletions client/src/components/header.vue
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
<span><b>n</b>.eko</span>
</a>
<ul class="menu">
<li>
<button class="btn" @click="startShareScreen" v-if="!mediaStream">
START SCREEN SHARE
</button>
<button class="btn" @click="stopShareScreen" v-else>
STOP SCREEN SHARE
</button>
</li>
<li>
<i
:class="[{ disabled: !admin }, { locked: isLocked('control') }, 'fas', 'fa-mouse']"
Expand Down Expand Up @@ -207,5 +215,31 @@
return this.$t(`locks.${resource}.` + (this.isLocked(resource) ? `locked` : `unlocked`))
}
//
// Screen Share
//
mediaStream: MediaStream | null = null
mediaRtcpSender: RTCRtpSender | null = null
async startShareScreen() {
// get media stream from user's browser
this.mediaStream = await navigator.mediaDevices
.getDisplayMedia({
video: true,
audio: false,
})
const mediaTrack = this.mediaStream.getVideoTracks()[0];
this.mediaRtcpSender = this.$client.addTrack(mediaTrack, this.mediaStream)
}
async stopShareScreen() {
if (this.mediaStream) {
this.mediaStream.getTracks().forEach(track => track.stop())
this.mediaStream = null
}
if (this.mediaRtcpSender) {
this.$client.removeTrack(this.mediaRtcpSender)
this.mediaRtcpSender = null
}
}
}
</script>
16 changes: 16 additions & 0 deletions client/src/neko/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,22 @@ export abstract class BaseClient extends EventEmitter<BaseEvents> {
this._peer.setRemoteDescription({ type: 'answer', sdp })
}

public addTrack(track: MediaStreamTrack, ...streams: MediaStream[]): RTCRtpSender {
if (!this._peer) {
throw new Error('peer not connected')
}

return this._peer.addTrack(track, ...streams)
}

public removeTrack(sender: RTCRtpSender) {
if (!this._peer) {
throw new Error('peer not connected')
}

this._peer.removeTrack(sender)
}

private async onMessage(e: MessageEvent) {
const { event, ...payload } = JSON.parse(e.data) as WebSocketMessages

Expand Down
19 changes: 19 additions & 0 deletions server/internal/capture/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package capture

import (
"errors"
"fmt"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"m1k1o/neko/internal/config"
"m1k1o/neko/internal/types"
"m1k1o/neko/internal/types/codec"
)

type CaptureManagerCtx struct {
Expand All @@ -18,6 +20,9 @@ type CaptureManagerCtx struct {
broadcast *BroacastManagerCtx
audio *StreamSinkManagerCtx
video *StreamSinkManagerCtx

// source-sinks
screenshare *StreamSrcSinkManagerCtx
}

func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
Expand All @@ -43,6 +48,15 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt
}
return NewVideoPipeline(config.VideoCodec, config.Display, config.VideoPipeline, fps, config.VideoBitrate, config.VideoHWEnc)
}, "video"),

// source-sinks
screenshare: streamSrcSinkNew(config.ScreenshareEnabled, map[string]string{
codec.VP8().Name: "appsrc format=time is-live=true do-timestamp=true name=appsrc " +
fmt.Sprintf("! application/x-rtp, payload=%d, encoding-name=VP8-DRAFT-IETF-01 ", codec.VP8().PayloadType) +
"! rtpvp8depay " +
"! appsink name=appsink",
// TODO: Add support for more codecs.
}, "webcam"),
}
}

Expand Down Expand Up @@ -95,6 +109,7 @@ func (manager *CaptureManagerCtx) Start() {
func (manager *CaptureManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("shutdown")

manager.screenshare.shutdown()
manager.broadcast.shutdown()

manager.audio.shutdown()
Expand All @@ -114,3 +129,7 @@ func (manager *CaptureManagerCtx) Audio() types.StreamSinkManager {
func (manager *CaptureManagerCtx) Video() types.StreamSinkManager {
return manager.video
}

func (manager *CaptureManagerCtx) Screenshare() types.StreamSrcSinkManager {
return manager.screenshare
}
137 changes: 137 additions & 0 deletions server/internal/capture/streamsrcsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package capture

import (
"errors"
"sync"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"m1k1o/neko/internal/capture/gst"
"m1k1o/neko/internal/types"
"m1k1o/neko/internal/types/codec"
)

type StreamSrcSinkManagerCtx struct {
logger zerolog.Logger
sampleChannel chan types.Sample

enabled bool
codecPipeline map[string]string // codec -> pipeline

codec codec.RTPCodec
pipeline *gst.Pipeline
pipelineMu sync.Mutex
pipelineStr string
}

func streamSrcSinkNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcSinkManagerCtx {
logger := log.With().
Str("module", "capture").
Str("submodule", "stream-src-sink").
Str("video_id", video_id).Logger()

return &StreamSrcSinkManagerCtx{
logger: logger,
enabled: enabled,
codecPipeline: codecPipeline,
sampleChannel: make(chan types.Sample),
}
}

func (manager *StreamSrcSinkManagerCtx) shutdown() {
manager.logger.Info().Msgf("shutdown")

manager.Stop()
}

func (manager *StreamSrcSinkManagerCtx) Codec() codec.RTPCodec {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

return manager.codec
}

func (manager *StreamSrcSinkManagerCtx) Start(codec codec.RTPCodec) error {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

if manager.pipeline != nil {
return types.ErrCapturePipelineAlreadyExists
}

if !manager.enabled {
return errors.New("stream-src-sink not enabled")
}

found := false
for codecName, pipeline := range manager.codecPipeline {
if codecName == codec.Name {
manager.pipelineStr = pipeline
manager.codec = codec
found = true
break
}
}

if !found {
return errors.New("no pipeline found for a codec")
}

var err error

manager.logger.Info().
Str("codec", manager.codec.Name).
Str("src", manager.pipelineStr).
Msgf("creating pipeline")

manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr)
if err != nil {
return err
}

manager.pipeline.AttachAppsrc("appsrc")
manager.pipeline.AttachAppsink("appsink", manager.sampleChannel)
manager.pipeline.Play()

return nil
}

func (manager *StreamSrcSinkManagerCtx) Stop() {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

if manager.pipeline == nil {
return
}

manager.pipeline.Destroy()
manager.pipeline = nil

manager.logger.Info().
Str("codec", manager.codec.Name).
Str("src", manager.pipelineStr).
Msgf("destroying pipeline")
}

func (manager *StreamSrcSinkManagerCtx) Push(bytes []byte) {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

if manager.pipeline == nil {
return
}

manager.pipeline.Push(bytes)
}

func (manager *StreamSrcSinkManagerCtx) Started() bool {
manager.pipelineMu.Lock()
defer manager.pipelineMu.Unlock()

return manager.pipeline != nil
}

func (manager *StreamSrcSinkManagerCtx) GetSampleChannel() chan types.Sample {
return manager.sampleChannel
}
18 changes: 18 additions & 0 deletions server/internal/config/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Capture struct {
// broadcast
BroadcastPipeline string
BroadcastUrl string

// screenshare
ScreenshareEnabled bool
}

func (Capture) Init(cmd *cobra.Command) error {
Expand Down Expand Up @@ -151,6 +154,15 @@ func (Capture) Init(cmd *cobra.Command) error {
return err
}

//
// screenshare
//

cmd.PersistentFlags().Bool("screenshare.enabled", true, "enable screenshare")
if err := viper.BindPFlag("screenshare.enabled", cmd.PersistentFlags().Lookup("screenshare.enabled")); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -230,4 +242,10 @@ func (s *Capture) Set() {

s.BroadcastPipeline = viper.GetString("broadcast_pipeline")
s.BroadcastUrl = viper.GetString("broadcast_url")

//
// screenshare
//

s.ScreenshareEnabled = viper.GetBool("screenshare.enabled")
}
12 changes: 12 additions & 0 deletions server/internal/types/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,23 @@ type StreamSinkManager interface {
GetSampleChannel() chan Sample
}

type StreamSrcSinkManager interface {
Codec() codec.RTPCodec

Start(codec codec.RTPCodec) error
Stop()

Push(bytes []byte)
Started() bool
GetSampleChannel() chan Sample
}

type CaptureManager interface {
Start()
Shutdown() error

Broadcast() BroadcastManager
Audio() StreamSinkManager
Video() StreamSinkManager
Screenshare() StreamSrcSinkManager
}
Loading

0 comments on commit c873d4d

Please sign in to comment.