-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* refactor(station): replace timed http request with websockets * feat(station): send InitMessage on open connection * refactor: Use jakarta websockets * feat: re-add http fetcher & move WS to own package * fix(ws-fetcher): catch client creation error * feat(ws-fetcher): open WS after setting handlers * feat(ws-fetcher): retrieve missing values for detections from DB & try to fix simplePositioner cursedness with threading * fix(simplePositioner): make handle function synchronised * fix(gradle): replace impl with jersey one is more in line with what we use for our server impl * fix(ws-fetcher): use lombok annotation * fix(ws-fetcher): baton mac's to uppercase * ingrease message size --------- Co-authored-by: FKD13 <44001949+FKD13@users.noreply.github.com>
- Loading branch information
1 parent
29daae2
commit dfd4510
Showing
13 changed files
with
518 additions
and
206 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,182 +1,14 @@ | ||
package telraam.station; | ||
|
||
import org.jdbi.v3.core.Jdbi; | ||
import telraam.database.daos.BatonDAO; | ||
import telraam.database.daos.DetectionDAO; | ||
import telraam.database.daos.StationDAO; | ||
import telraam.database.models.Baton; | ||
import telraam.database.models.Detection; | ||
import telraam.database.models.Station; | ||
import telraam.logic.lapper.Lapper; | ||
import telraam.logic.positioner.Positioner; | ||
import telraam.station.models.RonnyDetection; | ||
import telraam.station.models.RonnyResponse; | ||
|
||
import java.io.IOException; | ||
import java.net.ConnectException; | ||
import java.net.URI; | ||
import java.net.URISyntaxException; | ||
import java.net.http.HttpClient; | ||
import java.net.http.HttpConnectTimeoutException; | ||
import java.net.http.HttpRequest; | ||
import java.net.http.HttpResponse; | ||
import java.sql.Timestamp; | ||
import java.time.Duration; | ||
import java.util.*; | ||
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
import java.util.logging.Logger; | ||
import java.util.stream.Collectors; | ||
|
||
public class Fetcher { | ||
private final Set<Lapper> lappers; | ||
private final Set<Positioner> positioners; | ||
private Station station; | ||
|
||
private final BatonDAO batonDAO; | ||
private final DetectionDAO detectionDAO; | ||
private final StationDAO stationDAO; | ||
|
||
private final HttpClient client = HttpClient.newHttpClient(); | ||
private final Logger logger = Logger.getLogger(Fetcher.class.getName()); | ||
|
||
public interface Fetcher { | ||
//Timeout to wait for before sending the next request after an error. | ||
private final static int ERROR_TIMEOUT_MS = 2000; | ||
int ERROR_TIMEOUT_MS = 2000; | ||
//Timeout for a request to a station. | ||
private final static int REQUEST_TIMEOUT_S = 10; | ||
int REQUEST_TIMEOUT_S = 10; | ||
//Full batch size, if this number of detections is reached, more are probably available immediately. | ||
private final static int FULL_BATCH_SIZE = 1000; | ||
int FULL_BATCH_SIZE = 1000; | ||
//Timeout when result has less than a full batch of detections. | ||
private final static int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds | ||
|
||
|
||
public Fetcher(Jdbi database, Station station, Set<Lapper> lappers, Set<Positioner> positioners) { | ||
this.batonDAO = database.onDemand(BatonDAO.class); | ||
this.detectionDAO = database.onDemand(DetectionDAO.class); | ||
this.stationDAO = database.onDemand(StationDAO.class); | ||
|
||
this.lappers = lappers; | ||
this.positioners = positioners; | ||
this.station = station; | ||
} | ||
|
||
public void fetch() { | ||
logger.info("Running Fetcher for station(" + this.station.getId() + ")"); | ||
JsonBodyHandler<RonnyResponse> bodyHandler = new JsonBodyHandler<>(RonnyResponse.class); | ||
|
||
while (true) { | ||
//Update the station to account for possible changes in the database | ||
this.stationDAO.getById(station.getId()).ifPresentOrElse( | ||
station -> this.station = station, | ||
() -> this.logger.severe("Can't update station from database.") | ||
); | ||
|
||
//Get last detection id | ||
int lastDetectionId = 0; | ||
Optional<Detection> lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId()); | ||
if (lastDetection.isPresent()) { | ||
lastDetectionId = lastDetection.get().getRemoteId(); | ||
} | ||
|
||
//Create URL | ||
URI url; | ||
try { | ||
url = new URI(station.getUrl() + "/detections/" + lastDetectionId); | ||
} catch (URISyntaxException ex) { | ||
this.logger.severe(ex.getMessage()); | ||
try { | ||
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); | ||
} catch (InterruptedException e) { | ||
logger.severe(e.getMessage()); | ||
} | ||
continue; | ||
} | ||
|
||
//Create request | ||
HttpRequest request; | ||
try { | ||
request = HttpRequest.newBuilder() | ||
.uri(url) | ||
.version(HttpClient.Version.HTTP_1_1) | ||
.timeout(Duration.ofSeconds(Fetcher.REQUEST_TIMEOUT_S)) | ||
.build(); | ||
} catch (IllegalArgumentException e) { | ||
logger.severe(e.getMessage()); | ||
try { | ||
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); | ||
} catch (InterruptedException ex) { | ||
logger.severe(ex.getMessage()); | ||
} | ||
continue; | ||
} | ||
|
||
//Do request | ||
HttpResponse<Supplier<RonnyResponse>> response; | ||
try { | ||
try { | ||
response = this.client.send(request, bodyHandler); | ||
} catch (ConnectException | HttpConnectTimeoutException ex) { | ||
this.logger.severe("Could not connect to " + request.uri()); | ||
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); | ||
continue; | ||
} catch (IOException e) { | ||
logger.severe(e.getMessage()); | ||
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS); | ||
continue; | ||
} | ||
} catch (InterruptedException e) { | ||
logger.severe(e.getMessage()); | ||
continue; | ||
} | ||
|
||
//Check response state | ||
if (response.statusCode() != 200) { | ||
this.logger.warning( | ||
"Unexpected status code(" + response.statusCode() + ") when requesting " + url + " for station(" + this.station.getName() + ")" | ||
); | ||
continue; | ||
} | ||
|
||
//Fetch all batons and create a map by batonMAC | ||
Map<String, Baton> baton_mac_map = batonDAO.getAll().stream() | ||
.collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity())); | ||
|
||
//Insert detections | ||
List<Detection> new_detections = new ArrayList<>(); | ||
List<RonnyDetection> detections = response.body().get().detections; | ||
for (RonnyDetection detection : detections) { | ||
if (baton_mac_map.containsKey(detection.mac.toUpperCase())) { | ||
var baton = baton_mac_map.get(detection.mac.toUpperCase()); | ||
new_detections.add(new Detection( | ||
baton.getId(), | ||
station.getId(), | ||
detection.rssi, | ||
detection.battery, | ||
detection.uptimeMs, | ||
detection.id, | ||
new Timestamp((long) (detection.detectionTimestamp * 1000)), | ||
new Timestamp(System.currentTimeMillis()) | ||
)); | ||
} | ||
} | ||
if (!new_detections.isEmpty()) { | ||
detectionDAO.insertAll(new_detections); | ||
new_detections.forEach((detection) -> { | ||
lappers.forEach((lapper) -> lapper.handle(detection)); | ||
positioners.forEach((positioner) -> positioner.handle(detection)); | ||
}); | ||
} | ||
|
||
this.logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size()); | ||
int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds | ||
|
||
//If few detections are retrieved from the station, wait for some time. | ||
if (detections.size() < Fetcher.FULL_BATCH_SIZE) { | ||
try { | ||
Thread.sleep(Fetcher.IDLE_TIMEOUT_MS); | ||
} catch (InterruptedException e) { | ||
logger.severe(e.getMessage()); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
void fetch(); | ||
} |
Oops, something went wrong.