From 41b87b93456aae3831017795259010197b9e2cca Mon Sep 17 00:00:00 2001 From: Anuken Date: Sun, 26 Feb 2023 12:06:06 -0500 Subject: [PATCH] static --- core/src/mindustry/net/ArcNetProvider.java | 6 +- core/src/mindustry/net/AsyncUdp.java | 125 ++++++++++----------- 2 files changed, 61 insertions(+), 70 deletions(-) diff --git a/core/src/mindustry/net/ArcNetProvider.java b/core/src/mindustry/net/ArcNetProvider.java index 7490fd6a2b..8bde23969b 100644 --- a/core/src/mindustry/net/ArcNetProvider.java +++ b/core/src/mindustry/net/ArcNetProvider.java @@ -209,17 +209,13 @@ public class ArcNetProvider implements NetProvider{ } } - - //TODO remove - static AsyncUdp udp = new AsyncUdp(); - @Override public void pingHost(String address, int port, Cons valid, Cons invalid){ long time = Time.millis(); var socket = new InetSocketAddress(address, port); Log.info("Time to resolve @: @", address, Time.timeSinceMillis(time)); - udp.send(socket, 2000, 512, ByteBuffer.wrap(new byte[]{-2, 1}), data -> { + AsyncUdp.send(socket, 2000, 512, ByteBuffer.wrap(new byte[]{-2, 1}), data -> { Host host = NetworkIO.readServerData((int)Time.timeSinceMillis(time), socket.getAddress().getHostAddress(), data); host.port = port; Core.app.post(() -> valid.get(host)); diff --git a/core/src/mindustry/net/AsyncUdp.java b/core/src/mindustry/net/AsyncUdp.java index 8c96470e4f..45d0d9725b 100644 --- a/core/src/mindustry/net/AsyncUdp.java +++ b/core/src/mindustry/net/AsyncUdp.java @@ -10,26 +10,72 @@ import java.nio.channels.*; import java.util.*; import java.util.concurrent.*; -public class AsyncUdp implements Runnable{ - Selector selector; - DelayQueue removals = new DelayQueue<>(); - TaskQueue tasks = new TaskQueue(); - int emptySelects; +public class AsyncUdp{ + static Selector selector; + static DelayQueue removals = new DelayQueue<>(); + static TaskQueue tasks = new TaskQueue(); + static int emptySelects; - public AsyncUdp(){ + static{ try{ selector = Selector.open(); - Threads.daemon("AsyncUDP", this); + //handle requests and tasks + Threads.daemon("AsyncUDP", () -> { + while(true){ + try{ + long startTime = Time.millis(); + int selected = selector.select(0); + + tasks.run(); + + if(selected == 0){ + //prevent hogging the CPU due to empty selects as per Kryonet implementation + if(emptySelects++ >= 100){ + emptySelects = 0; + long elapsedTime = System.currentTimeMillis() - startTime; + if(elapsedTime < 25) Threads.sleep(25 - elapsedTime); + } + continue; + } + + var keys = selector.selectedKeys(); + for(Iterator iter = keys.iterator(); iter.hasNext(); ){ + var key = iter.next(); + iter.remove(); + + if(key.isReadable() && key.isValid()){ + var request = (Request)key.attachment(); + try{ + var channel = (DatagramChannel)key.channel(); + var buffer = ByteBuffer.allocate(request.bufferSize); + channel.receive(buffer); + buffer.flip(); + + request.received.get(buffer); + request.close(); + }catch(IOException error){ + request.fail(error); + //TODO remove logging, this is not needed outside of debugging + Log.err(error); + } + } + + } + }catch(IOException e){ + Log.err(e); + } + } + }); + + //remove requests with the delay queue Threads.daemon("AsyncUDP-Delay", () -> { while(true){ try{ var request = removals.take(); - synchronized(request){ - request.cancel(new TimeoutException()); - } - }catch(InterruptedException ignored){ - } + tasks.post(() -> request.fail(new TimeoutException())); + selector.wakeup(); + }catch(InterruptedException ignored){} } }); }catch(IOException e){ @@ -37,8 +83,7 @@ public class AsyncUdp implements Runnable{ } } - public void send(InetSocketAddress address, int timeout, int bufferSize, ByteBuffer data, Cons received, Cons failed){ - //TODO is it worth posting to the task queue when you can just run it here? shouldn't be very expensive + public static void send(InetSocketAddress address, int timeout, int bufferSize, ByteBuffer data, Cons received, Cons failed){ tasks.post(() -> { try{ DatagramChannel channel = selector.provider().openDatagramChannel(); @@ -60,56 +105,6 @@ public class AsyncUdp implements Runnable{ selector.wakeup(); } - @Override - public void run(){ - while(true){ - try{ - long startTime = Time.millis(); - int selected = selector.select(0); - - tasks.run(); - - if(selected == 0){ - //prevent hogging the CPU due to empty selects as per Kryonet implementation - if(emptySelects++ >= 100){ - emptySelects = 0; - long elapsedTime = System.currentTimeMillis() - startTime; - if(elapsedTime < 25) Threads.sleep(25 - elapsedTime); - } - continue; - } - - var keys = selector.selectedKeys(); - for(Iterator iter = keys.iterator(); iter.hasNext(); ){ - var key = iter.next(); - iter.remove(); - - if(key.isReadable()){ - var request = (Request)key.attachment(); - - synchronized(request){ - try{ - var channel = (DatagramChannel)key.channel(); - var buffer = ByteBuffer.allocate(request.bufferSize); - channel.receive(buffer); - buffer.flip(); - - request.received.get(buffer); - request.close(); - }catch(IOException error){ - request.cancel(error); - Log.err(error); - } - } - } - - } - }catch(IOException e){ - Log.err(e); - } - } - } - static class Request implements Delayed{ final InetSocketAddress address; final long timeout, connectStartMs; @@ -142,7 +137,7 @@ public class AsyncUdp implements Runnable{ } } - void cancel(Exception error){ + void fail(Exception error){ if(!key.isValid()) return; failed.get(error); close();