Skip to content

Commit

Permalink
[#87][#88][#89] bug fixes to turbo and lifecycle. (#90)
Browse files Browse the repository at this point in the history
* [#87] [#88][#89] bugfixes to turbo and lifecycle.

* [#87][88] lifecycle and turbo bug fixes
  • Loading branch information
nandagopalan authored Jan 16, 2025
1 parent 1eeb036 commit 9d8c099
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 64 deletions.
7 changes: 5 additions & 2 deletions lifecycle/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ var ErrInvalidComponentState = errors.New("invalid component state")
type Component interface {
// Id is the unique identifier for the component.
Id() string
//OnChange is the function that will be called when the component state changes.
OnChange(prevState, newState ComponentState)
// OnChange is the function that will be called when the component state changes.
// It will be called with the previous state and the new state.
OnChange(f func(prevState, newState ComponentState))
// Start will starting the LifeCycle.
Start() error
// Stop will stop the LifeCycle.
Expand All @@ -47,6 +48,8 @@ type ComponentManager interface {
GetState(id string) ComponentState
//List will return a list of all the Components.
List() []Component
// OnChange is the function that will be called when the component state changes.
OnChange(id string, f func(prevState, newState ComponentState))
// Register will register a new Components.
Register(component Component) Component
// StartAll will start all the Components. Returns the number of components started
Expand Down
138 changes: 83 additions & 55 deletions lifecycle/simple_component.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (

// SimpleComponent is the struct that implements the Component interface.
type SimpleComponent struct {
// stateChangeFuncs
stateChangeFuncs []func(prevState, newState ComponentState)
//mutex
mutex sync.RWMutex
// CompId is the unique identifier for the component.
CompId string
// AfterStart is the function that will be called after the component is started
// The function will be called with the error returned by the StartFunc.
Expand All @@ -24,8 +29,6 @@ type SimpleComponent struct {
BeforeStop func()
// CompState is the current state of the component.
CompState ComponentState
// OnStateChange is the function that will be called when the component state changes.
OnStateChange func(prevState, newState ComponentState)
//StartFunc is the function that will be called when the component is started.
// It returns an error if the component failed to start.
StartFunc func() error
Expand All @@ -34,15 +37,13 @@ type SimpleComponent struct {
StopFunc func() error
}

// ComponentId is the unique identifier for the component.
func (sc *SimpleComponent) Id() string {
return sc.CompId
}

// OnChange is the function that will be called when the component state changes.
func (sc *SimpleComponent) OnChange(prevState, newState ComponentState) {
if sc.OnStateChange != nil {
sc.OnStateChange(prevState, newState)
// handleStateChange is the function that will be called when the component state changes.
func (sc *SimpleComponent) handleStateChange(prevState, newState ComponentState) {
// if sc.OnStateChange != nil {
// sc.OnStateChange(prevState, newState)
// }
for _, f := range sc.stateChangeFuncs {
f(prevState, newState)
}
if newState == Starting && sc.BeforeStart != nil {
sc.BeforeStart()
Expand All @@ -51,10 +52,22 @@ func (sc *SimpleComponent) OnChange(prevState, newState ComponentState) {
}
}

// ComponentId is the unique identifier for the component.
func (sc *SimpleComponent) Id() string {
return sc.CompId
}

// OnChange is the function that will be called when the component state changes.
func (sc *SimpleComponent) OnChange(f func(prevState, newState ComponentState)) {
sc.mutex.Lock()
defer sc.mutex.Unlock()
sc.stateChangeFuncs = append(sc.stateChangeFuncs, f)
}

// Start will starting the LifeCycle.
func (sc *SimpleComponent) Start() (err error) {
if sc.StartFunc != nil {
sc.OnChange(sc.CompState, Starting)
sc.handleStateChange(sc.CompState, Starting)
sc.CompState = Starting
err = sc.StartFunc()
if err != nil {
Expand All @@ -63,9 +76,7 @@ func (sc *SimpleComponent) Start() (err error) {
sc.CompState = Running

}
if sc.OnStateChange != nil {
sc.OnStateChange(Starting, sc.CompState)
}
sc.handleStateChange(Starting, sc.CompState)
if sc.AfterStart != nil {
sc.AfterStart(err)
}
Expand All @@ -77,17 +88,15 @@ func (sc *SimpleComponent) Start() (err error) {
// Stop will stop the LifeCycle.
func (sc *SimpleComponent) Stop() (err error) {
if sc.StopFunc != nil {
sc.OnChange(sc.CompState, Stopping)
sc.handleStateChange(sc.CompState, Stopping)
sc.CompState = Stopping
err = sc.StopFunc()
if err != nil {
sc.CompState = Error
} else {
sc.CompState = Stopped
}
if sc.OnStateChange != nil {
sc.OnStateChange(Stopping, sc.CompState)
}
sc.handleStateChange(Stopping, sc.CompState)
if sc.AfterStop != nil {
sc.AfterStop(err)

Expand All @@ -104,9 +113,10 @@ func (sc *SimpleComponent) State() ComponentState {

// SimpleComponentManager is the struct that manages the component.
type SimpleComponentManager struct {
components map[string]Component
cMutex *sync.RWMutex
waitChan chan struct{}
components map[string]Component
componentIds []string
cMutex *sync.RWMutex
waitChan chan struct{}
}

// GetState will return the current state of the LifeCycle for the component with the given id.
Expand All @@ -126,12 +136,22 @@ func (scm *SimpleComponentManager) List() []Component {
defer scm.cMutex.RUnlock()
// Create a slice of Component and iterate over the components map and append the components to the slice.
components := make([]Component, 0, len(scm.components))
for _, component := range scm.components {
components = append(components, component)
for _, compId := range scm.componentIds {
components = append(components, scm.components[compId])
}
return components
}

// OnChange is the function that will be called when the component state changes.
func (scm *SimpleComponentManager) OnChange(id string, f func(prevState, newState ComponentState)) {
scm.cMutex.Lock()
defer scm.cMutex.Unlock()
component, exists := scm.components[id]
if exists {
component.OnChange(f)
}
}

// Register will register a new Components.
// if the component is already registered, get the old component.
func (scm *SimpleComponentManager) Register(component Component) Component {
Expand All @@ -141,33 +161,11 @@ func (scm *SimpleComponentManager) Register(component Component) Component {
oldComponent, exists := scm.components[component.Id()]
if !exists {
scm.components[component.Id()] = component
scm.componentIds = append(scm.componentIds, component.Id())
}
return oldComponent
}

// StartAll will start all the Components. Returns the number of components started
func (scm *SimpleComponentManager) StartAll() error {
var err *errutils.MultiError = errutils.NewMultiErr(nil)
for id := range scm.components {
e := scm.Start(id)
if e != nil {
err.Add(e)
}
}
if err.HasErrors() {
return err
} else {
return nil
}
}

// StartAndWait will start all the Components. And will wait for them to be stopped.
func (scm *SimpleComponentManager) StartAndWait() {
scm.StartAll() // Start all the components
scm.Wait() // Wait for all the components to finish

}

// Start will start the LifeCycle for the component with the given id. It returns if the component was started.
func (scm *SimpleComponentManager) Start(id string) (err error) {
scm.cMutex.Lock()
Expand All @@ -190,14 +188,38 @@ func (scm *SimpleComponentManager) Start(id string) (err error) {
return ErrCompNotFound
}

// StartAll will start all the Components. Returns the number of components started
func (scm *SimpleComponentManager) StartAll() error {
var err *errutils.MultiError = errutils.NewMultiErr(nil)
for _, id := range scm.componentIds {
e := scm.Start(id)
if e != nil {
err.Add(e)
}
}
if err.HasErrors() {
return err
} else {
return nil
}
}

// StartAndWait will start all the Components. And will wait for them to be stopped.
func (scm *SimpleComponentManager) StartAndWait() {
scm.StartAll() // Start all the components
scm.Wait() // Wait for all the components to finish

}

// StopAll will stop all the Components.
func (scm *SimpleComponentManager) StopAll() error {
logger.InfoF("Stopping all components")
err := errutils.NewMultiErr(nil)
scm.cMutex.Lock()
defer scm.cMutex.Unlock()
wg := &sync.WaitGroup{}
for _, component := range scm.components {
for i := len(scm.componentIds) - 1; i >= 0; i-- {
component := scm.components[scm.componentIds[i]]
if component.State() == Running {
wg.Add(1)
go func(c Component, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -257,18 +279,24 @@ func (scm *SimpleComponentManager) Unregister(id string) {
component.Stop()
}
delete(scm.components, id)
for i, compId := range scm.componentIds {
if compId == id {
scm.componentIds = append(scm.componentIds[:i], scm.componentIds[i+1:]...)
break
}
}
}
}

// Wait will wait for all the Components to finish.
func (scm *SimpleComponentManager) Wait() {
go func() {
// Wait for a signal to stop the components.
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
<-signalChan
scm.StopAll()
}()
// go func() {
// // Wait for a signal to stop the components.
// signalChan := make(chan os.Signal, 1)
// signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
// <-signalChan
// scm.StopAll()
// }()
<-scm.waitChan

}
Expand Down
7 changes: 1 addition & 6 deletions turbo/turbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,12 +438,7 @@ func GetPathParamAsBool(id string, r *http.Request) (bool, error) {

// GetQueryParam fetches the query parameters
func GetQueryParam(id string, r *http.Request) (string, error) {
val := r.URL.Query().Get(id)
if val == "" {
logger.ErrorF("Error Fetching Query Param %s", id)
return "err", fmt.Errorf("error fetching query param %s", id)
}
return val, nil
return r.URL.Query().Get(id), nil
}

// GetQueryParamAsInt fetches the int query parameters
Expand Down
2 changes: 1 addition & 1 deletion turbo/turbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func TestRouter_GetQueryParams(t *testing.T) {
id: "test3",
r: &http.Request{URL: strUrl},
},
want: "err",
want: "",
},
}
for _, tt := range tests {
Expand Down

0 comments on commit 9d8c099

Please sign in to comment.