Extensive netcode changes, bugfixes

This commit is contained in:
Anuken
2018-06-29 14:13:31 -04:00
parent 3898602f0e
commit b4151a256b
20 changed files with 411 additions and 187 deletions

View File

@@ -0,0 +1,89 @@
package io.anuke.kryonet;
import com.esotericsoftware.kryonet.Connection;
import com.esotericsoftware.kryonet.Listener;
import com.esotericsoftware.kryonet.Listener.QueuedListener;
import java.util.LinkedList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class CustomListeners {
static public class LagListener extends QueuedListener {
protected final ScheduledExecutorService threadPool;
private final int lagMillisMin, lagMillisMax;
final LinkedList<Runnable> runnables = new LinkedList();
public LagListener (int lagMillisMin, int lagMillisMax, Listener listener) {
super(listener);
this.lagMillisMin = lagMillisMin;
this.lagMillisMax = lagMillisMax;
threadPool = Executors.newScheduledThreadPool(1, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
});
}
protected int calculateLag() {
return lagMillisMin + (int)(Math.random() * (lagMillisMax - lagMillisMin));
}
@Override
public void queue (Runnable runnable) {
synchronized (runnables) {
runnables.addFirst(runnable);
}
threadPool.schedule(() -> {
Runnable runnable1;
synchronized (runnables) {
runnable1 = runnables.removeLast();
}
runnable1.run();
}, calculateLag(), TimeUnit.MILLISECONDS);
}
}
/**
* Delays, reorders and does not make guarantees to the delivery of incoming objects
* to the wrapped listener (in order to simulate lag, jitter, package loss and
* package duplication).
* Notification events are likely processed on a separate thread after a delay.
* Note that only the delivery of incoming objects is modified. To modify the delivery
* of outgoing objects, use a UnreliableListener at the other end of the connection.
*/
static public class UnreliableListener extends LagListener {
private final float lossPercentage;
private final float duplicationPercentage;
private final CustomListeners.LagListener tcpListener;
public UnreliableListener (int lagMillisMin, int lagMillisMax, float lossPercentage,
float duplicationPercentage, Listener listener) {
super(lagMillisMin, lagMillisMax, listener);
this.tcpListener = new CustomListeners.LagListener(lagMillisMin, lagMillisMax, listener);
this.lossPercentage = lossPercentage;
this.duplicationPercentage = duplicationPercentage;
}
@Override
public void received(Connection connection, Object object) {
if(KryoCore.lastUDP) {
super.received(connection, object);
}else{
tcpListener.received(connection, object);
}
}
@Override
public void queue (Runnable runnable) {
do {
if (Math.random() >= lossPercentage) {
threadPool.schedule(runnable, calculateLag(), TimeUnit.MILLISECONDS);
}
} while (Math.random() < duplicationPercentage);
}
}
}

View File

