Skip to content

Commit

Permalink
[#182]: fix: properly pass user-defined headers
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Nov 17, 2024
2 parents fb53018 + 889f863 commit 2c2f8fc
Show file tree
Hide file tree
Showing 17 changed files with 190 additions and 20 deletions.
16 changes: 16 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cel.dev/expr v0.15.0 h1:O1jzfJCQBfL5BFoYktaxwIhuttaQPsVWerH9/EEKx0w=
cel.dev/expr v0.15.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg=
cel.dev/expr v0.16.1/go.mod h1:AsGA5zb3WruAEQeQng1RZdGEXmBj0jvMWh6l5SnNuC8=
cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys=
cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk=
cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y=
Expand Down Expand Up @@ -169,6 +170,7 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
cloud.google.com/go/compute/metadata v0.5.0/go.mod h1:aHnloV2TPI38yx4s9+wAZhHykWvVCfu7hQbF+9CWoiY=
cloud.google.com/go/contactcenterinsights v1.10.0 h1:YR2aPedGVQPpFBZXJnPkqRj8M//8veIZZH5ZvICoXnI=
cloud.google.com/go/contactcenterinsights v1.11.1 h1:dEfCjtdYjS3n8/1HEKbJaOL31l3dEs3q9aeaNsyrJBc=
cloud.google.com/go/contactcenterinsights v1.11.1/go.mod h1:FeNP3Kg8iteKM80lMwSk3zZZKVxr+PGnAId6soKuXwE=
Expand Down Expand Up @@ -753,12 +755,14 @@ github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipa
github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9 h1:uDmaGzcdjhF4i/plgjmEsriH11Y0o7RKapEf/LDaM3w=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ=
Expand All @@ -778,11 +782,13 @@ github.com/envoyproxy/go-control-plane v0.11.1 h1:wSUXTlLfiAQRWs2F+p+EKOY9rUyis1
github.com/envoyproxy/go-control-plane v0.11.1/go.mod h1:uhMcXKCQMEJHiAb0w+YGefQLaTEw+YhGluxZkrTmD0g=
github.com/envoyproxy/go-control-plane v0.12.0 h1:4X+VP1GHd1Mhj6IB5mMeGbLCleqxjletLK6K0rbxyZI=
github.com/envoyproxy/go-control-plane v0.12.0/go.mod h1:ZBTaoJ23lqITozF0M6G4/IragXCQKCnYbmlmtHvwRG0=
github.com/envoyproxy/go-control-plane v0.13.0/go.mod h1:GRaKG3dwvFoTg4nj7aXdZnvMg4d7nvT/wl9WgVXn3Q8=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/go-fonts/dejavu v0.1.0 h1:JSajPXURYqpr+Cu8U9bt8K+XcACIHWqWrvWCKyeFmVQ=
Expand All @@ -809,9 +815,11 @@ github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4=
github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
Expand Down Expand Up @@ -950,6 +958,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/sftp v1.13.1 h1:I2qBYMChEhIjOgazfJmV3/mZM256btk6wkCDRmW7JYs=
github.com/pkg/sftp v1.13.6 h1:JFZT4XbOU7l77xGSpOdW+pwIMqP044IyjXX6FGyEKFo=
github.com/pkg/sftp v1.13.6/go.mod h1:tz1ryNURKu77RL+GuCzmoJYxQczL3wLNNpPWagdg4Qk=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA=
Expand All @@ -970,6 +979,7 @@ github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245 h1:K1Xf3bKttbF+koVGaX5xngRIZ5bVjbmPnaxE/dR08uY=
Expand All @@ -988,10 +998,12 @@ github.com/urfave/cli/v2 v2.23.0 h1:pkly7gKIeYv3olPAeNajNpLjeJrmTPYCoZWaV+2VfvE=
github.com/urfave/cli/v2 v2.23.0/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI=
github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs=
github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ=
github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ=
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM=
github.com/yuin/goldmark v1.3.5 h1:dPmz1Snjq0kmkz159iL7S6WzdahUTHnHB5M56WFVifs=
github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE=
Expand Down Expand Up @@ -1062,6 +1074,7 @@ golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.19.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
Expand All @@ -1076,6 +1089,7 @@ golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA=
golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand All @@ -1099,6 +1113,7 @@ golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0=
golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
Expand All @@ -1118,6 +1133,7 @@ golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c
golang.org/x/tools v0.23.0/go.mod h1:pnu6ufv6vQkll6szChhK3C3L/ruaIv5eBeztNG8wtsI=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/tools v0.27.0/go.mod h1:sUi0ZgbwW9ZPAq26Ekut+weQPR5eIM6GQLQ1Yjm1H0Q=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
Expand Down
8 changes: 4 additions & 4 deletions natsjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,10 @@ func (c *Driver) Push(ctx context.Context, job jobs.Message) error {
return errors.E(op, err)
}

_, err = c.jetstream.PublishMsg(ctx, &nats.Msg{
Data: data,
Subject: c.subject,
})
msg := nats.NewMsg(c.subject)
msg.Data = data
msg.Header = j.headers
_, err = c.jetstream.PublishMsg(ctx, msg)
if err != nil {
return errors.E(op, err)
}
Expand Down
2 changes: 1 addition & 1 deletion natsjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Driver) listenerStart() { //nolint:gocognit
}

