Skip to content

Commit

Permalink
feat: support in/out port in http
Browse files Browse the repository at this point in the history
  • Loading branch information
siyul-park committed Nov 23, 2023
1 parent 5128818 commit ee0b7cf
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 18 deletions.
34 changes: 30 additions & 4 deletions pkg/plugin/networkx/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type (
listener net.Listener
listenerNetwork string
ioPort *port.Port
inPort *port.Port
outPort *port.Port
errPort *port.Port
mu sync.RWMutex
}
Expand Down Expand Up @@ -268,6 +270,8 @@ func NewHTTPNode(config HTTPNodeConfig) *HTTPNode {
server: new(http.Server),
listenerNetwork: "tcp",
ioPort: port.New(),
inPort: port.New(),
outPort: port.New(),
errPort: port.New(),
}
n.server.Handler = n
Expand All @@ -289,6 +293,10 @@ func (n *HTTPNode) Port(name string) (*port.Port, bool) {
switch name {
case node.PortIO:
return n.ioPort, true
case node.PortIn:
return n.inPort, true
case node.PortOut:
return n.outPort, true
case node.PortErr:
return n.errPort, true
default:
Expand Down Expand Up @@ -350,6 +358,8 @@ func (n *HTTPNode) Close() error {
return err
}
n.ioPort.Close()
n.inPort.Close()
n.outPort.Close()
n.errPort.Close()

return nil
Expand All @@ -373,8 +383,9 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}()

outStream := n.ioPort.Open(proc)
inStream := n.ioPort.Open(proc)
ioStream := n.ioPort.Open(proc)
inStream := n.inPort.Open(proc)
outStream := n.outPort.Open(proc)

req, err := n.request(r)
if err != nil {
Expand All @@ -387,9 +398,24 @@ func (n *HTTPNode) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
outPck := packet.New(outPayload)
outStream.Send(outPck)

inPck, ok := <-inStream.Receive()
if ioStream.Links() > 0 {
ioStream.Send(outPck)
}
if outStream.Links() > 0 {
outStream.Send(outPck)
}
if ioStream.Links()+outStream.Links() == 0 {
return
}

var inPck *packet.Packet
var ok bool

select {
case inPck, ok = <-inStream.Receive():
case inPck, ok = <-ioStream.Receive():
}
if !ok {
_ = n.response(r, w, n.errorPayload(proc, ServiceUnavailable))
return
Expand Down
41 changes: 27 additions & 14 deletions pkg/plugin/networkx/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func TestHTTPNode_Port(t *testing.T) {
assert.True(t, ok)
assert.NotNil(t, p)

p, ok = n.Port(node.PortIn)
assert.True(t, ok)
assert.NotNil(t, p)

p, ok = n.Port(node.PortOut)
assert.True(t, ok)
assert.NotNil(t, p)

p, ok = n.Port(node.PortErr)
assert.True(t, ok)
assert.NotNil(t, p)
Expand Down Expand Up @@ -69,7 +77,7 @@ func TestHTTPNode_StartAndClose(t *testing.T) {
}

func TestHTTPNode_ServeHTTP(t *testing.T) {
t.Run("Hello World", func(t *testing.T) {
t.Run("IO", func(t *testing.T) {
n := NewHTTPNode(HTTPNodeConfig{})
defer func() { _ = n.Close() }()

Expand Down Expand Up @@ -105,29 +113,34 @@ func TestHTTPNode_ServeHTTP(t *testing.T) {
assert.Equal(t, "Hello World!", w.Body.String())
})

t.Run("HTTPError", func(t *testing.T) {
t.Run("In/Out", func(t *testing.T) {
n := NewHTTPNode(HTTPNodeConfig{})
defer func() { _ = n.Close() }()

httpErr := NotFound
in := port.New()
inPort, _ := n.Port(node.PortIn)
inPort.Link(in)

io := port.New()
ioPort, _ := n.Port(node.PortIO)
ioPort.Link(io)
out := port.New()
outPort, _ := n.Port(node.PortOut)
outPort.Link(out)

io.AddInitHook(port.InitHookFunc(func(proc *process.Process) {
ioStream := io.Open(proc)
out.AddInitHook(port.InitHookFunc(func(proc *process.Process) {
inStream := in.Open(proc)
outStream := out.Open(proc)

for {
inPck, ok := <-ioStream.Receive()
inPck, ok := <-outStream.Receive()
if !ok {
return
}

outPayload, _ := primitive.MarshalText(httpErr)
outPck := packet.New(outPayload)
outPck := packet.New(primitive.NewMap(
primitive.NewString("body"), primitive.NewString("Hello World!"),
primitive.NewString("status"), primitive.NewInt(200),
))
proc.Stack().Link(inPck.ID(), outPck.ID())
ioStream.Send(outPck)
inStream.Send(outPck)
}
}))

Expand All @@ -136,8 +149,8 @@ func TestHTTPNode_ServeHTTP(t *testing.T) {

n.ServeHTTP(w, r)

assert.Equal(t, httpErr.Status, w.Result().StatusCode)
assert.Equal(t, 200, w.Result().StatusCode)
assert.Equal(t, TextPlainCharsetUTF8, w.Header().Get(HeaderContentType))
assert.Equal(t, httpErr.Body.Interface(), w.Body.String())
assert.Equal(t, "Hello World!", w.Body.String())
})
}

0 comments on commit ee0b7cf

Please sign in to comment.