Skip to content
This repository has been archived by the owner on Mar 29, 2023. It is now read-only.

Need an interface like createConsumer(String exchange, String queue, String routingKey) #15

Open
kolyneh opened this issue Feb 2, 2018 · 9 comments

Comments

@kolyneh
Copy link

kolyneh commented Feb 2, 2018

We need an interface like createConsumer(String exchange, String queue, String routingKey) to specify queue rather than a server-named one.
If not , is there any solution to create a cousumer with specified exchang, queue and routingKey?

@karlney
Copy link
Contributor

karlney commented Feb 6, 2018

@kolyneh The rxrabbit consumer does not create any queues or bindings per design. So if you need to set up any state on the broker (apart from channels and connections) you can either use the vanilla amqp-client interfaces or use the https://github.com/meltwater/rxrabbit/blob/master/rxrabbit/src/main/java/com/meltwater/rxrabbit/AdminChannel.java (which you get by calling https://github.com/meltwater/rxrabbit/blob/581ddb8bc7afaa4a8d7b6efb25c82b87fff5a692/rxrabbit/src/main/java/com/meltwater/rxrabbit/impl/DefaultChannelFactory.java#L90)

And then using that AdminChannel to

  • exchangeDeclare(...) // create exchange if not exists
  • queueDeclare(, ..) // create exchange if not exists
  • queueBind(..) //bind the queue and the exchange

After you have done these three calls an exchange a queue and a binding is either created or already present on the rabbit broker. Then you can just start consuming from the queue like:

createConsumeChannel()

@karlney
Copy link
Contributor

karlney commented Feb 6, 2018

If you just want a temporary/ad hoc queue, that is destroyed after disconnect you can use the createConsumeChannel(String exchange, String routingkey) method.

For an example see: https://github.com/meltwater/rxrabbit/blob/581ddb8bc7afaa4a8d7b6efb25c82b87fff5a692/rxrabbit/src/test/java/com/meltwater/rxrabbit/RxRabbitTests.java#L206

@karlney
Copy link
Contributor

karlney commented Feb 6, 2018

If these two scenarios does not cover your use case I need more information on what you are trying to achieve @kolyneh

@kolyneh
Copy link
Author

kolyneh commented Feb 27, 2018

@karlney Sorry for this late reply but thanks a lot!
I've made it by following your steps.

I'm use rxrabbit on Android.
And, I have changed all the code to support Rx2, and covert then to Kotlin.

Every thing is ok,
but after connection is created, i change the Wi-Fi connection and soon it crash.

Log:
E/AndroidRuntime: FATAL EXCEPTION: RxComputationThreadPool-1
Process: com.jingxun.cloud, PID: 16256
io.reactivex.exceptions.OnErrorNotImplementedException: Error while connecting to broker. host='192.168.18.70' port=5672 virtualHost='/' username='1518083055171101'
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onError(FlowableDoOnEach.java:111)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.checkTerminate(FlowableFlatMap.java:563)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:374)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.innerError(FlowableFlatMap.java:602)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onError(FlowableFlatMap.java:665)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runBackfused(FlowableObserveOn.java:446)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:172)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:818)
Caused by: com.meltwater.rxrabbit.impl.DefaultChannelFactory$ConnectionFailureException: Error while connecting to broker. host='192.168.18.70' port=5672 virtualHost='/' username='1518083055171101'
at com.meltwater.rxrabbit.impl.DefaultChannelFactory.createConnection(DefaultChannelFactory.kt:264)
at com.meltwater.rxrabbit.impl.DefaultChannelFactory.getOrCreateConnection(DefaultChannelFactory.kt:192)
at com.meltwater.rxrabbit.impl.DefaultChannelFactory.createChannel(DefaultChannelFactory.kt:164)
at com.meltwater.rxrabbit.impl.DefaultChannelFactory.createConsumeChannel(DefaultChannelFactory.kt:55)
at com.meltwater.rxrabbit.impl.SingleChannelConsumer.startConsuming(SingleChannelConsumer.kt:140)
at com.meltwater.rxrabbit.impl.SingleChannelConsumer.access$startConsuming(SingleChannelConsumer.kt:60)
at com.meltwater.rxrabbit.impl.SingleChannelConsumer$createObservable$1.subscribe(SingleChannelConsumer.kt:95)
at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
at io.reactivex.Flowable.subscribe(Flowable.java:13068)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.Flowable.subscribe(Flowable.java:13068)
at io.reactivex.internal.operators.flowable.FlowableDoOnEach.subscribeActual(FlowableDoOnEach.java:50)
at io.reactivex.Flowable.subscribe(Flowable.java:13068)
at io.reactivex.Flowable.subscribe(Flowable.java:13014)
at io.reactivex.internal.operators.flowable.FlowableRepeatWhen$WhenReceiver.onNext(FlowableRepeatWhen.java:101)
at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.tryEmit(FlowableFlatMap.java:282)
at io.reac

It's occur at this:
DefaultChannelFactory#createConnection

