Skip to content

Commit

Permalink
Merge pull request #2266 from TheThingsNetwork/release/v3.6.3
Browse files Browse the repository at this point in the history
Release v3.6.3
  • Loading branch information
johanstokking authored Mar 30, 2020
2 parents a234ce3 + 3d8bdcd commit a377688
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 58 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Security

## [3.6.3] (2020-03-30)

### Fixed

- Limited throughput in upstream handlers in Gateway Server when one gateway's upstream handler is busy.

## [3.6.2] (2020-03-19)

### Fixed
Expand Down Expand Up @@ -678,7 +684,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
NOTE: These links should respect backports. See https://github.com/TheThingsNetwork/lorawan-stack/pull/1444/files#r333379706.
-->

[unreleased]: https://github.com/TheThingsNetwork/lorawan-stack/compare/v3.6.2...HEAD
[unreleased]: https://github.com/TheThingsNetwork/lorawan-stack/compare/v3.6.3...HEAD
[3.6.3]: https://github.com/TheThingsNetwork/lorawan-stack/compare/v3.6.2...v3.6.3
[3.6.2]: https://github.com/TheThingsNetwork/lorawan-stack/compare/v3.6.1...v3.6.2
[3.6.1]: https://github.com/TheThingsNetwork/lorawan-stack/compare/v3.6.0...v3.6.1
[3.6.0]: https://github.com/TheThingsNetwork/lorawan-stack/compare/v3.5.3...v3.6.0
Expand Down
4 changes: 2 additions & 2 deletions cmd/internal/shared/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// Initialize global packages.
func Initialize(ctx context.Context, config config.ServiceBase) error {
func Initialize(ctx context.Context, config *config.ServiceBase) error {
// Fallback to the default Redis configuration for the cache system
if config.Cache.Redis.IsZero() {
config.Cache.Redis = config.Redis
Expand All @@ -31,7 +31,7 @@ func Initialize(ctx context.Context, config config.ServiceBase) error {
config.Events.Redis = config.Redis
}

if err := InitializeEvents(ctx, config); err != nil {
if err := InitializeEvents(ctx, *config); err != nil {
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/ttn-lw-cli/commands/simulate.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (m *simulateMetadataParams) setDefaults() error {
m.SpreadingFactor, m.Bandwidth = lora.SpreadingFactor, lora.Bandwidth
} else if m.DataRateIndex == 0 {
for i, dr := range phy.DataRates {
if dr.Rate.GetLoRa().SpreadingFactor == m.SpreadingFactor && dr.Rate.GetLoRa().Bandwidth == m.Bandwidth {
if lora := dr.Rate.GetLoRa(); lora != nil && lora.SpreadingFactor == m.SpreadingFactor && lora.Bandwidth == m.Bandwidth {
m.DataRateIndex = uint32(i)
break
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ttn-lw-stack/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var (
ctx = log.NewContext(ctx, logger)

// initialize shared packages
if err := shared.Initialize(ctx, config.ServiceBase); err != nil {
if err := shared.Initialize(ctx, &config.ServiceBase); err != nil {
return err
}

Expand Down
27 changes: 27 additions & 0 deletions config/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -3392,6 +3392,15 @@
"file": "format_protobufv2.go"
}
},
"error:pkg/gatewayserver/io/mqtt:mqtt_frontend_recovered": {
"translations": {
"en": "internal server error"
},
"description": {
"package": "pkg/gatewayserver/io/mqtt",
"file": "mqtt.go"
}
},
"error:pkg/gatewayserver/io/mqtt:not_authorized": {
"translations": {
"en": "not authorized"
Expand Down Expand Up @@ -3491,6 +3500,15 @@
"file": "firewall_ratelimit.go"
}
},
"error:pkg/gatewayserver/io/udp:udp_frontend_recovered": {
"translations": {
"en": "internal server error"
},
"description": {
"package": "pkg/gatewayserver/io/udp",
"file": "udp.go"
}
},
"error:pkg/gatewayserver/io:buffer_full": {
"translations": {
"en": "buffer is full"
Expand Down Expand Up @@ -3770,6 +3788,15 @@
"file": "gatewayserver.go"
}
},
"error:pkg/gatewayserver:handler_recovered": {
"translations": {
"en": "internal server error"
},
"description": {
"package": "pkg/gatewayserver",
"file": "gatewayserver.go"
}
},
"error:pkg/gatewayserver:host_handle": {
"translations": {
"en": "host `{host}` failed to handle message"
Expand Down
2 changes: 1 addition & 1 deletion doc/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pygmentsUseClasses = true
keywords = []
github_repository = "https://github.com/TheThingsNetwork/lorawan-stack"
github_repository_edit = "https://github.com/TheThingsNetwork/lorawan-stack/edit/master/doc/content"
version = "v3.6.2"
version = "v3.6.3"

[markup]
[markup.goldmark]
Expand Down
2 changes: 1 addition & 1 deletion doc/themes/the-things-stack/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "hugo-theme-the-things-stack",
"version": "3.6.2",
"version": "3.6.3",
"private": true,
"description": "Hugo Theme for The Things Stack",
"dependencies": {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ttn-stack",
"version": "3.6.2",
"version": "3.6.3",
"description": "The Things Stack",
"main": "index.js",
"repository": "https://github.com/TheThingsNetwork/lorawan-stack.git",
Expand Down
51 changes: 34 additions & 17 deletions pkg/gatewayserver/gatewayserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"net"
"net/http"
"os"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -480,8 +482,6 @@ var (
maxUpstreamHandlers = int32(1 << 5)
// upstreamHandlerIdleTimeout is the duration after which an idle upstream handler stops to save resources.
upstreamHandlerIdleTimeout = (1 << 7) * time.Millisecond
// upstreamHandlerBusyTimeout is the duration after traffic gets dropped if all upstream handlers are busy.
upstreamHandlerBusyTimeout = (1 << 6) * time.Millisecond
)

type upstreamHost struct {
Expand Down Expand Up @@ -511,6 +511,7 @@ func (gs *GatewayServer) handleUpstream(conn connectionEntry) {
}()

handleFn := func(host *upstreamHost) {
defer recoverHandler(ctx)
defer host.handleWg.Done()
defer atomic.AddInt32(&host.handlers, -1)
for {
Expand Down Expand Up @@ -579,7 +580,7 @@ func (gs *GatewayServer) handleUpstream(conn connectionEntry) {
}
return false
}
hosts = append(hosts, &upstreamHost{
host := &upstreamHost{
name: name,
handler: func(ids *ttnpb.EndDeviceIdentifiers) upstream.Handler {
if ids != nil && ids.DevAddr != nil && !passDevAddr(handler.GetDevAddrPrefixes(), *ids.DevAddr) {
Expand All @@ -588,10 +589,8 @@ func (gs *GatewayServer) handleUpstream(conn connectionEntry) {
return handler
},
handleCh: make(chan upstreamItem),
})
}

for _, host := range hosts {
}
hosts = append(hosts, host)
defer host.handleWg.Wait()
}

Expand Down Expand Up @@ -632,17 +631,17 @@ func (gs *GatewayServer) handleUpstream(conn connectionEntry) {
atomic.AddInt32(&host.handlers, 1)
host.handleWg.Add(1)
go handleFn(host)
go func(host *upstreamHost) {
host.handleCh <- item
}(host)
continue
}
select {
case host.handleCh <- item:
case <-time.After(upstreamHandlerBusyTimeout):
logger.WithField("name", host.name).Warn("Upstream handler busy, drop message")
switch msg := val.(type) {
case *ttnpb.UplinkMessage:
registerFailUplink(ctx, conn.Gateway(), msg, host.name)
case *ttnpb.GatewayStatus:
registerFailStatus(ctx, conn.Gateway(), msg, host.name)
}
logger.WithField("name", host.name).Warn("Upstream handler busy, drop message")
switch msg := val.(type) {
case *ttnpb.UplinkMessage:
registerFailUplink(ctx, conn.Gateway(), msg, host.name)
case *ttnpb.GatewayStatus:
registerFailStatus(ctx, conn.Gateway(), msg, host.name)
}
}
}
Expand Down Expand Up @@ -756,3 +755,21 @@ func (gs *GatewayServer) GetMQTTConfig(ctx context.Context) (*config.MQTT, error
}
return &config.MQTT, nil
}

var errHandlerRecovered = errors.DefineInternal("handler_recovered", "internal server error")

func recoverHandler(ctx context.Context) error {
if p := recover(); p != nil {
fmt.Fprintln(os.Stderr, p)
os.Stderr.Write(debug.Stack())
var err error
if pErr, ok := p.(error); ok {
err = errHandlerRecovered.WithCause(pErr)
} else {
err = errHandlerRecovered.WithAttributes("panic", p)
}
log.FromContext(ctx).WithError(err).Error("Handler failed")
return err
}
return nil
}
8 changes: 1 addition & 7 deletions pkg/gatewayserver/gatewayserver_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,4 @@

package gatewayserver

var (
ErrSchedule = errSchedule
)

func init() {
maxUpstreamHandlers = 1
}
var ErrSchedule = errSchedule
49 changes: 33 additions & 16 deletions pkg/gatewayserver/gatewayserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ func TestGatewayServer(t *testing.T) {
for _, tc := range []struct {
Name string
Up *ttnpb.GatewayUp
Forwards []int // Indices of uplink messages in Up that are being forwarded.
Forwards []uint32 // Timestamps of uplink messages in Up that are being forwarded.
}{
{
Name: "GatewayStatus",
Expand Down Expand Up @@ -705,12 +705,12 @@ func TestGatewayServer(t *testing.T) {
},
CodingRate: "4/5",
Frequency: 867900000,
Timestamp: 4242000,
Timestamp: 100,
},
RxMetadata: []*ttnpb.RxMetadata{
{
GatewayIdentifiers: ids,
Timestamp: 4242000,
Timestamp: 100,
RSSI: -69,
ChannelRSSI: -69,
SNR: 11,
Expand All @@ -721,7 +721,7 @@ func TestGatewayServer(t *testing.T) {
},
},
},
Forwards: []int{0},
Forwards: []uint32{100},
},
{
Name: "OneValidFSK",
Expand All @@ -737,12 +737,12 @@ func TestGatewayServer(t *testing.T) {
},
},
Frequency: 867900000,
Timestamp: 4242000,
Timestamp: 100,
},
RxMetadata: []*ttnpb.RxMetadata{
{
GatewayIdentifiers: ids,
Timestamp: 4242000,
Timestamp: 100,
RSSI: -69,
ChannelRSSI: -69,
SNR: 11,
Expand All @@ -753,7 +753,7 @@ func TestGatewayServer(t *testing.T) {
},
},
},
Forwards: []int{0},
Forwards: []uint32{100},
},
{
Name: "OneGarbageWithStatus",
Expand All @@ -771,12 +771,12 @@ func TestGatewayServer(t *testing.T) {
},
CodingRate: "4/5",
Frequency: 868500000,
Timestamp: 1234560000,
Timestamp: 100,
},
RxMetadata: []*ttnpb.RxMetadata{
{
GatewayIdentifiers: ids,
Timestamp: 1234560000,
Timestamp: 100,
RSSI: -112,
ChannelRSSI: -112,
SNR: 2,
Expand All @@ -797,12 +797,12 @@ func TestGatewayServer(t *testing.T) {
},
CodingRate: "4/5",
Frequency: 868100000,
Timestamp: 4242000,
Timestamp: 200,
},
RxMetadata: []*ttnpb.RxMetadata{
{
GatewayIdentifiers: ids,
Timestamp: 4242000,
Timestamp: 200,
RSSI: -69,
ChannelRSSI: -69,
SNR: 11,
Expand All @@ -823,12 +823,12 @@ func TestGatewayServer(t *testing.T) {
},
CodingRate: "4/5",
Frequency: 867700000,
Timestamp: 2424000,
Timestamp: 300,
},
RxMetadata: []*ttnpb.RxMetadata{
{
GatewayIdentifiers: ids,
Timestamp: 2424000,
Timestamp: 300,
RSSI: -36,
ChannelRSSI: -36,
SNR: 5,
Expand All @@ -845,7 +845,7 @@ func TestGatewayServer(t *testing.T) {
Time: time.Unix(4242424, 0),
},
},
Forwards: []int{1, 2},
Forwards: []uint32{200, 300},
},
} {
t.Run(tc.Name, func(t *testing.T) {
Expand Down Expand Up @@ -879,10 +879,27 @@ func TestGatewayServer(t *testing.T) {
uplinkCount += len(tc.Up.UplinkMessages)
}

for _, msgIdx := range tc.Forwards {
notSeen := make(map[uint32]struct{})
for _, t := range tc.Forwards {
notSeen[t] = struct{}{}
}
for len(notSeen) > 0 {
select {
case msg := <-ns.Up():
expected := tc.Up.UplinkMessages[msgIdx]
var expected *ttnpb.UplinkMessage
for _, up := range tc.Up.UplinkMessages {
if ts := up.Settings.Timestamp; ts == msg.Settings.Timestamp {
if _, ok := notSeen[ts]; !ok {
t.Fatalf("Not expecting message %v", msg)
}
expected = up
delete(notSeen, ts)
break
}
}
if expected == nil {
t.Fatalf("Received unexpected message")
}
a.So(time.Since(msg.ReceivedAt), should.BeLessThan, timeout)
a.So(msg.Settings, should.Resemble, expected.Settings)
for _, md := range msg.RxMetadata {
Expand Down
Loading

0 comments on commit a377688

Please sign in to comment.