Compare commits

...

3 commits

Author SHA1 Message Date
a51dcf7e1f discord-ws: Improve heartbeat thread handling
All checks were successful
ci/woodpecker/push/java Pipeline was successful
ci/woodpecker/push/oci-image-build Pipeline was successful
We don't need to manage interrupts since the sleep call will alreade
throw on interrupt.
2024-12-01 18:27:22 +01:00
8f18158ad0 discord-ws: Add status code 1000 & 1001 to produce unresumable close event 2024-12-01 18:14:13 +01:00
16775d3fd4 discord-ws: Improve logging 2024-12-01 18:13:27 +01:00
3 changed files with 17 additions and 7 deletions

View file

@ -1,8 +1,12 @@
package de.hhhammer.dchat.discord.ws.connection; package de.hhhammer.dchat.discord.ws.connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.http.WebSocket; import java.net.http.WebSocket;
public final class IdendificationConnectionInitiator implements ConnectionInitiator { public final class IdendificationConnectionInitiator implements ConnectionInitiator {
private static final Logger logger = LoggerFactory.getLogger(IdendificationConnectionInitiator.class);
private final String token; private final String token;
public IdendificationConnectionInitiator(final String token) { public IdendificationConnectionInitiator(final String token) {
@ -11,6 +15,7 @@ public final class IdendificationConnectionInitiator implements ConnectionInitia
@Override @Override
public void initiate(final WebSocket webSocket) { public void initiate(final WebSocket webSocket) {
logger.info("Initiate identification");
final String identifyPayload = "{\"op\": 2, \"d\": {\"token\": \"" + token + "\", \"intents\": 513, \"properties\": {\"os\": \"linux\", \"browser\": \"dchat_lib\", \"device\": \"dchat_lib\"}}}"; final String identifyPayload = "{\"op\": 2, \"d\": {\"token\": \"" + token + "\", \"intents\": 513, \"properties\": {\"os\": \"linux\", \"browser\": \"dchat_lib\", \"device\": \"dchat_lib\"}}}";
webSocket.sendText(identifyPayload, true); webSocket.sendText(identifyPayload, true);
} }

View file

@ -19,8 +19,9 @@ public final class ResumeConnectionInitiator implements ConnectionInitiator {
@Override @Override
public void initiate(final WebSocket webSocket) { public void initiate(final WebSocket webSocket) {
logger.info("Initiate resume");
logger.debug("Initiate resume for session '{}' on sequence '{}'", sessionId, lastSequence);
final String identifyPayload = "{\"op\": 6, \"d\": {\"token\": \"" + token + "\", \"session_id\": \"" + sessionId + "\", \"seq\": " + lastSequence + "}}"; final String identifyPayload = "{\"op\": 6, \"d\": {\"token\": \"" + token + "\", \"session_id\": \"" + sessionId + "\", \"seq\": " + lastSequence + "}}";
logger.debug("Resuming connection");
webSocket.sendText(identifyPayload, true); webSocket.sendText(identifyPayload, true);
} }
} }

View file

@ -59,13 +59,14 @@ public final class DiscordListener implements WebSocket.Listener {
@Override @Override
public CompletionStage<?> onText(final WebSocket webSocket, final CharSequence data, final boolean last) { public CompletionStage<?> onText(final WebSocket webSocket, final CharSequence data, final boolean last) {
logger.debug("Received message: {}", data); logger.trace("Received text: {}", data);
final Optional<String> optText = textCollector.collect(data, last); final Optional<String> optText = textCollector.collect(data, last);
if (optText.isEmpty()) { if (optText.isEmpty()) {
webSocket.request(1); webSocket.request(1);
return null; return null;
} }
final String text = optText.get(); final String text = optText.get();
logger.debug("Received event: {}", text);
final Event event = eventDeserializer.deserialize(text); final Event event = eventDeserializer.deserialize(text);
final int currentSequence = event.sequence(); final int currentSequence = event.sequence();
if (currentSequence != 0) lastSeq.set(currentSequence); if (currentSequence != 0) lastSeq.set(currentSequence);
@ -83,6 +84,7 @@ public final class DiscordListener implements WebSocket.Listener {
return null; return null;
} }
final int operation = event.operation(); final int operation = event.operation();
logger.debug("handling operation: {}", operation);
switch (operation) { switch (operation) {
// reconnect // reconnect
case 7 -> case 7 ->
@ -94,7 +96,7 @@ public final class DiscordListener implements WebSocket.Listener {
case 10 -> init(webSocket, event.data()); case 10 -> init(webSocket, event.data());
// heartbeat acknowledgement // heartbeat acknowledgement
case 11 -> receivedAck.set(true); case 11 -> receivedAck.set(true);
default -> logger.debug("Operation '{}' has no handler", operation); default -> logger.warn("Operation '{}' has no handler", operation);
} }
webSocket.request(1); webSocket.request(1);
return null; return null;
@ -110,10 +112,12 @@ public final class DiscordListener implements WebSocket.Listener {
@Override @Override
public CompletionStage<?> onClose(final WebSocket webSocket, final int statusCode, final String reason) { public CompletionStage<?> onClose(final WebSocket webSocket, final int statusCode, final String reason) {
logger.info("Connection closed: {}: {}", statusCode, reason); logger.info("Connection closed: {}: {}", statusCode, reason);
// Gateway Close Event Codes -> https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-close-event-codes
switch (statusCode) { switch (statusCode) {
case 0, 4000, 4001, 4002, 4005, 4008 -> case 0, 4000, 4001, 4002, 4005, 4008 ->
this.closeEventQueue.add(new CloseEvent.ResumableCloseEvent(this.resumeGatewayUrl.get(), this.sessionId.get(), lastSeq.get())); this.closeEventQueue.add(new CloseEvent.ResumableCloseEvent(this.resumeGatewayUrl.get(), this.sessionId.get(), lastSeq.get()));
case 4003, 4007, 4009 -> this.closeEventQueue.add(new CloseEvent.UnresumableCloseEvent()); // 1000 and 1001 can be sent or received as close code and should trigger reconnect -> https://discord.com/developers/docs/events/gateway#initiating-a-disconnect
case 1001, 4003, 4007, 4009 -> this.closeEventQueue.add(new CloseEvent.UnresumableCloseEvent());
case 4004, 4010, 4011, 4012, 4013, 4014 -> case 4004, 4010, 4011, 4012, 4013, 4014 ->
this.closeEventQueue.add(new CloseEvent.UnrecoverableCloseEvent()); this.closeEventQueue.add(new CloseEvent.UnrecoverableCloseEvent());
default -> { default -> {
@ -133,12 +137,12 @@ public final class DiscordListener implements WebSocket.Listener {
private void startHeartbeat(final WebSocket webSocket, final int heartbeatInterval) { private void startHeartbeat(final WebSocket webSocket, final int heartbeatInterval) {
logger.info("Starting heartbeat"); logger.info("Starting heartbeat");
CompletableFuture.runAsync(() -> { Thread.startVirtualThread(() -> {
final float jitter = new Random().nextFloat(); final float jitter = new Random().nextFloat();
final int startDelay = Math.round(heartbeatInterval * jitter); final int startDelay = Math.round(heartbeatInterval * jitter);
try { try {
Thread.sleep(startDelay); Thread.sleep(startDelay);
while (!webSocket.isOutputClosed() && receivedAck.get() && !Thread.interrupted()) { while (!webSocket.isOutputClosed() && receivedAck.get()) {
final int intSeq = lastSeq.get(); final int intSeq = lastSeq.get();
final String stringSeq = intSeq != 0 ? String.valueOf(intSeq) : "null"; final String stringSeq = intSeq != 0 ? String.valueOf(intSeq) : "null";
// Send heartbeat // Send heartbeat
@ -148,7 +152,7 @@ public final class DiscordListener implements WebSocket.Listener {
TimeUnit.MILLISECONDS.sleep(heartbeatInterval); TimeUnit.MILLISECONDS.sleep(heartbeatInterval);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); logger.error("Heartbeat interrupted", e);
} }
logger.info("Stopping heartbeat"); logger.info("Stopping heartbeat");
}); });