Skip to content

Commit

Permalink
refactor: divide duplicated code as hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Oct 3, 2024
1 parent b6cd9d9 commit 21010f1
Show file tree
Hide file tree
Showing 20 changed files with 211 additions and 193 deletions.
10 changes: 1 addition & 9 deletions docs/user_extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ builder.Register(AddToScheme())
scheme, _ := builder.Build()
```

### Running the Runtime Environment
## Running the Runtime Environment

With the schema created and registered, you can now set up the runtime environment and run workflows that include your new node type. Initialize the runtime environment with the schema and other required components:

Expand All @@ -330,11 +330,6 @@ defer r.Close()

This code creates a new runtime environment using the provided schema, hook, specification store, and secret store. The `defer` statement ensures that resources are cleaned up when done.

## Integration with Existing Services

To integrate the runtime environment with existing services and build an executable, you need to set up the environment to run continuously or in a simpler execution mode.

```go
func main() {
ctx := context.TODO()

Expand Down Expand Up @@ -374,6 +369,3 @@ func main() {
r.Load(ctx)
r.Reconcile(ctx)
}
```

This code keeps the runtime environment running and responsive to external signals. It uses `os.Signal` to listen for termination signals and safely shuts down the environment.
6 changes: 2 additions & 4 deletions docs/user_extensions_kr.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ defer r.Close()

위 코드에서는 `runtime.New`를 사용하여 새로운 런타임 환경을 생성하고, 필요한 모든 구성 요소를 설정합니다. `defer`를 사용하여 종료 시 리소스를 정리합니다.

## 기존 서비스와 통합
## 런타임 실행

이제 만든 런타임 환경을 기존 서비스에 통합하고, 다시 빌드하여 실행 파일을 생성해야 합니다.
이제 만든 런타임 환경을 저장소와 동기화하여 노드 명세를 컴파일하고 실행 가능한 상태로 만들어야 합니다.

```go
func main() {
Expand Down Expand Up @@ -375,5 +375,3 @@ func main() {
r.Reconcile(ctx)
}
```

위 코드에서는 런타임 환경을 지속적으로 실행하여 외부 신호에 반응하도록 설정합니다. `os.Signal`을 통해 종료 신호를 수신하면 런타임 환경을 안전하게 종료합니다.
19 changes: 10 additions & 9 deletions ext/pkg/network/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type HTTPNodeSpec struct {
// HTTPNode represents a node for making HTTP client requests.
type HTTPNode struct {
*node.OneToOneNode
client *http.Client
url *url.URL
timeout time.Duration
mu sync.RWMutex
Expand Down Expand Up @@ -65,7 +66,14 @@ func NewHTTPNodeCodec() scheme.Codec {

// NewHTTPNode creates a new HTTPNode instance.
func NewHTTPNode(url *url.URL) *HTTPNode {
n := &HTTPNode{url: url}
transport := &http.Transport{}
http2.ConfigureTransport(transport)

client := &http.Client{
Transport: transport,
}

n := &HTTPNode{client: client, url: url}
n.OneToOneNode = node.NewOneToOneNode(n.action)
return n
}
Expand Down Expand Up @@ -137,14 +145,7 @@ func (n *HTTPNode) action(proc *process.Process, inPck *packet.Packet) (*packet.
}
r = r.WithContext(ctx)

transport := &http.Transport{}
http2.ConfigureTransport(transport)

client := &http.Client{
Transport: transport,
}

w, err := client.Do(r)
w, err := n.client.Do(r)
if err != nil {
return nil, packet.New(types.NewError(err))
}
Expand Down
6 changes: 3 additions & 3 deletions ext/pkg/system/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ func (n *NativeNode) action(proc *process.Process, inPck *packet.Packet) (*packe

outPayloads := make([]types.Value, len(outs))
for i, out := range outs {
if outPayload, err := types.Marshal(out.Interface()); err != nil {
outPayload, err := types.Marshal(out.Interface())
if err != nil {
return nil, packet.New(types.NewError(err))
} else {
outPayloads[i] = outPayload
}
outPayloads[i] = outPayload
}

if len(outPayloads) == 0 {
Expand Down
1 change: 0 additions & 1 deletion ext/pkg/system/syscall.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func CreateSecrets(s secret.Store) func(context.Context, []*secret.Secret) ([]*s
return nil, err
}
return s.Load(ctx, secrets...)

}
}

Expand Down
12 changes: 3 additions & 9 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Agent struct {
frames map[uuid.UUID][]*Frame
inbounds map[uuid.UUID]map[string]port.Hook
outbounds map[uuid.UUID]map[string]port.Hook
watchers []Watcher
watchers Watchers
mu sync.RWMutex
}

Expand Down Expand Up @@ -252,10 +252,7 @@ func (a *Agent) hooks(proc *process.Process, sym *symbol.Symbol, in *port.InPort

a.mu.Unlock()

for i := len(watchers) - 1; i >= 0; i-- {
watcher := watchers[i]
watcher.OnFrame(frame)
}
watchers.OnFrame(frame)
})

outboundHook := packet.HookFunc(func(pck *packet.Packet) {
Expand Down Expand Up @@ -286,10 +283,7 @@ func (a *Agent) hooks(proc *process.Process, sym *symbol.Symbol, in *port.InPort

a.mu.Unlock()

for i := len(watchers) - 1; i >= 0; i-- {
watcher := watchers[i]
watcher.OnFrame(frame)
}
watchers.OnFrame(frame)
})

return inboundHook, outboundHook
Expand Down
28 changes: 22 additions & 6 deletions pkg/agent/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,51 @@ import "github.com/siyul-park/uniflow/pkg/process"

// Watcher defines methods for handling Frame and Process events.
type Watcher interface {
OnFrame(*Frame) // Called on Frame events.
OnProcess(*process.Process) // Called on Process events.
OnFrame(*Frame) // Triggered when a Frame event occurs.
OnProcess(*process.Process) // Triggered when a Process event occurs.
}

// Watchers is a slice of Watcher interfaces.
type Watchers []Watcher

type watcher struct {
onFrame func(*Frame)
onProcess func(*process.Process)
}

var _ Watcher = (Watchers)(nil)
var _ Watcher = (*watcher)(nil)

// NewFrameWatcher returns a Watcher for Frame events.
// NewFrameWatcher creates a Watcher for handling Frame events.
func NewFrameWatcher(handle func(*Frame)) Watcher {
return &watcher{onFrame: handle}
}

// NewProcessWatcher returns a Watcher for Process events.
// NewProcessWatcher creates a Watcher for handling Process events.
func NewProcessWatcher(handle func(*process.Process)) Watcher {
return &watcher{onProcess: handle}
}

func (w Watchers) OnFrame(frame *Frame) {
for _, watcher := range w {
watcher.OnFrame(frame)
}
}

func (w Watchers) OnProcess(proc *process.Process) {
for _, watcher := range w {
watcher.OnProcess(proc)
}
}

func (w *watcher) OnFrame(frame *Frame) {
if w.onFrame != nil {
w.onFrame(frame)
}
}

func (w *watcher) OnProcess(process *process.Process) {
func (w *watcher) OnProcess(proc *process.Process) {
if w.onProcess != nil {
w.onProcess(process)
w.onProcess(proc)
}
}
11 changes: 11 additions & 0 deletions pkg/packet/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,28 @@ type Hook interface {
Handle(*Packet)
}

// Hooks is a slice of Hook interfaces, allowing multiple Hooks to be handled together.
type Hooks []Hook

type hook struct {
handle func(*Packet)
}

var _ Hook = (Hooks)(nil)
var _ Hook = (*hook)(nil)

// HookFunc creates a new Hook using the provided function.
func HookFunc(handle func(*Packet)) Hook {
return &hook{handle: handle}
}

// Handle processes each packet using the Hooks in the slice.
func (h Hooks) Handle(pck *Packet) {
for _, hook := range h {
hook.Handle(pck)
}
}

func (h *hook) Handle(pck *Packet) {
h.handle(pck)
}
90 changes: 36 additions & 54 deletions pkg/packet/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (

// Reader represents a packet reader that manages incoming packets from multiple writers.
type Reader struct {
writers []*Writer
in chan *Packet
out chan *Packet
done chan struct{}
inboundHooks []Hook
outboundHooks []Hook
mu sync.Mutex
writers []*Writer
in chan *Packet
out chan *Packet
done chan struct{}
inbounds Hooks
outbounds Hooks
mu sync.Mutex
}

// NewReader creates a new Reader instance and starts its processing loop.
Expand All @@ -35,16 +35,6 @@ func NewReader() *Reader {
select {
case pck = <-r.in:
case <-r.done:
for {
w := r.writer()
if w == nil {
break
}

pck := New(types.NewError(ErrDroppedPacket))
r.outboundHook(pck)
w.receive(pck, r)
}
return
}

Expand Down Expand Up @@ -73,13 +63,12 @@ func (r *Reader) AddInboundHook(hook Hook) bool {
r.mu.Lock()
defer r.mu.Unlock()

for _, h := range r.inboundHooks {
for _, h := range r.inbounds {
if h == hook {
return false
}
}

r.inboundHooks = append(r.inboundHooks, hook)
r.inbounds = append(r.inbounds, hook)
return true
}

Expand All @@ -88,12 +77,12 @@ func (r *Reader) AddOutboundHook(hook Hook) bool {
r.mu.Lock()
defer r.mu.Unlock()

for _, h := range r.outboundHooks {
for _, h := range r.outbounds {
if h == hook {
return false
}
}
r.outboundHooks = append(r.outboundHooks, hook)
r.outbounds = append(r.outbounds, hook)
return true
}

Expand All @@ -104,12 +93,20 @@ func (r *Reader) Read() <-chan *Packet {

// Receive receives a packet from a writer and forwards it to the reader's input channel.
func (r *Reader) Receive(pck *Packet) bool {
w := r.writer()
if w == nil {
r.mu.Lock()

if len(r.writers) == 0 {
r.mu.Unlock()
return false
}

r.outboundHook(pck)
w := r.writers[0]
r.writers = r.writers[1:]

r.outbounds.Handle(pck)

r.mu.Unlock()

return w.receive(pck, r)
}

Expand All @@ -120,9 +117,21 @@ func (r *Reader) Close() {

select {
case <-r.done:
return
default:
close(r.done)
}

pck := New(types.NewError(ErrDroppedPacket))
for _, w := range r.writers {
r.outbounds.Handle(pck)
go w.receive(pck, r)
}

close(r.done)

r.writers = nil
r.inbounds = nil
r.outbounds = nil
}

func (r *Reader) write(pck *Packet, writer *Writer) bool {
Expand All @@ -135,35 +144,8 @@ func (r *Reader) write(pck *Packet, writer *Writer) bool {
default:
}

r.inboundHook(pck)
r.writers = append(r.writers, writer)
r.inbounds.Handle(pck)
r.in <- pck

return true
}

func (r *Reader) writer() *Writer {
r.mu.Lock()
defer r.mu.Unlock()

if len(r.writers) == 0 {
return nil
}

writer := r.writers[0]
r.writers = r.writers[1:]

return writer
}

func (r *Reader) inboundHook(pck *Packet) {
for _, hook := range r.inboundHooks {
hook.Handle(pck)
}
}

func (r *Reader) outboundHook(pck *Packet) {
for _, hook := range r.outboundHooks {
hook.Handle(pck)
}
}
Loading

0 comments on commit 21010f1

Please sign in to comment.