This commit is contained in:
Anuken
2023-02-26 12:06:06 -05:00
parent 84e52bdee3
commit 41b87b9345
2 changed files with 61 additions and 70 deletions

View File

@@ -209,17 +209,13 @@ public class ArcNetProvider implements NetProvider{
}
}
//TODO remove
static AsyncUdp udp = new AsyncUdp();
@Override
public void pingHost(String address, int port, Cons<Host> valid, Cons<Exception> invalid){
long time = Time.millis();
var socket = new InetSocketAddress(address, port);
Log.info("Time to resolve @: @", address, Time.timeSinceMillis(time));
udp.send(socket, 2000, 512, ByteBuffer.wrap(new byte[]{-2, 1}), data -> {
AsyncUdp.send(socket, 2000, 512, ByteBuffer.wrap(new byte[]{-2, 1}), data -> {
Host host = NetworkIO.readServerData((int)Time.timeSinceMillis(time), socket.getAddress().getHostAddress(), data);
host.port = port;
Core.app.post(() -> valid.get(host));

View File

@@ -10,26 +10,72 @@ import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;
public class AsyncUdp implements Runnable{
Selector selector;
DelayQueue<Request> removals = new DelayQueue<>();
TaskQueue tasks = new TaskQueue();
int emptySelects;
public class AsyncUdp{
static Selector selector;
static DelayQueue<Request> removals = new DelayQueue<>();
static TaskQueue tasks = new TaskQueue();
static int emptySelects;
public AsyncUdp(){
static{
try{
selector = Selector.open();
Threads.daemon("AsyncUDP", this);
//handle requests and tasks
Threads.daemon("AsyncUDP", () -> {
while(true){
try{
long startTime = Time.millis();
int selected = selector.select(0);
tasks.run();
if(selected == 0){
//prevent hogging the CPU due to empty selects as per Kryonet implementation
if(emptySelects++ >= 100){
emptySelects = 0;
long elapsedTime = System.currentTimeMillis() - startTime;
if(elapsedTime < 25) Threads.sleep(25 - elapsedTime);
}
continue;
}
var keys = selector.selectedKeys();
for(Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ){
var key = iter.next();
iter.remove();
if(key.isReadable() && key.isValid()){
var request = (Request)key.attachment();
try{
var channel = (DatagramChannel)key.channel();
var buffer = ByteBuffer.allocate(request.bufferSize);
channel.receive(buffer);
buffer.flip();
request.received.get(buffer);
request.close();
}catch(IOException error){
request.fail(error);
//TODO remove logging, this is not needed outside of debugging
Log.err(error);
}
}
}
}catch(IOException e){
Log.err(e);
}
}
});
//remove requests with the delay queue
Threads.daemon("AsyncUDP-Delay", () -> {
while(true){
try{
var request = removals.take();
synchronized(request){
request.cancel(new TimeoutException());
}
}catch(InterruptedException ignored){
}
tasks.post(() -> request.fail(new TimeoutException()));
selector.wakeup();
}catch(InterruptedException ignored){}
}
});
}catch(IOException e){
@@ -37,8 +83,7 @@ public class AsyncUdp implements Runnable{
}
}
public void send(InetSocketAddress address, int timeout, int bufferSize, ByteBuffer data, Cons<ByteBuffer> received, Cons<Exception> failed){
//TODO is it worth posting to the task queue when you can just run it here? shouldn't be very expensive
public static void send(InetSocketAddress address, int timeout, int bufferSize, ByteBuffer data, Cons<ByteBuffer> received, Cons<Exception> failed){
tasks.post(() -> {
try{
DatagramChannel channel = selector.provider().openDatagramChannel();
@@ -60,56 +105,6 @@ public class AsyncUdp implements Runnable{
selector.wakeup();
}
@Override
public void run(){
while(true){
try{
long startTime = Time.millis();
int selected = selector.select(0);
tasks.run();
if(selected == 0){
//prevent hogging the CPU due to empty selects as per Kryonet implementation
if(emptySelects++ >= 100){
emptySelects = 0;
long elapsedTime = System.currentTimeMillis() - startTime;
if(elapsedTime < 25) Threads.sleep(25 - elapsedTime);
}
continue;
}
var keys = selector.selectedKeys();
for(Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ){
var key = iter.next();
iter.remove();
if(key.isReadable()){
var request = (Request)key.attachment();
synchronized(request){
try{
var channel = (DatagramChannel)key.channel();
var buffer = ByteBuffer.allocate(request.bufferSize);
channel.receive(buffer);
buffer.flip();
request.received.get(buffer);
request.close();
}catch(IOException error){
request.cancel(error);
Log.err(error);
}
}
}
}
}catch(IOException e){
Log.err(e);
}
}
}
static class Request implements Delayed{
final InetSocketAddress address;
final long timeout, connectStartMs;
@@ -142,7 +137,7 @@ public class AsyncUdp implements Runnable{
}
}
void cancel(Exception error){
void fail(Exception error){
if(!key.isValid()) return;
failed.get(error);
close();