Skip to content

Commit

Permalink
release 1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mskonovalov committed Jan 24, 2017
2 parents 103c71d + 41eca9d commit fa0d580
Show file tree
Hide file tree
Showing 9 changed files with 463 additions and 20 deletions.
12 changes: 12 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
language: java
sudo: false

jdk:
- oraclejdk8

script:
- ./gradlew --info check


notifications:
email: false
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Useful Hystrix addons


[![][travis img]][travis]

1. **ComposedHystrixMetricsPublisher**

According to Hystrix specification it is possible to register only one plugin of every type. This class is supposed to avoid this limitation and register list of plugins.
All of these plugins will be called sequentially in case of Hystrix events. Sometimes it can be useful.

Example:
```java
HystrixCodaHaleMetricsPublisher plugin1 = new HystrixCodaHaleMetricsPublisher(new MetricRegistry());
HystrixMetricsInitializationNotifier plugin2 = new HystrixMetricsInitializationNotifier();
ComposedHystrixMetricsPublisher composedPlugin = new ComposedHystrixMetricsPublisher(plugin1, plugin2);
HystrixPlugins.getInstance().registerMetricsPublisher(composedPlugin);
```

2. **HystrixAggregatedEventStream**

This event stream aggregates all Hystrix events for every Hystrix Command and exposes it as a rx-java Observable.
It also dynamically adds lazy-initialized commands so event stream is fully aggregated.

Example:
```java
HystrixMetricsInitializationNotifier initNotifier = new HystrixMetricsInitializationNotifier();
HystrixAggregatedEventStream aggregatedStream = new HystrixAggregatedEventStream(initNotifier, m -> true);
...
aggregatedStream.observe().map(<...>).filter(<...>);

```

[travis]:https://travis-ci.org/ringcentral/hystrix-addons
[travis img]:https://travis-ci.org/ringcentral/hystrix-addons.svg?branch=master
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apply plugin: 'java'
apply plugin: 'idea'

group = 'com.ringcentral.platform'
version = '1.0.0-SNAPSHOT'
version = '1.0.0'

sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand All @@ -12,9 +12,11 @@ repositories {
}

dependencies {

compile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.5.5'
compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.21'
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.1.8'
testCompile group: 'org.mockito', name: 'mockito-core', version: '2.5.0'
}