if (connection == null) { throw ConnectionFailureException(cf, lastException) }

How to deal with this?

@karlney
Copy link
Contributor

karlney commented Feb 27, 2018

@kolyneh I think the key is the OnErrorNotImplementedException part. What you should do is that when you get an error in your RxJava flow you need to start the re-connect loop, so that whenever you get another available connection (i.e switching over from WiFi to 4G network) then you re-connect to the broker using the new connection.

In the existing code that is handled in https://github.com/meltwater/rxrabbit/blob/581ddb8bc7afaa4a8d7b6efb25c82b87fff5a692/rxrabbit/src/main/java/com/meltwater/rxrabbit/impl/SingleChannelConsumer.java#L119 with the implementation in

https://github.com/meltwater/rxrabbit/blob/master/rxrabbit/src/main/java/com/meltwater/rxrabbit/impl/ConnectionRetryHandler.java

Without knowing how the new 2.x code you have now I don't think I can help you further.

TL;DR you have forgotten about error handling in one or more places - and the error handling is done by dropping the existing connections and consumers, and re-create them again when a new NW connection is established.

@kolyneh
Copy link
Author

kolyneh commented Feb 27, 2018

@karlney Seems there is only a BackoffAlgorithm interface that we can use to change the backoff behavior out of the box, right?

I found there is some param we can config in the origin rabbit codes:
connectionFactory.isAutomaticRecoveryEnabled = true
connectionFactory.isTopologyRecoveryEnabled = true

It works on my old project.

@karlney
Copy link
Contributor

karlney commented Feb 27, 2018

Yes you can implement your own BackoffAlgorithm.

Regarding the underlying connection factory settings - they might work but we found that they were unreliable in some cases - but they might work in your case so please go ahead and try them. You might have to change some code in the rx-rabbit ConnectionFactory to be able to set them - but it should not be difficult.

@kolyneh
Copy link
Author

kolyneh commented Feb 28, 2018

@karlney I have implement my BackoffAlgorithm like:

private val delay: AtomicInteger = AtomicInteger(-1)
override fun getDelayMs(attempts: Int?): Int {
val delayMs = delay.get()
delay.set(-1) // getDelayMs will run before we receive network change event
return delayMs
}
//Fade code:
if(“network is available”){
delay.set(5000)
}else{
delay.set(-1)
}

When network is available, delay is set to 5000, if not is set to -1.

And modify ConnectionRetryHandler like:
`override fun apply(observable: Flowable): Flowable<*> {
return observable.flatMap { throwable ->
val conAttempt = connectAttempt.get()
if (maxReconnectAttempts == ConsumerSettings.RETRY_FOREVER || conAttempt < maxReconnectAttempts) {
val delayMs = backoffAlgorithm.getDelayMs(conAttempt)
connectAttempt.incrementAndGet()
log.infoWithParams("Scheduling attempting to restart consumer",
"attempt", connectAttempt,
"delayMs", delayMs)

           if(delayMs<0){
                return@flatMap Flowable.empty<Long>()
            }

            return@flatMap Flowable.timer(delayMs.toLong(), TimeUnit.MILLISECONDS)
        } else {
            return@flatMap Flowable.error<Long>(throwable)
        }
    }
}`

Only added this code:
if(delayMs<0){ return@flatMap Flowable.empty<Long>() }

And run create connection when network is reconnect . It works.

But when publishing message, it crash , log:

java.lang.IllegalArgumentException: millis < 0: -1
at java.lang.Thread.sleep(Thread.java:1004)
at java.lang.Thread.sleep(Thread.java:985)
at com.meltwater.rxrabbit.impl.SingleChannelPublisher.getChannel(SingleChannelPublisher.kt:199)
at com.meltwater.rxrabbit.impl.SingleChannelPublisher.basicPublish(SingleChannelPublisher.kt:154)
at com.meltwater.rxrabbit.impl.SingleChannelPublisher.access$basicPublish(SingleChannelPublisher.kt:45)
at com.meltwater.rxrabbit.impl.SingleChannelPublisher$schedulePublish$1.run(SingleChannelPublisher.kt:137)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:818)

Where the code is :
try { Thread.sleep(backoffAlgorithm.getDelayMs(attempt - 1).toLong()) } catch (ignored: InterruptedException) { }

It's cause by this code in my BackoffAlgorithm :
delay.set(-1)

I add this code because getDelayMs will call before i receive network change event.
If don't set to -1, it is 5000 when getDelayMs is calling, then crash.

How to handle this?

@karlney
Copy link
Contributor

karlney commented Jun 25, 2018

@kolyneh did you sort this out yourself.

Otherwise I would just use:

Flowable.timer(Math.max(0L,delayMs.toLong()), TimeUnit.MILLISECONDS)
instead of
Flowable.timer(delayMs.toLong(), TimeUnit.MILLISECONDS)

The reason is that its a race condition to first do an if statement, and then act like the if argument is also true inside the if body in the absence if any locking mechanism.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants