Scaffolding for realtime-to-realtime #3210
Conversation
Added some comments.
cmd/livepeer/starter/starter.go
Outdated
```go
@@ -1348,6 +1348,15 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
			if *cfg.Network != "offchain" {
				n.SetBasePriceForCap("default", core.Capability_SegmentAnything2, config.ModelID, autoPrice)
			}
		case "realtime-to-realtime":
```
How about calling it `realtime-ai` or `realtime-inference` or `realtime-video-ai`?
Naming is the hardest thing in computer science :)
For reference, here are the names of the other pipelines, which generally follow `<input-type>To<output-type>`:
go-livepeer/core/capabilities.go
Lines 75 to 81 in 4390579
```go
Capability_TextToImage      Capability = 27
Capability_ImageToImage     Capability = 28
Capability_ImageToVideo     Capability = 29
Capability_Upscale          Capability = 30
Capability_AudioToText      Capability = 31
Capability_SegmentAnything2 Capability = 32
Capability_LLM              Capability = 33
```
> How about calling it `realtime-ai` or `realtime-inference` or `realtime-video-ai`?
To me, the `-ai` or `-inference` suffixes don't fit any of the existing patterns and don't convey information in this context, since all other pipelines are AI inference. We may also have other types of 'live' pipelines, so I think we want to specify the output type here too.

Another thing I have been thinking - we have been using the `realtime` terminology but I think `live` fits a little better (and is more on-brand for Livepeer). Plus it is slightly shorter.

So one option might be to use a prefix, eg `live-` for `live-video-to-video`. This makes the naming easily extendable to other areas too, eg `live-audio-to-text` for live transcription, `live-segment-anything-2`, etc.
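Following that `<input-type>To<output-type>` pattern, the new capability would slot in right after the existing constants. A sketch only; the value 34 is an assumption, not necessarily what the PR assigns:

```go
// Hypothetical addition to core/capabilities.go following the pattern
// above; the actual constant value used in the PR may differ.
const (
	Capability_LiveVideoToVideo Capability = 34
)
```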
@j0sh I like the live prefixed input output naming scheme.
Renamed to `live-video-to-video` in a92dacb
server/ai_mediaserver.go
Outdated
```go
@@ -72,6 +72,7 @@ func startAIMediaServer(ls *LivepeerServer) error {
	ls.HTTPMux.Handle("/audio-to-text", oapiReqValidator(handle(ls, multipartDecoder[worker.GenAudioToTextMultipartRequestBody], processAudioToText)))
	ls.HTTPMux.Handle("/llm", oapiReqValidator(ls.LLM()))
	ls.HTTPMux.Handle("/segment-anything-2", oapiReqValidator(handle(ls, multipartDecoder[worker.GenSegmentAnything2MultipartRequestBody], processSegmentAnything2)))
	ls.HTTPMux.Handle("/realtime-start", ls.RealtimeStart())
```
What will this endpoint be used for? Don't we need just one endpoint like `/realtime-ai` where MediaMTX will send segments? Something similar to what you shared here: livepeer/ai-worker#209 (comment)
This is the endpoint that MediaMTX uses to signal when a stream starts. Then we can kick off the RTMP pull from there (but that is not in this PR).
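For illustration, a minimal sketch of what such a start handler could look like, assuming MediaMTX posts the stream name as form data; the field name and handler body are assumptions, not this PR's actual code:

```go
package server

import "net/http"

// Hypothetical sketch only; the real RealtimeStart handler and the
// exact MediaMTX hook payload may differ.
func (ls *LivepeerServer) RealtimeStart() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		streamName := r.FormValue("stream") // assumed parameter name
		if streamName == "" {
			http.Error(w, "missing stream name", http.StatusBadRequest)
			return
		}
		// From here the gateway can kick off the RTMP pull from MediaMTX
		// and orchestrator selection (not part of this PR).
		w.WriteHeader(http.StatusOK)
	})
}
```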
```go
req := worker.StartRealtimeToRealtimeFormdataRequestBody{
	// TODO set model and initial parameters here if necessary (eg, prompt)
}
resp, err := processAIRequest(ctx, params, req)
```
I don't think we should follow this AI flow here. I guess what we're trying to do is more similar to transcoding.
Why not? This selects an orchestrator and allows us to receive and process a response from it (so we know where to send and receive media from).

This method works for now - if you want to change it later as part of the selection work, that is fine, but this gets us going with the existing RPC mechanisms.
I think I see what @leszko is saying: up until now we have just been sending in synchronous AI jobs, where the pipeline runs and returns the result, whereas here the request is to start up the realtime-to-realtime stream, basically? That said, I don't think it's a huge issue when there's lots of common code that we're able to reuse by following the same flow.
Yes, I think we can "re-use" it for this starting of the AI runner. But then the next question would be: when does the runner container stop? Or who stops it? We don't need to solve it in this PR, but it's something to consider.
Yep, as @mjh1 says, this basically makes a synchronous request using the normal AI inference path. This is pretty convenient, because it gives us an entry point to kick off the rest of the process - see #3211 for the beginnings of that.

I expect that we'd be building things like our job monitoring / re-selection, payments, etc somewhere near this 'main loop'.
> But then the next question would be: when does the runner container stop? Or who stops it? We don't need to solve it in this PR, but it's something to consider.
This would be a basic RPC request which can be done via whatever call-path makes sense. However, as far as job tear-down goes, I am not sure if we can rely on signaling for that (anything can disappear without notice), so we should ensure things still work well in the absence of explicit tear-down.
We'll need to solve a similar issue with calls to the model control API. The gateway will need some handle to the orchestrator session, the orchestrator to the (remote) worker, and possibly the worker to the runner (if runners have persistent job state). @victorges for visibility since this is his area
> This would be a basic RPC request which can be done via whatever call-path makes sense. However, as far as job tear-down goes, I am not sure if we can rely on signaling for that (anything can disappear without notice), so we should ensure things still work well in the absence of explicit tear-down.
Yeah, the best approach from the distributed (and decentralized) system perspective is what we do for transcoding:
- The params are included in the stream (segments)
- The AI (transcoding) pipeline is initialized when the first segment comes in
- The tear-down happens on a timeout (so when no segments come in for x min, it tears down) - see the sketch below

Any signaling of warm-up / tear-down is an "optimization" on top of that. This approach is good for reliability, because a lot may happen: the O may go down, we may swap the orchestrator, the G may go down (before signaling anything), etc.
CC: @victorges
Again, I think we can move this discussion outside this PR. The PR can be merged and we can discuss it separately.
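A minimal sketch of that timeout-based tear-down, assuming a per-stream goroutine fed segments over a channel; the names and the one-minute timeout are illustrative, not from this PR:

```go
package server

import "time"

// idleTimeout stands in for the "x min" mentioned above.
const idleTimeout = time.Minute

// watchSegments tears the pipeline down once no segments have arrived
// for idleTimeout, so no explicit stop signal from G or O is required.
func watchSegments(segments <-chan []byte, teardown func()) {
	timer := time.NewTimer(idleTimeout)
	defer timer.Stop()
	for {
		select {
		case _, ok := <-segments:
			if !ok {
				teardown() // the feeder closed the channel
				return
			}
			// A segment arrived: drain and reset the idle timer.
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(idleTimeout)
		case <-timer.C:
			// No segments for idleTimeout: tear everything down.
			teardown()
			return
		}
	}
}
```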
Force-pushed from a92dacb to 31077ff
Rebased + force pushed to resolve merge conflicts with the latest ai-video. Opening this PR up for real so we can hopefully merge and build on top of it as a team to minimize further conflicts.
Added some comments. When those are resolved and CI passes, I think it's good enough to merge as a scaffold. Thanks @j0sh!
go.mod
Outdated
```
@@ -2,6 +2,8 @@ module github.com/livepeer/go-livepeer

go 1.21.5

replace github.com/livepeer/ai-worker => github.com/j0sh/livepeer-ai-worker v0.0.0-20241017214425-b4f177ea52a7
```
I think we need to remove it before merging.
Yep, actually we first need livepeer/ai-worker#229 or maybe livepeer/ai-worker#233 in order to define the pipeline schema
server/ai_mediaserver.go
Outdated
```go
@@ -72,6 +72,7 @@ func startAIMediaServer(ls *LivepeerServer) error {
	ls.HTTPMux.Handle("/audio-to-text", oapiReqValidator(handle(ls, multipartDecoder[worker.GenAudioToTextMultipartRequestBody], processAudioToText)))
	ls.HTTPMux.Handle("/llm", oapiReqValidator(ls.LLM()))
	ls.HTTPMux.Handle("/segment-anything-2", oapiReqValidator(handle(ls, multipartDecoder[worker.GenSegmentAnything2MultipartRequestBody], processSegmentAnything2)))
	ls.HTTPMux.Handle("/live-video-start", ls.StartLiveVideo())
```
ls.HTTPMux.Handle("/live-video-start", ls.StartLiveVideo()) | |
// This is called by Media Server when stream is ready | |
ls.HTTPMux.Handle("/live-video-start", ls.StartLiveVideo()) |
- I'd add a comment that this endpoint is actually called by the Media Server
- Maybe rename to match `/live-video-to-video`
Comment added in 9e461a7.

@victorges suggested using a `/live/` prefix for these paths, which sounds good to me, so I did that here. Named it `/live/video-to-video/start` since I suspect we will want to append other things to the route, like maybe `/live/video-to-video/api` or thereabouts.
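For reference, a rough sketch of how the registration reads with that prefix; the second route is purely speculative:

```go
// Stream-start signal from MediaMTX.
ls.HTTPMux.Handle("/live/video-to-video/start", ls.StartLiveVideo())
// Hypothetical future addition under the same prefix, eg for the
// model control API discussed elsewhere in this thread:
// ls.HTTPMux.Handle("/live/video-to-video/api", ls.LiveVideoAPI())
```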
server/ai_mediaserver.go
Outdated
```go
	// TODO set model and initial parameters here if necessary (eg, prompt)
}
resp, err := processAIRequest(ctx, params, req)
fmt.Println("stream", streamName, "resp", resp, "error", err)
```
If we want to merge it into ai-video, then please remove or change to `glog` / `clog`.
Changed to clog in 9e461a7.

btw I might send a PR in the near future to add slog-style parameter lists to clog, since they are super convenient for ad-hoc KV pairs (in addition to the pre-defined `clog.Add` method we have).
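For context, 'slog-style' refers to the standard library's trailing key/value arguments (available since Go 1.21, which matches the go.mod bump in this PR); a minimal sketch, with the clog-flavored variant shown only as a hypothetical:

```go
package main

import "log/slog"

func main() {
	// log/slog accepts alternating key/value pairs after the message,
	// which is convenient for ad-hoc KV logging.
	slog.Info("stream started", "stream", "my-stream", "orchestrator", "https://orch.example")

	// A hypothetical clog equivalent might read:
	//   clog.Info(ctx, "stream started", "stream", streamName)
	// whereas today KV pairs are pre-registered via the clog.Add
	// method mentioned above.
}
```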
server/ai_http.go
Outdated
```go
// skipping handleAIRequest for now until we have payments

// whats the point of openapi if we have to do this manually
```
If you want to merge this PR, then please remove this comment.
A bit of late-night frustration that crept through :) These handlers are largely boilerplate that should be done for us by codegen. Removed in 9e461a7
server/ai_process.go
Outdated
```go
if resp.JSON500 != nil {
	fmt.Println("json500", resp.JSON500)
}
fmt.Println("body is", string(resp.Body))
```
If you plan to merge, then please change to `clog` or remove.
Removed in 9e461a7
server/ai_process.go
Outdated
```go
@@ -33,6 +33,7 @@ const defaultUpscaleModelID = "stabilityai/stable-diffusion-x4-upscaler"
const defaultAudioToTextModelID = "openai/whisper-large-v3"
const defaultLLMModelID = "meta-llama/llama-3.1-8B-Instruct"
const defaultSegmentAnything2ModelID = "facebook/sam2-hiera-large"
const defaultLiveVideoToVideoModelID = "stream-diffusion" // TODO what should this be?
```
I guess stream diffusion is not on huggingface, so you can leave it as `stream-diffusion` or change to `cumulo-autumn/stream-diffusion`.
Changed to `cumulo-autumn/streamdiffusion` in 9e461a7.
Hey @j0sh, just a heads up - I had to update the dependencies to ensure the network is compatible with the new Go version and other updated libraries. I found a combination that works, but if you need to use lower versions that are also compatible, feel free to adjust them. I'm flexible with it. I'm also improving the code through small pull requests so that it doesn't cause too many conflicts. If you run into problems or need me to rebase, just ping me.
Force-pushed from 9e461a7 to e8f2568
Force pushed to fix up a bunch of merge conflicts with the latest ai-video branch. Also squashed everything down to 1 commit to avoid a cascading series of merge conflicts.

Builds for this PR are broken for now until the pipeline on the worker / runner side is in, either in livepeer/ai-worker#229 or livepeer/ai-worker#233.
Updated to remove the dependency on AI Worker changes. The scaffolding that is there is now even less useful, but there is still the capability definition and pipeline path, both of which have caused merge conflicts recently as we've added new stuff.

If CI is green here, I'll go ahead and merge so it's easier to stay on the leading edge of the tree.
Codecov Report

Attention: Patch coverage is …

Additional details and impacted files:

```
@@              Coverage Diff              @@
##             ai-video      #3210       +/-  ##
===================================================
- Coverage   36.07820%   35.87705%   -0.20115%
===================================================
  Files            124         124
  Lines          34525       34713        +188
===================================================
- Hits           12456       12454          -2
- Misses         21381       21570        +189
- Partials         688         689          +1
```

... and 1 file with indirect coverage changes

Continue to review the full report in Codecov by Sentry.
What does this pull request do? Explain your changes. (required)
- Initial bits of the "live-video-to-video" pipeline (better name welcome)
- Depends on livepeer/ai-worker#229 for the realtime schema
- Adds a MediaMTX handler for the gateway
- Adds realtime-to-realtime capability to the orchestrator
- Adds selection support between G/O
- Orchestrator will respond to the job with trickle endpoints (publish, subscribe) - see the sketch below
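To make the last point concrete, a hypothetical Go shape for the orchestrator's response carrying the trickle endpoints; the field and type names are illustrative, since the real schema is defined on the ai-worker side:

```go
// Hypothetical response shape; the actual schema is defined by the
// ai-worker OpenAPI spec (livepeer/ai-worker#229 / #233) and may differ.
type LiveVideoToVideoResponse struct {
	PublishURL   string `json:"publish_url"`   // where the gateway publishes source media
	SubscribeURL string `json:"subscribe_url"` // where the gateway subscribes for processed output
}
```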