Skip to content

Commit

Permalink
Return Cancellable from bot.execute with callback. Stop updates liste…
Browse files Browse the repository at this point in the history
…ner correctly pengrad#345
  • Loading branch information
pengrad committed Oct 6, 2023
1 parent a04156d commit 7b2f754
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.pengrad.telegrambot;

public interface Cancellable {
void cancel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public <T extends BaseRequest<T, R>, R extends BaseResponse> R execute(BaseReque
return api.send(request);
}

public <T extends BaseRequest<T, R>, R extends BaseResponse> void execute(T request, Callback<T, R> callback) {
api.send(request, callback);
public <T extends BaseRequest<T, R>, R extends BaseResponse> Cancellable execute(T request, Callback<T, R> callback) {
return api.send(request, callback);
}

public String getToken() {
Expand Down Expand Up @@ -77,6 +77,7 @@ public void setUpdatesListener(UpdatesListener listener, ExceptionHandler except
}

public void setUpdatesListener(UpdatesListener listener, ExceptionHandler exceptionHandler, GetUpdates request) {
updatesHandler.stop();
updatesHandler.start(this, listener, exceptionHandler, request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.gson.Gson;
import com.pengrad.telegrambot.Callback;
import com.pengrad.telegrambot.Cancellable;
import com.pengrad.telegrambot.model.request.InputFile;
import com.pengrad.telegrambot.request.BaseRequest;
import com.pengrad.telegrambot.response.BaseResponse;
Expand Down Expand Up @@ -37,9 +38,11 @@ public TelegramBotClient(OkHttpClient client, Gson gson, String baseUrl) {
this.clientWithTimeout = client;
}

public <T extends BaseRequest<T, R>, R extends BaseResponse> void send(final T request, final Callback<T, R> callback) {
public <T extends BaseRequest<T, R>, R extends BaseResponse> Cancellable send(final T request, final Callback<T, R> callback) {
OkHttpClient client = getOkHttpClient(request);
client.newCall(createRequest(request)).enqueue(new okhttp3.Callback() {

Call call = client.newCall(createRequest(request));
call.enqueue(new okhttp3.Callback() {
@Override
public void onResponse(Call call, Response response) {
R result = null;
Expand All @@ -64,6 +67,8 @@ public void onFailure(Call call, IOException e) {
callback.onFailure(request, e);
}
});

return call::cancel;
}

public <T extends BaseRequest<T, R>, R extends BaseResponse> R send(final BaseRequest<T, R> request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class UpdatesHandler {
private TelegramBot bot;
private UpdatesListener listener;
private ExceptionHandler exceptionHandler;
private Cancellable pendingRequest;

private final long sleepTimeout;

Expand All @@ -41,12 +42,16 @@ public void stop() {
bot = null;
listener = null;
exceptionHandler = null;
if (pendingRequest != null) {
pendingRequest.cancel();
pendingRequest = null;
}
}

private void getUpdates(GetUpdates request) {
if (bot == null || listener == null) return;

bot.execute(request, new Callback<GetUpdates, GetUpdatesResponse>() {
pendingRequest = bot.execute(request, new Callback<GetUpdates, GetUpdatesResponse>() {
@Override
public void onResponse(GetUpdates request, GetUpdatesResponse response) {
if (listener == null) return;
Expand Down Expand Up @@ -83,6 +88,10 @@ public void onResponse(GetUpdates request, GetUpdatesResponse response) {

@Override
public void onFailure(GetUpdates request, IOException e) {
// TODO: better way to identify canceled request
if (e.getMessage().equals("Canceled")) {
return;
}
if (exceptionHandler != null) {
exceptionHandler.onException(new TelegramException(e));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public void getChat() throws MalformedURLException, URISyntaxException {
chat = bot.execute(new GetChat(chatId)).chat();
assertNotNull(chat.firstName());
assertNull(chat.lastName());
assertEquals("yo", chat.bio());
assertNull(chat.bio());
assertTrue(chat.hasPrivateForwards());

chat = bot.execute(new GetChat(localGroup)).chat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import com.pengrad.telegrambot.model.*;

import com.pengrad.telegrambot.request.GetUpdates;
import org.junit.*;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
Expand All @@ -26,7 +28,7 @@ public static String token() {
String token;
try {
Properties properties = new Properties();
properties.load(new FileInputStream("local.properties"));
properties.load(Files.newInputStream(Paths.get("local.properties")));
token = properties.getProperty("TEST_TOKEN");
} catch (Exception e) {
token = System.getenv("TEST_TOKEN");
Expand All @@ -41,6 +43,34 @@ public void initBot() {
bot = new TelegramBot.Builder(token()).debug().build();
}

@Test
public void setAndRemoveListener() throws InterruptedException {
// Two simultaneous getUpdates requests throw error
// 409 Conflict: terminated by other getUpdates request; make sure that only one bot instance is running
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exception = new AtomicReference<>();
UpdatesListener listener = updates -> UpdatesListener.CONFIRMED_UPDATES_NONE;
ExceptionHandler exceptionHandler = e -> {
System.out.println("Exception in handler: " + e.getMessage());
exception.set(e);
latch.countDown();
};
bot.setUpdatesListener(listener, exceptionHandler);
// trigger another getUpdates and cancel first one
bot.setUpdatesListener(listener, exceptionHandler);
// cancel second one
bot.removeGetUpdatesListener();

UpdatesListener listener2 = updates -> {
latch.countDown();
return UpdatesListener.CONFIRMED_UPDATES_ALL;
};
bot.setUpdatesListener(listener2, exceptionHandler, new GetUpdates().offset(-1).allowedUpdates());
latch.await(3, TimeUnit.SECONDS);
bot.removeGetUpdatesListener();
assertNull(exception.get());
}

@Test
public void receiveUpdates() throws InterruptedException {
withLatch(1, latch -> {
Expand All @@ -55,8 +85,8 @@ public void receiveUpdates() throws InterruptedException {
.message("")
.build();
response = response.newBuilder().body(ResponseBody
.create(MediaType.parse("application/json"),
"{\"ok\":true,\"result\":[{\"update_id\":" + i.getAndIncrement() + "}]}"
.create("{\"ok\":true,\"result\":[{\"update_id\":" + i.getAndIncrement() + "}]}",
MediaType.parse("application/json")
)).build();
return response;
})
Expand Down

0 comments on commit 7b2f754

Please sign in to comment.