buildscript {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@

import java.util.function.Predicate;

public class HystrixAggregatedEventStream implements HystrixMetricsInitializationListener, HystrixEventStream<HystrixCommandCompletion> {
/**
* Rx-Java event stream that aggregates all events for all commands and dynamically adds new type of events during it's work
*/
public class AggregatedHystrixCommandCompletionStream implements HystrixMetricsInitializationListener, HystrixEventStream<HystrixCommandCompletion> {

private static final Logger log = LoggerFactory.getLogger(HystrixAggregatedEventStream.class);
private static final Logger log = LoggerFactory.getLogger(AggregatedHystrixCommandCompletionStream.class);
private final Predicate<HystrixCommandMetrics> filter;
private final Subject<Observable<HystrixCommandCompletion>, Observable<HystrixCommandCompletion>> streams;
private final Observable<HystrixCommandCompletion> aggregatedStream;


public HystrixAggregatedEventStream(HystrixMetricsInitializationNotifier notifier, Predicate<HystrixCommandMetrics> filter) {
public AggregatedHystrixCommandCompletionStream(HystrixMetricsInitializationNotifier notifier, Predicate<HystrixCommandMetrics> filter) {
this.filter = filter;
streams = PublishSubject.<Observable<HystrixCommandCompletion>>create().toSerialized();
aggregatedStream = Observable.merge(streams).share();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,50 @@
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCollapser;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCommand;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

/**
* Publisher for metrics that allows to register several publishers and will publish every event into all of them
* This class is useful to avoid limitations that only 1 plugin can be registered in Hystrix.
*/
public class ComposedHystrixMetricsPublisher extends HystrixMetricsPublisher {

private static final Logger log = LoggerFactory.getLogger(ComposedHystrixMetricsPublisher.class);

/** list of registered plugins **/
private final List<HystrixMetricsPublisher> publishers;

public ComposedHystrixMetricsPublisher(HystrixMetricsPublisher... publishers) {
log.trace("Creating ComposedHystrixMetricsPublisher");
this.publishers = Arrays.asList(publishers);
}

@Override
public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(HystrixCommandKey commandKey, HystrixCommandGroupKey commandGroupKey, HystrixCommandMetrics metrics, HystrixCircuitBreaker circuitBreaker, HystrixCommandProperties properties) {
HystrixMetricsPublisherCommand[] array = publishers.stream().map(p ->
p.getMetricsPublisherForCommand(commandKey, commandGroupKey, metrics, circuitBreaker, properties)).toArray(HystrixMetricsPublisherCommand[]::new);
return () -> Stream.of(array).forEach(HystrixMetricsPublisherCommand::initialize);
Stream<HystrixMetricsPublisherCommand> stream = publishers.stream().map(p ->
p.getMetricsPublisherForCommand(commandKey, commandGroupKey, metrics, circuitBreaker, properties));
log.trace("getMetricsPublisherForCommand {}", commandKey.name());
return () -> stream.forEach(HystrixMetricsPublisherCommand::initialize);
}

@Override
public HystrixMetricsPublisherThreadPool getMetricsPublisherForThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolMetrics metrics, HystrixThreadPoolProperties properties) {
HystrixMetricsPublisherThreadPool[] array = publishers.stream().map(p ->
p.getMetricsPublisherForThreadPool(threadPoolKey, metrics, properties)).toArray(HystrixMetricsPublisherThreadPool[]::new);
return () -> Stream.of(array).forEach(HystrixMetricsPublisherThreadPool::initialize);
Stream<HystrixMetricsPublisherThreadPool> stream = publishers.stream().map(p ->
p.getMetricsPublisherForThreadPool(threadPoolKey, metrics, properties));
log.trace("getMetricsPublisherForThreadPool {}", threadPoolKey.name());
return () -> stream.forEach(HystrixMetricsPublisherThreadPool::initialize);
}

@Override
public HystrixMetricsPublisherCollapser getMetricsPublisherForCollapser(HystrixCollapserKey collapserKey, HystrixCollapserMetrics metrics, HystrixCollapserProperties properties) {
HystrixMetricsPublisherCollapser[] array = publishers.stream().map(p ->
p.getMetricsPublisherForCollapser(collapserKey, metrics, properties)).toArray(HystrixMetricsPublisherCollapser[]::new);
return () -> Stream.of(array).forEach(HystrixMetricsPublisherCollapser::initialize);
Stream<HystrixMetricsPublisherCollapser> stream = publishers.stream().map(p ->
p.getMetricsPublisherForCollapser(collapserKey, metrics, properties));
log.trace("getMetricsPublisherForCollapser {}", collapserKey.name());
return () -> stream.forEach(HystrixMetricsPublisherCollapser::initialize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
import com.netflix.hystrix.*;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* {@link #addListener(HystrixMetricsInitializationListener)} are not thread-safe
* Listener that notifies all the listeners about initialization of new command
*/
public class HystrixMetricsInitializationNotifier extends HystrixMetricsPublisher {

private final List<HystrixMetricsInitializationListener> listeners = new ArrayList<>();
private static final Logger log = LoggerFactory.getLogger(HystrixMetricsInitializationNotifier.class);
private final List<HystrixMetricsInitializationListener> listeners = new CopyOnWriteArrayList<>();

@Override
public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(
Expand All @@ -21,11 +24,15 @@ public HystrixMetricsPublisherCommand getMetricsPublisherForCommand(
HystrixCommandMetrics metrics,
HystrixCircuitBreaker circuitBreaker,
HystrixCommandProperties properties) {
return () -> listeners.forEach(listener -> listener.initialize(metrics));
log.debug("Notify {} listeners for command {} and group {}", listeners.size(),
commandKey != null ? commandKey.name() : null,
commandGroupKey != null ? commandGroupKey.name(): null);
return () -> listeners.forEach(listener -> listener.initialize(metrics));
}

public void addListener(HystrixMetricsInitializationListener listener) {
listeners.add(listener);
log.debug("Adding listener to HystrixMetricsInitializationNotifier");
listeners.add(listener);
}

}
123 changes: 123 additions & 0 deletions src/test/java/com/ringcentral/platform/hystrix/Command.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.ringcentral.platform.hystrix;

import com.netflix.hystrix.*;
import com.netflix.hystrix.exception.HystrixBadRequestException;

import java.util.concurrent.atomic.AtomicInteger;

public class Command extends HystrixCommand<Integer> {

private static final AtomicInteger uniqueId = new AtomicInteger(0);
private final String arg;
private final HystrixEventType executionResult;
private final int executionLatency;
private final HystrixEventType fallbackExecutionResult;
private final int fallbackExecutionLatency;

private Command(Setter setter, HystrixEventType executionResult, int executionLatency, String arg,
HystrixEventType fallbackExecutionResult, int fallbackExecutionLatency) {
super(setter);
this.executionResult = executionResult;
this.executionLatency = executionLatency;
this.fallbackExecutionResult = fallbackExecutionResult;
this.fallbackExecutionLatency = fallbackExecutionLatency;
this.arg = arg;
}

static Command from(HystrixCommandGroupKey groupKey, HystrixCommandKey key, HystrixEventType desiredEventType, int latency) {
return from(groupKey, key, desiredEventType, latency, HystrixCommandProperties.ExecutionIsolationStrategy.THREAD);
}

static Command from(HystrixCommandGroupKey groupKey, HystrixCommandKey key, HystrixEventType desiredEventType, int latency,
HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
return from(groupKey, key, desiredEventType, latency, isolationStrategy, HystrixEventType.FALLBACK_SUCCESS, 0);
}

static Command from(HystrixCommandGroupKey groupKey, HystrixCommandKey key, HystrixEventType desiredEventType, int latency,
HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy,
HystrixEventType desiredFallbackEventType, int fallbackLatency) {
Setter setter = Setter.withGroupKey(groupKey)
.andCommandKey(key)
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(600)
.withExecutionIsolationStrategy(isolationStrategy)
.withCircuitBreakerEnabled(true)
.withCircuitBreakerRequestVolumeThreshold(3)
.withMetricsHealthSnapshotIntervalInMilliseconds(100)
.withMetricsRollingStatisticalWindowInMilliseconds(1000)
.withMetricsRollingStatisticalWindowBuckets(10)
.withRequestCacheEnabled(true)
.withRequestLogEnabled(true)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(5))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(groupKey.name()))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(10)
.withMaxQueueSize(-1));

String uniqueArg;

switch (desiredEventType) {
case SUCCESS:
uniqueArg = uniqueId.incrementAndGet() + "";
return new Command(setter, HystrixEventType.SUCCESS, latency, uniqueArg, desiredFallbackEventType, 0);
case FAILURE:
uniqueArg = uniqueId.incrementAndGet() + "";
return new Command(setter, HystrixEventType.FAILURE, latency, uniqueArg, desiredFallbackEventType, fallbackLatency);
case TIMEOUT:
uniqueArg = uniqueId.incrementAndGet() + "";
return new Command(setter, HystrixEventType.SUCCESS, 700, uniqueArg, desiredFallbackEventType, fallbackLatency);
case BAD_REQUEST:
uniqueArg = uniqueId.incrementAndGet() + "";
return new Command(setter, HystrixEventType.BAD_REQUEST, latency, uniqueArg, desiredFallbackEventType, 0);
case RESPONSE_FROM_CACHE:
String arg = uniqueId.get() + "";
return new Command(setter, HystrixEventType.SUCCESS, 0, arg, desiredFallbackEventType, 0);
default:
throw new RuntimeException("not supported yet");
}
}

@Override
protected Integer run() throws Exception {
try {
Thread.sleep(executionLatency);
switch (executionResult) {
case SUCCESS:
return 1;
case FAILURE:
throw new RuntimeException("induced failure");
case BAD_REQUEST:
throw new HystrixBadRequestException("induced bad request");
default:
throw new RuntimeException("unhandled HystrixEventType : " + executionResult);
}
} catch (InterruptedException ex) {
System.out.println("Received InterruptedException : " + ex);
throw ex;
}
}

@Override
protected Integer getFallback() {
try {
Thread.sleep(fallbackExecutionLatency);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
switch (fallbackExecutionResult) {
case FALLBACK_SUCCESS:
return -1;
case FALLBACK_FAILURE:
throw new RuntimeException("induced failure");
case FALLBACK_MISSING:
throw new UnsupportedOperationException("fallback not defined");
default:
throw new RuntimeException("unhandled HystrixEventType : " + fallbackExecutionResult);
}
}

@Override
protected String getCacheKey() {
return arg;
}
}
Loading

0 comments on commit fa0d580

Please sign in to comment.