item := &Item{}
c.unpack(m.Data(), item)
c.unpack(m.Data(), m.Headers(), item)

ctx := c.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers))
ctx, span := c.tracer.Tracer(tracerName).Start(ctx, "nats_listener")
Expand Down
5 changes: 3 additions & 2 deletions natsjobs/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ const (
auto string = "deduced_by_rr"
)

func (c *Driver) unpack(data []byte, item *Item) {
func (c *Driver) unpack(data []byte, headers map[string][]string, item *Item) {
err := json.Unmarshal(data, item)
item.headers = headers
if err != nil {
*item = Item{
Job: auto,
Ident: uuid.NewString(),
Payload: data,
headers: make(map[string][]string, 2),
headers: headers,
Options: &Options{
Priority: (*c.pipeline.Load()).Priority(),
Pipeline: (*c.pipeline.Load()).Name(),
Expand Down
36 changes: 36 additions & 0 deletions tests/configs/.rr-nats-headers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
version: '3'

rpc:
listen: tcp://127.0.0.1:6464

server:
command: "php php_test_files/jobs/jobs_ok_headers.php"
relay: "pipes"
relay_timeout: "20s"

nats:
addr: "nats://127.0.0.1:4222"

logs:
level: debug
mode: development

jobs:
num_pollers: 1
pool:
num_workers: 2
allocate_timeout: 60s
destroy_timeout: 60s

pipelines:
test-1:
driver: nats
config:
prefetch: 100
subject: "testheaders.*"
stream: "headers-test"
delete_after_ack: true
deliver_new: true
priority: 1

consume: [ "test-1" ]
2 changes: 0 additions & 2 deletions tests/env/docker-compose-nats.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3.8"

services:
nats:
image: nats:latest
Expand Down
87 changes: 87 additions & 0 deletions tests/jobs_nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,93 @@ import (
"go.uber.org/zap"
)

func TestNATSHeaders(t *testing.T) {
cont := endure.New(slog.LevelDebug)

cfg := &config.Plugin{
Version: "v2024.2.0",
Path: "configs/.rr-nats-headers.yaml",
Prefix: "rr",
}

l, oLogger := mocklogger.ZapTestLogger(zap.DebugLevel)
err := cont.RegisterAll(
cfg,
&server.Plugin{},
&rpcPlugin.Plugin{},
l,
&jobs.Plugin{},
&resetter.Plugin{},
&informer.Plugin{},
&natsPlugin.Plugin{},
)
assert.NoError(t, err)

err = cont.Init()
if err != nil {
t.Fatal(err)
}

ch, err := cont.Serve()
if err != nil {
t.Fatal(err)
}

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

wg := &sync.WaitGroup{}
wg.Add(1)

stopCh := make(chan struct{}, 1)

go func() {
defer wg.Done()
for {
select {
case e := <-ch:
assert.Fail(t, "error", e.Error.Error())
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
case <-sig:
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
case <-stopCh:
// timeout
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}
return
}
}
}()

time.Sleep(time.Second * 3)
t.Run("PushPipeline", helpers.PushToPipe("test-1", false, "127.0.0.1:6464"))
time.Sleep(time.Second)

t.Run("DestroyPipeline", helpers.DestroyPipelines("127.0.0.1:6464", "test-1"))
stopCh <- struct{}{}
wg.Wait()

require.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job was processed successfully").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len())

t.Cleanup(func() {
errc := helpers.CleanupNats("nats://127.0.0.1:4222", "headers-test")
if errc != nil {
t.Log(errc)
}
})
}
func TestNATSInit(t *testing.T) {
cont := endure.New(slog.LevelDebug)

Expand Down
2 changes: 1 addition & 1 deletion tests/php_test_files/jobs/jobs_create_memory.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@
$consumer = new Spiral\RoadRunner\Jobs\Consumer();

while ($task = $consumer->waitTask()) {
$task->complete();
$task->ack();
}
4 changes: 2 additions & 2 deletions tests/php_test_files/jobs/jobs_err.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
$total_attempts = (int)$task->getHeaderLine("attempts") + 1;

if ($total_attempts > 3) {
$task->complete();
$task->ack();
} else {
$task->withHeader("attempts",$total_attempts)->withDelay(5)->fail("failed", true);
$task->withHeader("attempts", $total_attempts)->withDelay(5)->nack("failed", true);
}
} catch (\Throwable $e) {
$task->error((string)$e);
Expand Down
2 changes: 1 addition & 1 deletion tests/php_test_files/jobs/jobs_ok.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

while ($task = $consumer->waitTask()) {
try {
$task->complete();
$task->ack();
} catch (\Throwable $e) {
$task->error((string)$e);
}
Expand Down
32 changes: 32 additions & 0 deletions tests/php_test_files/jobs/jobs_ok_headers.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

/**
* @var Goridge\RelayInterface $relay
*/

use Spiral\Goridge;
use Spiral\RoadRunner;
use Spiral\Goridge\StreamRelay;
use Spiral\RoadRunner\Jobs\Consumer;
use Spiral\RoadRunner\Jobs\Serializer\JsonSerializer;

ini_set('display_errors', 'stderr');
require dirname(__DIR__) . "/vendor/autoload.php";

$consumer = new Spiral\RoadRunner\Jobs\Consumer();

while ($task = $consumer->waitTask()) {
try {
$h = $task->getHeader('test')[0] ?? 'undefined';
if ("test2" !== $h) {
throw new RuntimeException(sprintf(
"Expected header '%s', got '%s'",
"test2",
$h
));
}
$task->ack();
} catch (\Throwable $e) {
$task->error((string)$e);
}
}
2 changes: 1 addition & 1 deletion tests/php_test_files/jobs/jobs_ok_pq.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
while ($task = $consumer->waitTask()) {
try {
sleep(15);
$task->complete();
$task->ack();
} catch (\Throwable $e) {
$task->error((string)$e);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/php_test_files/jobs/jobs_ok_queue_name_exist.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
throw new RuntimeException('Queue name was not found');
}

$task->complete();
$task->ack();
} catch (\Throwable $e) {
$task->error((string)$e);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/php_test_files/jobs/jobs_ok_sleep1.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
while ($task = $consumer->waitTask()) {
try {
sleep(1);
$task->complete();
$task->ack();
} catch (\Throwable $e) {
$task->error((string)$e);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/php_test_files/jobs/jobs_ok_slow.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
while ($task = $consumer->waitTask()) {
try {
sleep(60);
$task->complete();
$task->ack();
} catch (\Throwable $e) {
$task->error((string)$e);
}
Expand Down
Loading

0 comments on commit 2c2f8fc

Please sign in to comment.