From 46a8edb7810b90639468720087048c2a2b8c8b27 Mon Sep 17 00:00:00 2001 From: Anuken Date: Sat, 8 Sep 2018 17:07:25 -0400 Subject: [PATCH] LZ4 compression --- build.gradle | 1 + .../io/anuke/mindustry/core/NetClient.java | 6 +- .../io/anuke/mindustry/core/NetServer.java | 10 ++- core/src/io/anuke/mindustry/net/Net.java | 86 +++++++------------ .../world/blocks/production/Fracker.java | 3 +- .../world/blocks/production/PowerCrafter.java | 2 +- .../world/blocks/production/PowerSmelter.java | 4 +- .../world/blocks/production/SolidPump.java | 4 +- kryonet/src/io/anuke/kryonet/KryoClient.java | 10 +++ kryonet/src/io/anuke/kryonet/KryoServer.java | 8 ++ 10 files changed, 67 insertions(+), 67 deletions(-) diff --git a/build.gradle b/build.gradle index 98357633f2..9ebe70f2a2 100644 --- a/build.gradle +++ b/build.gradle @@ -230,6 +230,7 @@ project(":kryonet") { dependencies { compile project(":core") compile 'com.github.crykn:kryonet:2.22.1' + compile "org.lz4:lz4-java:1.4.1" compile 'com.github.Anuken:WaifUPnP:05eb46bc577fd7674596946ba288c96c0cedd893' } } diff --git a/core/src/io/anuke/mindustry/core/NetClient.java b/core/src/io/anuke/mindustry/core/NetClient.java index 7a8ad01ecb..f41f82d7eb 100644 --- a/core/src/io/anuke/mindustry/core/NetClient.java +++ b/core/src/io/anuke/mindustry/core/NetClient.java @@ -190,7 +190,7 @@ public class NetClient extends Module{ } @Remote(variants = Variant.one, priority = PacketPriority.low, unreliable = true) - public static void onSnapshot(byte[] chunk, int snapshotID, short chunkID, int totalLength){ + public static void onSnapshot(byte[] chunk, int snapshotID, short chunkID, int totalLength, int uncompressedLength){ int totalChunks = Mathf.ceil((float) totalLength / NetServer.maxSnapshotSize); if(NetServer.debugSnapshots) @@ -234,8 +234,8 @@ public class NetClient extends Module{ if(NetServer.debugSnapshots) Log.info("Finished recieving snapshot ID {0} length {1}", snapshotID, chunk.length); - byte[] result = snapshot; - int length = snapshot.length; + byte[] result = Net.decompressSnapshot(snapshot, uncompressedLength); + int length = result.length; netClient.lastSnapshotBaseID = snapshotID; diff --git a/core/src/io/anuke/mindustry/core/NetServer.java b/core/src/io/anuke/mindustry/core/NetServer.java index 110d59b7ff..bb6233c9a6 100644 --- a/core/src/io/anuke/mindustry/core/NetServer.java +++ b/core/src/io/anuke/mindustry/core/NetServer.java @@ -228,9 +228,9 @@ public class NetServer extends Module{ } /** Sends a raw byte[] snapshot to a client, splitting up into chunks when needed.*/ - private static void sendSplitSnapshot(int userid, byte[] bytes, int snapshotID){ + private static void sendSplitSnapshot(int userid, byte[] bytes, int snapshotID, int uncompressedLength){ if(bytes.length < maxSnapshotSize){ - scheduleSnapshot(() -> Call.onSnapshot(userid, bytes, snapshotID, (short) 0, bytes.length)); + scheduleSnapshot(() -> Call.onSnapshot(userid, bytes, snapshotID, (short) 0, bytes.length, uncompressedLength)); }else{ int remaining = bytes.length; int offset = 0; @@ -247,7 +247,7 @@ public class NetServer extends Module{ } short fchunk = (short)chunkid; - scheduleSnapshot(() -> Call.onSnapshot(userid, toSend, snapshotID, fchunk, bytes.length)); + scheduleSnapshot(() -> Call.onSnapshot(userid, toSend, snapshotID, fchunk, bytes.length, uncompressedLength)); remaining -= used; offset += used; @@ -608,10 +608,12 @@ public class NetServer extends Module{ dataStream.close(); byte[] bytes = syncStream.toByteArray(); + int uncompressed = bytes.length; + bytes = Net.compressSnapshot(bytes); int snapid = connection.lastSentSnapshotID ++; if(debugSnapshots) Log.info("Sent snapshot: {0} bytes.", bytes.length); - sendSplitSnapshot(connection.id, bytes, snapid); + sendSplitSnapshot(connection.id, bytes, snapid, uncompressed); } }catch(IOException e){ diff --git a/core/src/io/anuke/mindustry/net/Net.java b/core/src/io/anuke/mindustry/net/Net.java index 720e57eaf5..83d4c83f12 100644 --- a/core/src/io/anuke/mindustry/net/Net.java +++ b/core/src/io/anuke/mindustry/net/Net.java @@ -119,6 +119,14 @@ public class Net{ active = false; } + public static byte[] compressSnapshot(byte[] input){ + return serverProvider.compressSnapshot(input); + } + + public static byte[] decompressSnapshot(byte[] input, int size){ + return clientProvider.decompressSnapshot(input, size); + } + /** * Starts discovering servers on a different thread. Does not work with GWT. * Callback is run on the main libGDX thread. @@ -337,99 +345,69 @@ public class Net{ tcp, udp } - /** - * Client implementation. - */ + /**Client implementation.*/ public interface ClientProvider{ - /** - * Connect to a server. - */ + /**Connect to a server.*/ void connect(String ip, int port) throws IOException; - /** - * Send an object to the server. - */ + /**Send an object to the server.*/ void send(Object object, SendMode mode); - /** - * Update the ping. Should be done every second or so. - */ + /**Update the ping. Should be done every second or so.*/ void updatePing(); - /** - * Get ping in milliseconds. Will only be valid after a call to updatePing. - */ + /**Get ping in milliseconds. Will only be valid after a call to updatePing.*/ int getPing(); - /** - * Disconnect from the server. - */ + /**Disconnect from the server.*/ void disconnect(); + /**Decompress an input snapshot byte array.*/ + byte[] decompressSnapshot(byte[] input, int size); + /** * Discover servers. This should run the callback regardless of whether any servers are found. Should not block. * Callback should be run on libGDX main thread. */ void discover(Consumer> callback); - /** - * Ping a host. If an error occured, failed() should be called with the exception. - */ + /**Ping a host. If an error occured, failed() should be called with the exception.*/ void pingHost(String address, int port, Consumer valid, Consumer failed); - /** - * Close all connections. - */ + /**Close all connections.*/ void dispose(); } - /** - * Server implementation. - */ + /**Server implementation.*/ public interface ServerProvider{ - /** - * Host a server at specified port. - */ + /**Host a server at specified port.*/ void host(int port) throws IOException; - /** - * Sends a large stream of data to a specific client. - */ + /**Sends a large stream of data to a specific client.*/ void sendStream(int id, Streamable stream); - /** - * Send an object to everyone connected. - */ + /**Send an object to everyone connected.*/ void send(Object object, SendMode mode); - /** - * Send an object to a specific client ID. - */ + /**Send an object to a specific client ID.*/ void sendTo(int id, Object object, SendMode mode); - /** - * Send an object to everyone except a client ID. - */ + /**Send an object to everyone except a client ID.*/ void sendExcept(int id, Object object, SendMode mode); - /** - * Close the server connection. - */ + /**Close the server connection.*/ void close(); - /** - * Return all connected users. - */ + /**Compress an input snapshot byte array.*/ + byte[] compressSnapshot(byte[] input); + + /**Return all connected users.*/ Array getConnections(); - /** - * Returns a connection by ID. - */ + /**Returns a connection by ID.*/ NetConnection getByID(int id); - /** - * Close all connections. - */ + /**Close all connections.*/ void dispose(); } } diff --git a/core/src/io/anuke/mindustry/world/blocks/production/Fracker.java b/core/src/io/anuke/mindustry/world/blocks/production/Fracker.java index ccdb7235d1..5ce1eaca7f 100644 --- a/core/src/io/anuke/mindustry/world/blocks/production/Fracker.java +++ b/core/src/io/anuke/mindustry/world/blocks/production/Fracker.java @@ -5,7 +5,6 @@ import io.anuke.mindustry.entities.TileEntity; import io.anuke.mindustry.type.Item; import io.anuke.mindustry.world.Tile; import io.anuke.mindustry.world.consumers.ConsumeItem; -import io.anuke.ucore.core.Timers; import io.anuke.ucore.graphics.Draw; public class Fracker extends SolidPump{ @@ -70,7 +69,7 @@ public class Fracker extends SolidPump{ if(entity.cons.valid() && entity.accumulator < itemUseTime){ super.update(tile); - entity.accumulator += Timers.delta(); + entity.accumulator += entity.delta(); }else{ tryDumpLiquid(tile, result); } diff --git a/core/src/io/anuke/mindustry/world/blocks/production/PowerCrafter.java b/core/src/io/anuke/mindustry/world/blocks/production/PowerCrafter.java index 12cb6161c4..b02053a544 100644 --- a/core/src/io/anuke/mindustry/world/blocks/production/PowerCrafter.java +++ b/core/src/io/anuke/mindustry/world/blocks/production/PowerCrafter.java @@ -64,7 +64,7 @@ public class PowerCrafter extends Block{ GenericCrafterEntity entity = tile.entity(); if(entity.cons.valid()){ - entity.progress += 1f / craftTime; + entity.progress += 1f / craftTime * entity.delta(); entity.totalProgress += entity.delta(); } diff --git a/core/src/io/anuke/mindustry/world/blocks/production/PowerSmelter.java b/core/src/io/anuke/mindustry/world/blocks/production/PowerSmelter.java index de5c8ea6fb..9662903be6 100644 --- a/core/src/io/anuke/mindustry/world/blocks/production/PowerSmelter.java +++ b/core/src/io/anuke/mindustry/world/blocks/production/PowerSmelter.java @@ -101,7 +101,7 @@ public class PowerSmelter extends PowerBlock{ //heat it up if there's enough power if(entity.cons.valid()){ entity.heat += 1f / heatUpTime * entity.delta(); - if(Mathf.chance(Timers.delta() * burnEffectChance)) + if(Mathf.chance(entity.delta() * burnEffectChance)) Effects.effect(burnEffect, entity.x + Mathf.range(size * 4f), entity.y + Mathf.range(size * 4)); }else{ entity.heat -= 1f / heatUpTime * Timers.delta(); @@ -122,6 +122,8 @@ public class PowerSmelter extends PowerBlock{ } } + entity.craftTime += entity.delta(); + if(entity.items.get(result) >= itemCapacity //output full || entity.heat <= minHeat //not burning || entity.craftTime < craftTime*baseSmeltSpeed){ //not yet time diff --git a/core/src/io/anuke/mindustry/world/blocks/production/SolidPump.java b/core/src/io/anuke/mindustry/world/blocks/production/SolidPump.java index 8ac6745300..9a46f02404 100644 --- a/core/src/io/anuke/mindustry/world/blocks/production/SolidPump.java +++ b/core/src/io/anuke/mindustry/world/blocks/production/SolidPump.java @@ -80,13 +80,13 @@ public class SolidPump extends Pump{ float maxPump = Math.min(liquidCapacity - typeLiquid(tile), pumpAmount * entity.delta() * fraction); tile.entity.liquids.add(result, maxPump); entity.warmup = Mathf.lerpDelta(entity.warmup, 1f, 0.02f); - if(Mathf.chance(Timers.delta() * updateEffectChance)) + if(Mathf.chance(entity.delta() * updateEffectChance)) Effects.effect(updateEffect, entity.x + Mathf.range(size * 2f), entity.y + Mathf.range(size * 2f)); }else{ entity.warmup = Mathf.lerpDelta(entity.warmup, 0f, 0.02f); } - entity.pumpTime += entity.warmup * Timers.delta(); + entity.pumpTime += entity.warmup * entity.delta(); tryDumpLiquid(tile, result); } diff --git a/kryonet/src/io/anuke/kryonet/KryoClient.java b/kryonet/src/io/anuke/kryonet/KryoClient.java index 0d6aa3a1c4..2fddd60cc6 100644 --- a/kryonet/src/io/anuke/kryonet/KryoClient.java +++ b/kryonet/src/io/anuke/kryonet/KryoClient.java @@ -16,6 +16,8 @@ import io.anuke.mindustry.net.Packets.Disconnect; import io.anuke.ucore.function.Consumer; import io.anuke.ucore.util.Pooling; import io.anuke.ucore.util.Strings; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; import java.io.IOException; import java.net.DatagramPacket; @@ -32,6 +34,7 @@ public class KryoClient implements ClientProvider{ Client client; ObjectMap addresses = new ObjectMap<>(); ClientDiscoveryHandler handler; + LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor(); public KryoClient(){ KryoCore.init(); @@ -105,6 +108,13 @@ public class KryoClient implements ClientProvider{ } } + @Override + public byte[] decompressSnapshot(byte[] input, int size){ + byte[] result = new byte[size]; + decompressor.decompress(input, result); + return result; + } + @Override public void connect(String ip, int port) throws IOException { //just in case diff --git a/kryonet/src/io/anuke/kryonet/KryoServer.java b/kryonet/src/io/anuke/kryonet/KryoServer.java index 259bc526f6..50e2bf4cf4 100644 --- a/kryonet/src/io/anuke/kryonet/KryoServer.java +++ b/kryonet/src/io/anuke/kryonet/KryoServer.java @@ -20,6 +20,8 @@ import io.anuke.mindustry.net.Packets.StreamChunk; import io.anuke.ucore.UCore; import io.anuke.ucore.core.Timers; import io.anuke.ucore.util.Log; +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; import java.io.IOException; import java.nio.ByteBuffer; @@ -33,6 +35,7 @@ public class KryoServer implements ServerProvider { final CopyOnWriteArrayList connections = new CopyOnWriteArrayList<>(); final CopyOnWriteArraySet missing = new CopyOnWriteArraySet<>(); final Array array = new Array<>(); + final LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor(); Thread serverThread; int lastconnection = 0; @@ -105,6 +108,11 @@ public class KryoServer implements ServerProvider { } } + @Override + public byte[] compressSnapshot(byte[] input){ + return compressor.compress(input); + } + @Override public Array getConnections() { array.clear();