@@ -5,8 +5,8 @@ import com.badlogic.gdx.utils.Array;
import com.badlogic.gdx.utils.ObjectMap;
import com.badlogic.gdx.utils.ObjectSet;
import com.esotericsoftware.kryonet.*;
import com.esotericsoftware.kryonet.Listener.LagListener;
import com.esotericsoftware.minlog.Log;
import io.anuke.kryonet.CustomListeners.UnreliableListener;
import io.anuke.mindustry.net.Host;
import io.anuke.mindustry.net.Net;
import io.anuke.mindustry.net.Net.ClientProvider;
@@ -25,9 +25,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.util.List;
import static io.anuke.mindustry.Vars.netClient;
import static io.anuke.mindustry.Vars.port;
import static io.anuke.mindustry.Vars.threads;
import static io.anuke.mindustry.Vars.*;
public class KryoClient implements ClientProvider{
Client client;
@@ -35,6 +33,8 @@ public class KryoClient implements ClientProvider{
ClientDiscoveryHandler handler;
public KryoClient(){
KryoCore.init();
handler = new ClientDiscoveryHandler() {
@Override
public DatagramPacket onRequestNewDatagramPacket() {
@@ -80,7 +80,7 @@ public class KryoClient implements ClientProvider{
public void received (Connection connection, Object object) {
if(object instanceof FrameworkMessage) return;
Gdx.app.postRunnable(() -> {
threads.runDelay(() -> {
try{
Net.handleClientReceived(object);
}catch (Exception e){
@@ -97,8 +97,8 @@ public class KryoClient implements ClientProvider{
}
};
if(KryoRegistrator.fakeLag){
client.addListener(new LagListener(KryoRegistrator.fakeLagMin, KryoRegistrator.fakeLagMax, listener));
if(KryoCore.fakeLag){
client.addListener(new UnreliableListener(KryoCore.fakeLagMin, KryoCore.fakeLagMax, KryoCore.fakeLagDrop, KryoCore.fakeLagDuplicate, listener));
}else{
client.addListener(listener);
}

View File

@@ -0,0 +1,89 @@
package io.anuke.kryonet;
import com.esotericsoftware.minlog.Log;
import com.esotericsoftware.minlog.Log.Logger;
import io.anuke.ucore.util.ColorCodes;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static io.anuke.mindustry.Vars.headless;
/**Utilities and configs for kryo module.*/
public class KryoCore {
public static boolean fakeLag = false;
public static final int fakeLagMax = 500;
public static final int fakeLagMin = 0;
public static final float fakeLagDrop = 0.1f;
public static final float fakeLagDuplicate = 0.1f;
public static boolean lastUDP;
private static ScheduledExecutorService threadPool;
public static void init(){
Log.set(fakeLag ? Log.LEVEL_DEBUG : Log.LEVEL_WARN);
Log.setLogger(new Logger(){
public void log (int level, String category, String message, Throwable ex) {
if(fakeLag){
if(message.contains("UDP")){
lastUDP = true;
}else if(message.contains("TCP")){
lastUDP = false;
}
return;
}
StringBuilder builder = new StringBuilder(256);
if(headless)
builder.append(ColorCodes.BLUE);
builder.append("Net Error: ");
builder.append(message);
if (ex != null) {
StringWriter writer = new StringWriter(256);
ex.printStackTrace(new PrintWriter(writer));
builder.append('\n');
builder.append(writer.toString().trim());
}
if(headless)
builder.append(ColorCodes.RESET);
io.anuke.ucore.util.Log.info("&b" + builder.toString());
}
});
}
private static int calculateLag() {
return fakeLagMin + (int)(Math.random() * (fakeLagMax - fakeLagMin));
}
/**Executes something in a potentially unreliable way. Used to simulate lag and packet errors with UDP.*/
public static void recieveUnreliable(Runnable run){
if(fakeLag && threadPool == null){
threadPool = Executors.newScheduledThreadPool(1, r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
});
}
if(fakeLag){
do {
if (Math.random() >= fakeLagDrop) {
threadPool.schedule(run, calculateLag(), TimeUnit.MILLISECONDS);
}
} while (Math.random() < fakeLagDuplicate);
}else{
run.run();
}
}
}

View File

@@ -1,45 +0,0 @@
package io.anuke.kryonet;
import com.esotericsoftware.minlog.Log;
import com.esotericsoftware.minlog.Log.Logger;
import io.anuke.ucore.util.ColorCodes;
import java.io.PrintWriter;
import java.io.StringWriter;
import static io.anuke.mindustry.Vars.headless;
public class KryoRegistrator {
public static boolean fakeLag = false;
public static final int fakeLagMax = 1000;
public static final int fakeLagMin = 0;
static{
Log.set(Log.LEVEL_WARN);
Log.setLogger(new Logger(){
public void log (int level, String category, String message, Throwable ex) {
StringBuilder builder = new StringBuilder(256);
if(headless)
builder.append(ColorCodes.BLUE);
builder.append("Net Error: ");
builder.append(message);
if (ex != null) {
StringWriter writer = new StringWriter(256);
ex.printStackTrace(new PrintWriter(writer));
builder.append('\n');
builder.append(writer.toString().trim());
}
if(headless)
builder.append(ColorCodes.RESET);
io.anuke.ucore.util.Log.info("&b" + builder.toString());
}
});
}
}

View File

@@ -6,9 +6,9 @@ import com.badlogic.gdx.utils.Base64Coder;
import com.esotericsoftware.kryonet.Connection;
import com.esotericsoftware.kryonet.FrameworkMessage;
import com.esotericsoftware.kryonet.Listener;
import com.esotericsoftware.kryonet.Listener.LagListener;
import com.esotericsoftware.kryonet.Server;
import com.esotericsoftware.kryonet.util.InputStreamSender;
import io.anuke.kryonet.CustomListeners.UnreliableListener;
import io.anuke.mindustry.Vars;
import io.anuke.mindustry.net.Net;
import io.anuke.mindustry.net.Net.SendMode;
@@ -17,9 +17,9 @@ import io.anuke.mindustry.net.NetConnection;
import io.anuke.mindustry.net.NetworkIO;
import io.anuke.mindustry.net.Packets.Connect;
import io.anuke.mindustry.net.Packets.Disconnect;
import io.anuke.mindustry.net.Streamable;
import io.anuke.mindustry.net.Packets.StreamBegin;
import io.anuke.mindustry.net.Packets.StreamChunk;
import io.anuke.mindustry.net.Streamable;
import io.anuke.ucore.UCore;
import io.anuke.ucore.core.Timers;
import io.anuke.ucore.util.Log;
@@ -52,6 +52,8 @@ public class KryoServer implements ServerProvider {
int lastconnection = 0;
public KryoServer(){
KryoCore.init();
server = new Server(4096*2, 4096, connection -> new ByteSerializer());
server.setDiscoveryHandler((datagramChannel, fromAddress) -> {
ByteBuffer buffer = NetworkIO.writeServerData();
@@ -110,8 +112,8 @@ public class KryoServer implements ServerProvider {
}
};
if(KryoRegistrator.fakeLag){
server.addListener(new LagListener(KryoRegistrator.fakeLagMin, KryoRegistrator.fakeLagMax, listener));
if(KryoCore.fakeLag){
server.addListener(new UnreliableListener(KryoCore.fakeLagMin, KryoCore.fakeLagMax, KryoCore.fakeLagDrop, KryoCore.fakeLagDuplicate, listener));
}else{
server.addListener(listener);
}
@@ -323,6 +325,11 @@ public class KryoServer implements ServerProvider {
this.connection = connection;
}
@Override
public boolean isConnected(){
return connection == null ? !socket.isClosed() : connection.isConnected();
}
@Override
public void send(Object object, SendMode mode){
if(socket != null){