Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Cleanup DLQ on create-consumer-group error (#332)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiran RG authored and thuningxu committed Nov 30, 2017
1 parent 371b784 commit 8390176
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 24 deletions.
18 changes: 15 additions & 3 deletions clients/metadata/metadata_cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,18 @@ func parseConsistency(cfgCons string) (lowCons gocql.Consistency, midCons gocql.

switch cons := strings.Split(cfgCons, ","); len(cons) {
case 3:
lowCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[2]))
lowCons = gocql.ParseConsistency(strings.TrimSpace(cons[2]))
fallthrough

case 2:
midCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[1]))
midCons = gocql.ParseConsistency(strings.TrimSpace(cons[1]))
if len(cons) == 2 {
lowCons = midCons
}
fallthrough

case 1:
highCons, _ = gocql.ParseConsistency(strings.TrimSpace(cons[0]))
highCons = gocql.ParseConsistency(strings.TrimSpace(cons[0]))
}

return
Expand Down Expand Up @@ -1350,6 +1350,18 @@ func (s *CassandraMetadataService) CreateConsumerGroupUUID(ctx thrift.Context, r

dstInfo, err := s.ReadDestination(nil, &shared.ReadDestinationRequest{Path: common.StringPtr(createRequest.GetDestinationPath())})
if err != nil {

if dlqUUID != nil {

if e := s.DeleteDestinationUUID(nil, &m.DeleteDestinationUUIDRequest{UUID: dlqUUID}); e != nil {
s.log.WithFields(bark.Fields{
common.TagDst: *dlqUUID,
common.TagCnsm: cgUUID,
common.TagErr: err,
}).Error(`CreateConsumerGroup - failed to cleanup DLQ destination`)
}
}

return nil, err
}
dstUUID := dstInfo.GetDestinationUUID()
Expand Down
2 changes: 1 addition & 1 deletion cmd/tools/cmq/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newMetadataClient(opts *opts) (*metadataClient, error) {
}
}

cluster.Consistency, _ = gocql.ParseConsistency(opts.Consistency)
cluster.Consistency = gocql.ParseConsistency(opts.Consistency)

cluster.NumConns = numConns
cluster.ProtoVersion = protocolVersion
Expand Down
40 changes: 21 additions & 19 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tools/cassandra/cqlclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newCQLClient(config *SchemaUpdaterConfig) (CQLClient, error) {
clusterCfg.Keyspace = config.Keyspace
clusterCfg.Timeout = defaultTimeout
clusterCfg.ProtoVersion = config.ProtoVersion
clusterCfg.Consistency, _ = gocql.ParseConsistency(defaultConsistency)
clusterCfg.Consistency = gocql.ParseConsistency(defaultConsistency)

if config.Username != "" && config.Password != "" {
clusterCfg.Authenticator = gocql.PasswordAuthenticator{
Expand Down

0 comments on commit 8390176

Please sign in to comment.