Skip to content

Commit

Permalink
Pass webhook conversation token back to backend (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
ob-stripe authored Oct 8, 2019
1 parent 4d6778b commit 0c42937
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 34 deletions.
16 changes: 8 additions & 8 deletions pkg/proxy/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ type EndpointConfig struct {

// EndpointResponseHandler handles a response from the endpoint.
type EndpointResponseHandler interface {
ProcessResponse(string, string, *http.Response)
ProcessResponse(string, string, string, *http.Response)
}

// EndpointResponseHandlerFunc is an adapter to allow the use of ordinary
// functions as response handlers. If f is a function with the
// appropriate signature, ResponseHandler(f) is a
// ResponseHandler that calls f.
type EndpointResponseHandlerFunc func(string, string, *http.Response)
type EndpointResponseHandlerFunc func(string, string, string, *http.Response)

// ProcessResponse calls f(webhookID, resp).
func (f EndpointResponseHandlerFunc) ProcessResponse(webhookID, forwardURL string, resp *http.Response) {
f(webhookID, forwardURL, resp)
// ProcessResponse calls f(webhookID, webhookConversationID, resp).
func (f EndpointResponseHandlerFunc) ProcessResponse(webhookID, webhookConversationID, forwardURL string, resp *http.Response) {
f(webhookID, webhookConversationID, forwardURL, resp)
}

// EndpointClient is the client used to POST webhook requests to the local endpoint.
Expand Down Expand Up @@ -75,7 +75,7 @@ func (c *EndpointClient) SupportsEventType(connect bool, eventType string) bool
}

// Post sends a message to the local endpoint.
func (c *EndpointClient) Post(webhookID string, body string, headers map[string]string) error {
func (c *EndpointClient) Post(webhookID, webhookConversationID, body string, headers map[string]string) error {
c.cfg.Log.WithFields(log.Fields{
"prefix": "proxy.EndpointClient.Post",
}).Debug("Forwarding event to local endpoint")
Expand Down Expand Up @@ -113,7 +113,7 @@ func (c *EndpointClient) Post(webhookID string, body string, headers map[string]
}
defer resp.Body.Close()

c.cfg.ResponseHandler.ProcessResponse(webhookID, c.URL, resp)
c.cfg.ResponseHandler.ProcessResponse(webhookID, webhookConversationID, c.URL, resp)

return nil
}
Expand All @@ -136,7 +136,7 @@ func NewEndpointClient(url string, headers []string, connect bool, events []stri
}
}
if cfg.ResponseHandler == nil {
cfg.ResponseHandler = EndpointResponseHandlerFunc(func(string, string, *http.Response) {})
cfg.ResponseHandler = EndpointResponseHandlerFunc(func(string, string, string, *http.Response) {})
}

return &EndpointClient{
Expand Down
8 changes: 6 additions & 2 deletions pkg/proxy/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,39 +38,43 @@ func TestClientHandler(t *testing.T) {
rcvBody := ""
rcvForwardURL := ""
rcvWebhookID := ""
rcvWebhookConversationID := ""
client := NewEndpointClient(
ts.URL,
[]string{" Host: hostname", "customHeader:customHeaderValue", "customHeader2: customHeaderValue 2",
"emptyHeader:", ":", "::", "removeControlCharacters: tab"}, // custom headers
false,
[]string{"*"},
&EndpointConfig{
ResponseHandler: EndpointResponseHandlerFunc(func(webhookID, forwardURL string, resp *http.Response) {
ResponseHandler: EndpointResponseHandlerFunc(func(webhookID, webhookConversationID, forwardURL string, resp *http.Response) {
buf, err := ioutil.ReadAll(resp.Body)
require.Nil(t, err)

rcvBody = string(buf)
rcvForwardURL = forwardURL
rcvWebhookID = webhookID
rcvWebhookConversationID = webhookConversationID

wg.Done()
}),
},
)

webhookID := "wh_123"
webhookConversationID := "wc_123"
payload := "{}"
headers := map[string]string{
"User-Agent": "TestAgent/v1",
"Stripe-Signature": "t=123,v1=hunter2",
}

err := client.Post(webhookID, payload, headers)
err := client.Post(webhookID, webhookConversationID, payload, headers)

wg.Wait()

require.Nil(t, err)
require.Equal(t, "OK!", rcvBody)
require.Equal(t, ts.URL, rcvForwardURL)
require.Equal(t, "wh_123", rcvWebhookID)
require.Equal(t, "wc_123", rcvWebhookConversationID)
}
23 changes: 18 additions & 5 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ func (p *Proxy) processWebhookEvent(msg websocket.IncomingMessage) {
webhookEvent := msg.WebhookEvent

p.cfg.Log.WithFields(log.Fields{
"prefix": "proxy.Proxy.processWebhookEvent",
"webhook_id": webhookEvent.WebhookID,
"prefix": "proxy.Proxy.processWebhookEvent",
"webhook_id": webhookEvent.WebhookID,
"webhook_converesation_id": webhookEvent.WebhookConversationID,
}).Debugf("Processing webhook event")

if p.filterWebhookEvent(webhookEvent) {
Expand Down Expand Up @@ -213,7 +214,12 @@ func (p *Proxy) processWebhookEvent(msg websocket.IncomingMessage) {

for _, endpoint := range p.endpointClients {
if endpoint.SupportsEventType(evt.isConnect(), evt.Type) {
go endpoint.Post(webhookEvent.WebhookID, webhookEvent.EventPayload, webhookEvent.HTTPHeaders)
go endpoint.Post(
webhookEvent.WebhookID,
webhookEvent.WebhookConversationID,
webhookEvent.EventPayload,
webhookEvent.HTTPHeaders,
)
}
}
}
Expand All @@ -222,7 +228,7 @@ func (p *Proxy) processWebhookEvent(msg websocket.IncomingMessage) {
// to pass back to Stripe
}

func (p *Proxy) processEndpointResponse(webhookID, forwardURL string, resp *http.Response) {
func (p *Proxy) processEndpointResponse(webhookID, webhookConversationID, forwardURL string, resp *http.Response) {
localTime := time.Now().Format(timeLayout)

color := ansi.Color(os.Stdout)
Expand Down Expand Up @@ -258,7 +264,14 @@ func (p *Proxy) processEndpointResponse(webhookID, forwardURL string, resp *http
}

if p.webSocketClient != nil {
msg := websocket.NewWebhookResponse(webhookID, forwardURL, resp.StatusCode, body, headers)
msg := websocket.NewWebhookResponse(
webhookID,
webhookConversationID,
forwardURL,
resp.StatusCode,
body,
headers,
)
p.webSocketClient.SendMessage(msg)
}
}
Expand Down
39 changes: 21 additions & 18 deletions pkg/websocket/webhook_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,37 @@ type WebhookEndpoint struct {

// WebhookEvent represents incoming webhook event messages sent by Stripe.
type WebhookEvent struct {
Endpoint WebhookEndpoint `json:"endpoint"`
EventPayload string `json:"event_payload"`
HTTPHeaders map[string]string `json:"http_headers"`
Type string `json:"type"`
WebhookID string `json:"webhook_id"`
Endpoint WebhookEndpoint `json:"endpoint"`
EventPayload string `json:"event_payload"`
HTTPHeaders map[string]string `json:"http_headers"`
Type string `json:"type"`
WebhookConversationID string `json:"webhook_conversation_id"`
WebhookID string `json:"webhook_id"`
}

// WebhookResponse represents outgoing webhook response messages sent to
// Stripe.
type WebhookResponse struct {
ForwardURL string `json:"forward_url"`
Status int `json:"status"`
HTTPHeaders map[string]string `json:"http_headers"`
Body string `json:"body"`
Type string `json:"type"`
WebhookID string `json:"webhook_id"`
ForwardURL string `json:"forward_url"`
Status int `json:"status"`
HTTPHeaders map[string]string `json:"http_headers"`
Body string `json:"body"`
Type string `json:"type"`
WebhookConversationID string `json:"webhook_conversation_id"`
WebhookID string `json:"webhook_id"`
}

// NewWebhookResponse returns a new webhookResponse message.
func NewWebhookResponse(webhookID string, forwardURL string, status int, body string, headers map[string]string) *OutgoingMessage {
func NewWebhookResponse(webhookID, webhookConversationID, forwardURL string, status int, body string, headers map[string]string) *OutgoingMessage {
return &OutgoingMessage{
WebhookResponse: &WebhookResponse{
WebhookID: webhookID,
ForwardURL: forwardURL,
Status: status,
Body: body,
HTTPHeaders: headers,
Type: "webhook_response",
WebhookID: webhookID,
WebhookConversationID: webhookConversationID,
ForwardURL: forwardURL,
Status: status,
Body: body,
HTTPHeaders: headers,
Type: "webhook_response",
},
}
}
7 changes: 6 additions & 1 deletion pkg/websocket/webhook_messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestUnmarshalWebhookEvent(t *testing.T) {
var data = `{"type": "webhook_event", "event_payload": "foo", "http_headers": {"Request-Header": "bar"}, "webhook_id": "wh_123"}`
var data = `{"type": "webhook_event", "event_payload": "foo", "http_headers": {"Request-Header": "bar"}, "webhook_id": "wh_123", "webhook_conversation_id": "wc_123"}`

var msg IncomingMessage
err := json.Unmarshal([]byte(data), &msg)
Expand All @@ -22,11 +22,13 @@ func TestUnmarshalWebhookEvent(t *testing.T) {
require.Equal(t, "bar", msg.WebhookEvent.HTTPHeaders["Request-Header"])
require.Equal(t, "webhook_event", msg.WebhookEvent.Type)
require.Equal(t, "wh_123", msg.WebhookEvent.WebhookID)
require.Equal(t, "wc_123", msg.WebhookEvent.WebhookConversationID)
}

func TestMarshalWebhookResponse(t *testing.T) {
msg := NewWebhookResponse(
"wh_123",
"wc_123",
"http://localhost:5000/webhooks",
200,
"foo",
Expand All @@ -38,6 +40,7 @@ func TestMarshalWebhookResponse(t *testing.T) {

json := string(buf)
require.Equal(t, "wh_123", gjson.Get(json, "webhook_id").String())
require.Equal(t, "wc_123", gjson.Get(json, "webhook_conversation_id").String())
require.Equal(t, "http://localhost:5000/webhooks", gjson.Get(json, "forward_url").String())
require.Equal(t, 200, int(gjson.Get(json, "status").Num))
require.Equal(t, "foo", gjson.Get(json, "body").String())
Expand All @@ -47,6 +50,7 @@ func TestMarshalWebhookResponse(t *testing.T) {
func TestNewWebhookResponse(t *testing.T) {
msg := NewWebhookResponse(
"wh_123",
"wc_123",
"http://localhost:5000/webhooks",
200,
"foo",
Expand All @@ -56,6 +60,7 @@ func TestNewWebhookResponse(t *testing.T) {
require.NotNil(t, msg.WebhookResponse)
require.Equal(t, "webhook_response", msg.Type)
require.Equal(t, "wh_123", msg.WebhookID)
require.Equal(t, "wc_123", msg.WebhookConversationID)
require.Equal(t, "http://localhost:5000/webhooks", msg.ForwardURL)
require.Equal(t, 200, msg.Status)
require.Equal(t, "foo", msg.Body)
Expand Down

0 comments on commit 0c42937

Please sign in to comment.