Skip to content

Commit

Permalink
api: make sure that entities are deleted immediately after a kick req…
Browse files Browse the repository at this point in the history
…uest
  • Loading branch information
aler9 committed Aug 12, 2021
1 parent da7f9c7 commit 6702cb4
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 70 deletions.
6 changes: 1 addition & 5 deletions internal/core/hls_remuxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,6 @@ func newHLSRemuxer(
return r
}

// ParentClose closes a Remuxer.
func (r *hlsRemuxer) ParentClose() {
r.log(logger.Info, "destroyed")
}

func (r *hlsRemuxer) Close() {
r.ctxCancel()
}
Expand All @@ -178,6 +173,7 @@ func (r *hlsRemuxer) PathName() string {

func (r *hlsRemuxer) run() {
defer r.wg.Done()
defer r.log(logger.Info, "destroyed")

remuxerCtx, remuxerCtxCancel := context.WithCancel(context.Background())
remuxerReady := make(chan struct{})
Expand Down
11 changes: 1 addition & 10 deletions internal/core/hls_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ outer:
if c2, ok := s.remuxers[c.PathName()]; !ok || c2 != c {
continue
}
s.doRemuxerClose(c)
delete(s.remuxers, c.PathName())

case <-s.ctx.Done():
break outer
Expand All @@ -128,10 +128,6 @@ outer:

s.ctxCancel()

for _, c := range s.remuxers {
s.doRemuxerClose(c)
}

hs.Shutdown(context.Background())

s.pathManager.OnHLSServer(nil)
Expand Down Expand Up @@ -234,11 +230,6 @@ func (s *hlsServer) findOrCreateRemuxer(pathName string) *hlsRemuxer {
return r
}

func (s *hlsServer) doRemuxerClose(c *hlsRemuxer) {
delete(s.remuxers, c.PathName())
c.ParentClose()
}

// OnRemuxerClose is called by hlsRemuxer.
func (s *hlsServer) OnRemuxerClose(c *hlsRemuxer) {
select {
Expand Down
6 changes: 1 addition & 5 deletions internal/core/rtmp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@ func newRTMPConn(
return c
}

// ParentClose closes a Conn.
func (c *rtmpConn) ParentClose() {
c.log(logger.Info, "closed")
}

// Close closes a Conn.
func (c *rtmpConn) Close() {
c.ctxCancel()
Expand Down Expand Up @@ -156,6 +151,7 @@ func (c *rtmpConn) safeState() gortsplib.ServerSessionState {

func (c *rtmpConn) run() {
defer c.wg.Done()
defer c.log(logger.Info, "closed")

if c.runOnConnect != "" {
_, port, _ := net.SplitHostPort(c.rtspAddress)
Expand Down
13 changes: 2 additions & 11 deletions internal/core/rtmp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ outer:
if _, ok := s.conns[c]; !ok {
continue
}
s.doConnClose(c)
delete(s.conns, c)

case req := <-s.apiRTMPConnsList:
for c := range s.conns {
Expand All @@ -181,6 +181,7 @@ outer:
res := func() bool {
for c := range s.conns {
if c.ID() == req.ID {
delete(s.conns, c)
c.Close()
return true
}
Expand All @@ -201,10 +202,6 @@ outer:
s.ctxCancel()

s.l.Close()

for c := range s.conns {
s.doConnClose(c)
}
}

func (s *rtmpServer) newConnID() (string, error) {
Expand Down Expand Up @@ -235,12 +232,6 @@ func (s *rtmpServer) newConnID() (string, error) {
}
}

func (s *rtmpServer) doConnClose(c *rtmpConn) {
delete(s.conns, c)
c.ParentClose()
c.Close()
}

// OnConnClose is called by rtmpConn.
func (s *rtmpServer) OnConnClose(c *rtmpConn) {
select {
Expand Down
26 changes: 13 additions & 13 deletions internal/core/rtsp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,6 @@ func newRTSPConn(
return c
}

// ParentClose closes a Conn.
func (c *rtspConn) ParentClose(err error) {
if err != io.EOF && !isTeardownErr(err) && !isTerminatedErr(err) {
c.log(logger.Info, "ERR: %v", err)
}

c.log(logger.Info, "closed")

if c.onConnectCmd != nil {
c.onConnectCmd.Close()
}
}

func (c *rtspConn) log(level logger.Level, format string, args ...interface{}) {
c.parent.Log(level, "[conn %v] "+format, append([]interface{}{c.conn.NetConn().RemoteAddr()}, args...)...)
}
Expand Down Expand Up @@ -178,6 +165,19 @@ func (c *rtspConn) validateCredentials(
return nil
}

// OnClose is called by rtspServer.
func (c *rtspConn) OnClose(err error) {
if err != io.EOF && !isTeardownErr(err) && !isTerminatedErr(err) {
c.log(logger.Info, "ERR: %v", err)
}

c.log(logger.Info, "closed")

if c.onConnectCmd != nil {
c.onConnectCmd.Close()
}
}

// OnRequest is called by rtspServer.
func (c *rtspConn) OnRequest(req *base.Request) {
c.log(logger.Debug, "[c->s] %v", req)
Expand Down
14 changes: 9 additions & 5 deletions internal/core/rtsp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (s *rtspServer) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
delete(s.conns, ctx.Conn)
s.mutex.Unlock()

c.ParentClose(ctx.Error)
c.OnClose(ctx.Error)
}

// OnRequest implements gortsplib.ServerHandlerOnRequest.
Expand Down Expand Up @@ -281,7 +281,9 @@ func (s *rtspServer) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCt
delete(s.sessions, ctx.Session)
s.mutex.Unlock()

se.ParentClose()
if se != nil {
se.OnClose()
}
}

// OnDescribe implements gortsplib.ServerHandlerOnDescribe.
Expand Down Expand Up @@ -384,9 +386,11 @@ func (s *rtspServer) OnAPIRTSPSessionsKick(req apiRTSPSessionsKickReq) apiRTSPSe
s.mutex.RLock()
defer s.mutex.RUnlock()

for _, s := range s.sessions {
if s.ID() == req.ID {
s.Close()
for key, se := range s.sessions {
if se.ID() == req.ID {
se.Close()
delete(s.sessions, key)
se.OnClose()
return apiRTSPSessionsKickRes{}
}
}
Expand Down
42 changes: 21 additions & 21 deletions internal/core/rtsp_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,6 @@ func newRTSPSession(
return s
}

// ParentClose closes a Session.
func (s *rtspSession) ParentClose() {
if s.ss.State() == gortsplib.ServerSessionStateRead {
if s.onReadCmd != nil {
s.onReadCmd.Close()
}
}

switch s.ss.State() {
case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead:
s.path.OnReaderRemove(pathReaderRemoveReq{Author: s})
s.path = nil

case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish:
s.path.OnPublisherRemove(pathPublisherRemoveReq{Author: s})
s.path = nil
}

s.log(logger.Info, "closed")
}

// Close closes a Session.
func (s *rtspSession) Close() {
s.ss.Close()
Expand Down Expand Up @@ -125,6 +104,27 @@ func (s *rtspSession) log(level logger.Level, format string, args ...interface{}
s.parent.Log(level, "[session %s] "+format, append([]interface{}{s.id}, args...)...)
}

// OnClose is called by rtspServer.
func (s *rtspSession) OnClose() {
if s.ss.State() == gortsplib.ServerSessionStateRead {
if s.onReadCmd != nil {
s.onReadCmd.Close()
}
}

switch s.ss.State() {
case gortsplib.ServerSessionStatePreRead, gortsplib.ServerSessionStateRead:
s.path.OnReaderRemove(pathReaderRemoveReq{Author: s})
s.path = nil

case gortsplib.ServerSessionStatePrePublish, gortsplib.ServerSessionStatePublish:
s.path.OnPublisherRemove(pathPublisherRemoveReq{Author: s})
s.path = nil
}

s.log(logger.Info, "closed")
}

// OnAnnounce is called by rtspServer.
func (s *rtspSession) OnAnnounce(c *rtspConn, ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
res := s.pathManager.OnPublisherAnnounce(pathPublisherAnnounceReq{
Expand Down

0 comments on commit 6702cb4

Please sign in to comment.