Skip to content

Commit

Permalink
Merge pull request #1190 from convox/20160904
Browse files Browse the repository at this point in the history
[RELEASE] 20160904
  • Loading branch information
ddollar authored Sep 5, 2016
2 parents a2fadb0 + 6e04141 commit cbc5330
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 54 deletions.
2 changes: 1 addition & 1 deletion api/controllers/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func AppLogs(ws *websocket.Conn) *httperr.Error {
err = models.Provider().LogStream(app, ws, structs.LogStreamOptions{
Filter: header.Get("Filter"),
Follow: follow,
Since: since,
Since: time.Now().Add(-1 * since),
})
if err != nil {
if strings.HasSuffix(err.Error(), "write: broken pipe") {
Expand Down
2 changes: 1 addition & 1 deletion api/controllers/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func SystemLogs(ws *websocket.Conn) *httperr.Error {
err = models.Provider().SystemLogs(ws, structs.LogStreamOptions{
Filter: header.Get("Filter"),
Follow: follow,
Since: since,
Since: time.Now().Add(-1 * since),
})
if err != nil {
return httperr.Server(err)
Expand Down
6 changes: 3 additions & 3 deletions api/structs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package structs
import "time"

type LogStreamOptions struct {
Filter string `json:"filter"`
Follow bool `json:"follow"`
Since time.Duration `json:"since"`
Filter string
Follow bool
Since time.Time
}
28 changes: 15 additions & 13 deletions provider/aws/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,25 @@ func (p *AWSProvider) CapacityGet() (*structs.Capacity, error) {

for _, service := range services {
if len(service.LoadBalancers) > 0 {
td, err := p.describeTaskDefinition(*service.TaskDefinition)
if err != nil {
log.Error(err)
return nil, err
}
for _, deployment := range service.Deployments {
td, err := p.describeTaskDefinition(*deployment.TaskDefinition)
if err != nil {
log.Error(err)
return nil, err
}

tdPorts := map[string]int64{}
tdPorts := map[string]int64{}

for _, cd := range td.ContainerDefinitions {
for _, pm := range cd.PortMappings {
tdPorts[fmt.Sprintf("%s.%d", *cd.Name, *pm.ContainerPort)] = *pm.HostPort
for _, cd := range td.ContainerDefinitions {
for _, pm := range cd.PortMappings {
tdPorts[fmt.Sprintf("%s.%d", *cd.Name, *pm.ContainerPort)] = *pm.HostPort
}
}
}

for _, lb := range service.LoadBalancers {
if port, ok := tdPorts[fmt.Sprintf("%s.%d", *lb.ContainerName, *lb.ContainerPort)]; ok {
portWidth[port] += *service.DesiredCount
for _, lb := range service.LoadBalancers {
if port, ok := tdPorts[fmt.Sprintf("%s.%d", *lb.ContainerName, *lb.ContainerPort)]; ok {
portWidth[port] += *deployment.DesiredCount
}
}
}
}
Expand Down
53 changes: 47 additions & 6 deletions provider/aws/capacity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ func TestCapacityGet(t *testing.T) {
cycleCapacityDescribeContainerInstances,
cycleCapacityListServices,
cycleCapacityDescribeServices,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition2,
cycleCapacityDescribeTaskDefinition1,
cycleCapacityDescribeTaskDefinition1,
)
defer provider.Close()

Expand All @@ -30,7 +31,7 @@ func TestCapacityGet(t *testing.T) {
ProcessCount: 2,
ProcessCPU: 400,
ProcessMemory: 512,
ProcessWidth: 2,
ProcessWidth: 3,
}, r)
}

Expand Down Expand Up @@ -145,12 +146,22 @@ var cycleCapacityDescribeServices = awsutil.Cycle{
"clusterArn": "cluster-test",
"serviceArn": "arn:aws:ecs:us-west-2:901416387788:service/convox-test-myapp-staging-worker-SCELGCIYSKF",
"deployments": [
{
"status": "ACTIVE",
"pendingCount": 0,
"createdAt": 1449511658.683,
"desiredCount": 2,
"taskDefinition": "arn:aws:ecs:us-west-2:901416387788:task-definition/convox-test-myapp-staging-worker:2",
"updatedAt": 1449511869.412,
"id": "ecs-svc/9223370587343117124",
"runningCount": 1
},
{
"status": "ACTIVE",
"pendingCount": 0,
"createdAt": 1449511658.683,
"desiredCount": 1,
"taskDefinition": "arn:aws:ecs:us-east-1:901416387788:task-definition/convox-test-myapp-staging-worker:1",
"taskDefinition": "arn:aws:ecs:us-west-2:901416387788:task-definition/convox-test-myapp-staging-worker:1",
"updatedAt": 1449511869.412,
"id": "ecs-svc/9223370587343117124",
"runningCount": 1
Expand All @@ -171,7 +182,7 @@ var cycleCapacityDescribeServices = awsutil.Cycle{
},
}

var cycleCapacityDescribeTaskDefinition = awsutil.Cycle{
var cycleCapacityDescribeTaskDefinition1 = awsutil.Cycle{
Request: awsutil.Request{
RequestURI: "/",
Operation: "AmazonEC2ContainerServiceV20141113.DescribeTaskDefinition",
Expand All @@ -187,7 +198,37 @@ var cycleCapacityDescribeTaskDefinition = awsutil.Cycle{
"name":"worker",
"cpu":200,
"memory":256,
"image":"test-image",
"image":"test-image:1",
"environment":[{"name":"PROCESS","value":"worker"}],
"mountPoints":[{"sourceVolume":"worker-0-0","readOnly":false,"containerPath":"/var/run/docker.sock"}],
"portMappings":[{"hostPort":5000,"containerPort":80}]
}
],
"volumes":[
{"host":{"sourcePath":"/var/run/docker.sock"},"name":"convox-test-myapp-staging-0-0"}
]
}
}`,
},
}

var cycleCapacityDescribeTaskDefinition2 = awsutil.Cycle{
Request: awsutil.Request{
RequestURI: "/",
Operation: "AmazonEC2ContainerServiceV20141113.DescribeTaskDefinition",
Body: `{"taskDefinition":"arn:aws:ecs:us-west-2:901416387788:task-definition/convox-test-myapp-staging-worker:2"}`,
},
Response: awsutil.Response{
StatusCode: 200,
Body: `{
"taskDefinition":{
"family":"convox-test-myapp-staging-worker",
"containerDefinitions":[
{
"name":"worker",
"cpu":200,
"memory":256,
"image":"test-image:2",
"environment":[{"name":"PROCESS","value":"worker"}],
"mountPoints":[{"sourceVolume":"worker-0-0","readOnly":false,"containerPath":"/var/run/docker.sock"}],
"portMappings":[{"hostPort":5000,"containerPort":80}]
Expand Down
25 changes: 15 additions & 10 deletions provider/aws/formation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ func TestFormationSave(t *testing.T) {
cycleCapacityDescribeContainerInstances,
cycleCapacityListServices,
cycleCapacityDescribeServices,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition2,
cycleCapacityDescribeTaskDefinition1,
cycleCapacityDescribeTaskDefinition1,
cycleNotificationPublish,
test.DescribeAppStackCycle("convox-httpd"),
cycleFormationUpdateStack,
Expand Down Expand Up @@ -258,8 +259,9 @@ func TestFormationSaveBadCount(t *testing.T) {
cycleCapacityDescribeContainerInstances,
cycleCapacityListServices,
cycleCapacityDescribeServices,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition2,
cycleCapacityDescribeTaskDefinition1,
cycleCapacityDescribeTaskDefinition1,
)
defer provider.Close()

Expand All @@ -280,8 +282,9 @@ func TestFormationSaveCpuTooSmall(t *testing.T) {
cycleCapacityDescribeContainerInstances,
cycleCapacityListServices,
cycleCapacityDescribeServices,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition2,
cycleCapacityDescribeTaskDefinition1,
cycleCapacityDescribeTaskDefinition1,
)
defer provider.Close()

Expand All @@ -302,8 +305,9 @@ func TestFormationSaveCpuTooLarge(t *testing.T) {
cycleCapacityDescribeContainerInstances,
cycleCapacityListServices,
cycleCapacityDescribeServices,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition2,
cycleCapacityDescribeTaskDefinition1,
cycleCapacityDescribeTaskDefinition1,
)
defer provider.Close()

Expand All @@ -324,8 +328,9 @@ func TestFormationSaveMemoryTooLarge(t *testing.T) {
cycleCapacityDescribeContainerInstances,
cycleCapacityListServices,
cycleCapacityDescribeServices,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition,
cycleCapacityDescribeTaskDefinition2,
cycleCapacityDescribeTaskDefinition1,
cycleCapacityDescribeTaskDefinition1,
)
defer provider.Close()

Expand Down
39 changes: 19 additions & 20 deletions provider/aws/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@ func (p *AWSProvider) LogStream(app string, w io.Writer, opts structs.LogStreamO
}

func (p *AWSProvider) subscribeLogs(w io.Writer, group string, opts structs.LogStreamOptions) error {
if opts.Since.Nanoseconds() == 0 {
opts.Since = 2 * time.Minute
}
since := opts.Since

since := 2 * time.Minute
if opts.Since.Nanoseconds() > 0 {
since = opts.Since
if since.IsZero() {
since = time.Now().Add(10 * time.Minute)
}

// number of milliseconds since Jan 1, 1970 00:00:00 UTC
start := time.Now().Add(-since).UnixNano() / int64(time.Millisecond)
start := since.UnixNano() / int64(time.Millisecond)

for {
s, err := p.fetchLogs(w, group, opts.Filter, start)
Expand All @@ -58,34 +55,30 @@ func (p *AWSProvider) fetchLogs(w io.Writer, group, filter string, start int64)
Interleaved: aws.Bool(true),
LogGroupName: aws.String(group),
StartTime: aws.Int64(start),
EndTime: aws.Int64(start + (1000 * 60 * 10)),
Limit: aws.Int64(10000),
}

if filter != "" {
req.FilterPattern = aws.String(filter)
}

events := []*cloudwatchlogs.FilteredLogEvent{}

for {
res, err := p.cloudwatchlogs().FilterLogEvents(req)
if ae, ok := err.(awserr.Error); ok && ae.Code() == "ThrottlingException" {
// backoff
log.Error(err)
time.Sleep(1 * time.Second)
time.Sleep(200 * time.Millisecond)
continue
}
if err != nil {
log.Error(err)
return 0, err
}

latest, err := p.writeLogEvents(w, res.Events)
if err != nil {
log.Error(err)
return 0, err
}

if latest > start {
start = latest
}
events = append(events, res.Events...)

if res.NextToken == nil {
break
Expand All @@ -94,8 +87,14 @@ func (p *AWSProvider) fetchLogs(w io.Writer, group, filter string, start int64)
req.NextToken = res.NextToken
}

log.Successf("end=%d", start)
return start, nil
latest, err := p.writeLogEvents(w, events)
if err != nil {
log.Error(err)
return 0, err
}

log.Successf("end=%d", latest)
return latest, nil
}

func (p *AWSProvider) writeLogEvents(w io.Writer, events []*cloudwatchlogs.FilteredLogEvent) (int64, error) {
Expand All @@ -117,7 +116,7 @@ func (p *AWSProvider) writeLogEvents(w io.Writer, events []*cloudwatchlogs.Filte

sec := *e.Timestamp / 1000
nsec := *e.Timestamp - (sec * 1000)
t := time.Unix(sec, nsec)
t := time.Unix(sec, nsec).UTC()
line := fmt.Sprintf("%s %s\n", t.Format(time.RFC3339), *e.Message)

if _, err := w.Write([]byte(line)); err != nil {
Expand Down
Loading

0 comments on commit cbc5330

Please sign in to comment.