Scaffolding for live-video-to-video (#3210)
* realtime: Add MediaMTX handler to G, and O caps and signaling
j0sh authored Oct 29, 2024
1 parent 4d966f8 commit 7bb0026
Showing 4 changed files with 81 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/capabilities.go
@@ -81,6 +81,7 @@ const (
Capability_SegmentAnything2 Capability = 32
Capability_LLM Capability = 33
Capability_ImageToText Capability = 34
Capability_LiveVideoToVideo Capability = 35
)

var CapabilityNameLookup = map[Capability]string{
@@ -120,6 +121,7 @@ var CapabilityNameLookup = map[Capability]string{
Capability_SegmentAnything2: "Segment anything 2",
Capability_LLM: "Llm",
Capability_ImageToText: "Image to text",
Capability_LiveVideoToVideo: "Live video to video",
}

var CapabilityTestLookup = map[Capability]CapabilityTest{
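The two entries above are all the rest of the node needs to refer to the new capability by name. A minimal sketch of reading the lookup from outside the package (the import path is assumed from the repository layout):

package main

import (
	"fmt"

	"github.com/livepeer/go-livepeer/core"
)

func main() {
	// Both the constant and its display name were added in this commit.
	fmt.Println(core.CapabilityNameLookup[core.Capability_LiveVideoToVideo]) // "Live video to video"
}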
30 changes: 30 additions & 0 deletions server/ai_http.go
@@ -56,6 +56,7 @@ func startAIServer(lp lphttp) error {
lp.transRPC.Handle("/llm", oapiReqValidator(lp.LLM()))
lp.transRPC.Handle("/segment-anything-2", oapiReqValidator(lp.SegmentAnything2()))
lp.transRPC.Handle("/image-to-text", oapiReqValidator(lp.ImageToText()))
lp.transRPC.Handle("/live-video-to-video", oapiReqValidator(lp.StartLiveVideoToVideo()))
// Additionally, there is the '/aiResults' endpoint registered in server/rpc.go

return nil
@@ -236,6 +237,35 @@ func (h *lphttp) ImageToText() http.Handler {
}

handleAIRequest(ctx, w, r, orch, req)

})
}

func (h *lphttp) StartLiveVideoToVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

// skipping handleAIRequest for now until we have payments

var (
mid = string(core.RandomManifestID())
pubUrl = "/ai/live-video/" + mid
subUrl = pubUrl + "/out"
)
jsonData, err := json.Marshal(struct {
PublishUrl string
SubscribeUrl string
}{
PublishUrl: pubUrl,
SubscribeUrl: subUrl,
})
if err != nil {
respondWithError(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonData)
})
}
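For orientation, a sketch of what a caller would see from this new orchestrator endpoint. The host, port, and plain-HTTP transport are assumptions, and the OpenAPI validator in front of the handler may require a well-formed request body; this only illustrates the response shape built above:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	// Assumed orchestrator address; real deployments typically front this with TLS.
	resp, err := http.Post("http://127.0.0.1:8935/live-video-to-video", "application/json", strings.NewReader("{}"))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var urls struct {
		PublishUrl   string
		SubscribeUrl string
	}
	if err := json.NewDecoder(resp.Body).Decode(&urls); err != nil {
		log.Fatal(err)
	}
	// PublishUrl:   /ai/live-video/<random manifest ID>
	// SubscribeUrl: /ai/live-video/<random manifest ID>/out
	fmt.Println(urls.PublishUrl, urls.SubscribeUrl)
}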

24 changes: 24 additions & 0 deletions server/ai_mediaserver.go
@@ -74,6 +74,9 @@ func startAIMediaServer(ls *LivepeerServer) error {
ls.HTTPMux.Handle("/segment-anything-2", oapiReqValidator(handle(ls, multipartDecoder[worker.GenSegmentAnything2MultipartRequestBody], processSegmentAnything2)))
ls.HTTPMux.Handle("/image-to-text", oapiReqValidator(handle(ls, multipartDecoder[worker.GenImageToTextMultipartRequestBody], processImageToText)))

// This is called by the media server when the stream is ready
ls.HTTPMux.Handle("/live/video-to-video/start", ls.StartLiveVideo())

return nil
}

@@ -361,3 +364,24 @@ func (ls *LivepeerServer) ImageToVideoResult() http.Handler {
_ = json.NewEncoder(w).Encode(resp)
})
}

func (ls *LivepeerServer) StartLiveVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
streamName := r.FormValue("stream")
if streamName == "" {
http.Error(w, "Missing stream name", http.StatusBadRequest)
return
}
requestID := string(core.RandomManifestID())
params := aiRequestParams{
node: ls.LivepeerNode,
os: drivers.NodeStorage.NewSession(requestID),
sessManager: ls.AISessionManager,
}
ctx := clog.AddVal(r.Context(), "request_id", requestID)
// TODO set model and initial parameters here if necessary (eg, prompt)
req := struct{}{}
resp, err := processAIRequest(ctx, params, req)
clog.Infof(ctx, "Received live video AI request stream=%s resp=%v err=%v", streamName, resp, err)
})
}
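A sketch of the request shape this handler expects; in practice MediaMTX would issue it from a runOnReady-style hook when a stream goes live. The address below is an assumption (the node's HTTP mux), and only the "stream" form field is read:

package main

import (
	"log"
	"net/http"
	"net/url"
)

func main() {
	// Assumed node address; a missing "stream" field is rejected with 400 Bad Request.
	form := url.Values{"stream": {"my-stream"}}
	resp, err := http.PostForm("http://127.0.0.1:8935/live/video-to-video/start", form)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println(resp.Status)
}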
25 changes: 25 additions & 0 deletions server/ai_process.go
Expand Up @@ -34,6 +34,7 @@ const defaultAudioToTextModelID = "openai/whisper-large-v3"
const defaultLLMModelID = "meta-llama/llama-3.1-8B-Instruct"
const defaultSegmentAnything2ModelID = "facebook/sam2-hiera-large"
const defaultImageToTextModelID = "Salesforce/blip-image-captioning-large"
const defaultLiveVideoToVideoModelID = "cumulo-autumn/stream-diffusion"

var errWrongFormat = fmt.Errorf("result not in correct format")

@@ -865,6 +866,19 @@ func submitAudioToText(ctx context.Context, params aiRequestParams, sess *AISess
return &res, nil
}

func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *AISession, req struct{ ModelId *string }) (any, error) {
//client, err := worker.NewClientWithResponses(sess.Transcoder(), worker.WithHTTPClient(httpClient))
var err error
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
// TODO check urls and add sess.Transcoder to the host if necessary
return nil, nil
}

func CalculateLLMLatencyScore(took time.Duration, tokensUsed int) float64 {
if tokensUsed <= 0 {
return 0
@@ -1204,6 +1218,17 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) {
return submitImageToText(ctx, params, sess, v)
}
/*
case worker.StartLiveVideoToVideoFormdataRequestBody:
cap = core.Capability_LiveVideoToVideo
modelID = defaultLiveVideoToVideoModelID
if v.ModelId != nil {
modelID = *v.ModelId
}
submitFn = func(ctx context.Context, params aiRequestParams, sess *AISession) (interface{}, error) {
return submitLiveVideoToVideo(ctx, params, sess, v)
}
*/
default:
return nil, fmt.Errorf("unsupported request type %T", req)
}
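Taken together, the stubbed submitLiveVideoToVideo and the commented-out dispatch case mark where the live pipeline will plug into processAIRequest once the generated worker client and payments land. The TODO about URLs suggests the gateway will need to resolve the orchestrator's relative publish/subscribe paths against the orchestrator host (e.g. sess.Transcoder()); a hypothetical helper for that step, with names that are illustrative only:

package main

import (
	"fmt"
	"net/url"
)

// absoluteStreamURL is a hypothetical helper: it resolves a relative path
// returned by the orchestrator (e.g. "/ai/live-video/<id>") against the
// orchestrator's base URL before the gateway hands it to the media server.
func absoluteStreamURL(orchBase, relPath string) (string, error) {
	base, err := url.Parse(orchBase)
	if err != nil {
		return "", err
	}
	rel, err := url.Parse(relPath)
	if err != nil {
		return "", err
	}
	return base.ResolveReference(rel).String(), nil
}

func main() {
	u, _ := absoluteStreamURL("https://orchestrator.example:8935", "/ai/live-video/abc123/out")
	fmt.Println(u) // https://orchestrator.example:8935/ai/live-video/abc123/out
}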
