Improved snapshot netcode

This commit is contained in:
Anuken
2018-12-29 22:46:46 -05:00
parent 7c35b6e95a
commit d01b8c7ad1
17 changed files with 99 additions and 216 deletions

View File

@@ -11,10 +11,9 @@ import io.anuke.arc.entities.Entities;
import io.anuke.arc.entities.EntityGroup;
import io.anuke.arc.graphics.Color;
import io.anuke.arc.util.Interval;
import io.anuke.arc.util.io.ReusableByteArrayInputStream;
import io.anuke.arc.math.Mathf;
import io.anuke.arc.util.Log;
import io.anuke.arc.util.Time;
import io.anuke.arc.util.io.ReusableByteArrayInputStream;
import io.anuke.arc.util.serialization.Base64Coder;
import io.anuke.mindustry.Vars;
import io.anuke.mindustry.core.GameState.State;
@@ -55,16 +54,6 @@ public class NetClient implements ApplicationListener{
/**Last sent client snapshot ID.*/
private int lastSent;
/**Last snapshot ID recieved.*/
private int lastSnapshotBaseID = -1;
/**Current snapshot that is being built from chinks.*/
private byte[] currentSnapshot;
/**Counter of how many chunks have been recieved.*/
private int recievedChunkCounter;
/**ID of snapshot that is currently being constructed.*/
private int currentSnapshotID = -1;
/**List of entities that were removed, and need not be added while syncing.*/
private IntSet removed = new IntSet();
/**Byte stream for reading in snapshots.*/
@@ -210,97 +199,11 @@ public class NetClient implements ApplicationListener{
}
@Remote(variants = Variant.one, priority = PacketPriority.low, unreliable = true)
public static void onSnapshot(byte[] chunk, int snapshotID, short chunkID, int totalLength, int uncompressedLength){
int totalChunks = Mathf.ceil((float) totalLength / NetServer.maxSnapshotSize);
if(NetServer.debugSnapshots)
Log.info("Recieved snapshot: len {0} ID {1} chunkID {2} / "+totalChunks+" totalLength {3} bclient-base {4}", chunk.length, snapshotID, chunkID, totalLength, netClient.lastSnapshotBaseID);
//skip snapshot IDs that have already been recieved OR snapshots that are too far in front
if(snapshotID < netClient.lastSnapshotBaseID){
if(NetServer.debugSnapshots) Log.info("//SKIP SNAPSHOT");
return;
}
public static void onEntitySnapshot(byte groupID, short amount, short dataLen, byte[] data){
try{
byte[] snapshot;
//total length exceeds that needed to hold one snapshot, therefore, it is split into chunks
if(totalLength > NetServer.maxSnapshotSize){
//total amount of chunks to recieve
//reset status when a new snapshot sending begins
if(netClient.currentSnapshotID != snapshotID || netClient.currentSnapshot == null){
netClient.currentSnapshotID = snapshotID;
netClient.currentSnapshot = new byte[totalLength];
netClient.recievedChunkCounter = 0;
}
netClient.recievedChunkCounter++; //update recieved status
//copy the recieved bytes into the holding array
System.arraycopy(chunk, 0, netClient.currentSnapshot, chunkID * NetServer.maxSnapshotSize,
Math.min(NetServer.maxSnapshotSize, totalLength - chunkID * NetServer.maxSnapshotSize));
//when all chunks have been recieved, begin
if(netClient.recievedChunkCounter >= totalChunks && netClient.currentSnapshot != null){
snapshot = netClient.currentSnapshot;
}else{
return;
}
}else{
snapshot = chunk;
}
if(NetServer.debugSnapshots)
Log.info("Finished recieving snapshot ID {0} length {1}", snapshotID, chunk.length);
byte[] result = Net.decompressSnapshot(snapshot, uncompressedLength);
int length = result.length;
netClient.lastSnapshotBaseID = snapshotID;
//set stream bytes to begin snapshot reading
netClient.byteStream.setBytes(result, 0, length);
//get data input for reading from the stream
netClient.byteStream.setBytes(Net.decompressSnapshot(data, dataLen));
DataInputStream input = netClient.dataStream;
netClient.readSnapshot(input);
//confirm that snapshot has been recieved
netClient.lastSnapshotBaseID = snapshotID;
}catch(Exception e){
throw new RuntimeException(e);
}
}
public void readSnapshot(DataInputStream input) throws IOException{
//read wave info
state.wavetime = input.readFloat();
state.wave = input.readInt();
state.enemies = input.readInt();
byte cores = input.readByte();
for(int i = 0; i < cores; i++){
int pos = input.readInt();
Tile tile = world.tile(pos);
if(tile != null && tile.entity != null){
tile.entity.items.read(input);
}else{
new ItemModule().read(input);
}
}
long timestamp = input.readLong();
byte totalGroups = input.readByte();
//for each group...
for(int i = 0; i < totalGroups; i++){
//read group info
byte groupID = input.readByte();
short amount = input.readShort();
EntityGroup group = Entities.getGroup(groupID);
//go through each entity
@@ -321,13 +224,41 @@ public class NetClient implements ApplicationListener{
}
//read the entity
entity.read(input, timestamp);
entity.read(input);
if(add){
entity.add();
netClient.addRemovedEntity(entity.getID());
}
}
}catch(IOException e){
throw new RuntimeException(e);
}
}
@Remote(variants = Variant.one, priority = PacketPriority.low, unreliable = true)
public static void onStateSnapshot(float waveTime, int wave, int enemies, short coreDataLen, byte[] coreData){
try{
state.wavetime = waveTime;
state.wave = wave;
state.enemies = enemies;
netClient.byteStream.setBytes(Net.decompressSnapshot(coreData, coreDataLen));
DataInputStream input = netClient.dataStream;
byte cores = input.readByte();
for(int i = 0; i < cores; i++){
int pos = input.readInt();
Tile tile = world.tile(pos);
if(tile != null && tile.entity != null){
tile.entity.items.read(input);
}else{
new ItemModule().read(input);
}
}
}catch(IOException e){
throw new RuntimeException(e);
}
}
@@ -373,9 +304,6 @@ public class NetClient implements ApplicationListener{
connecting = true;
quiet = false;
lastSent = 0;
currentSnapshot = null;
currentSnapshotID = -1;
lastSnapshotBaseID = -1;
Entities.clear();
ui.chatfrag.clearMessages();
@@ -409,9 +337,11 @@ public class NetClient implements ApplicationListener{
Player player = players[0];
BuildRequest[] requests;
//limit to 10 to prevent buffer overflows
int usedRequests = Math.min(player.getPlaceQueue().size, 10);
requests = new BuildRequest[player.getPlaceQueue().size];
for(int i = 0; i < requests.length; i++){
requests = new BuildRequest[usedRequests];
for(int i = 0; i < usedRequests; i++){
requests[i] = player.getPlaceQueue().get(i);
}

View File

@@ -45,13 +45,12 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;
import static io.anuke.mindustry.Vars.*;
public class NetServer implements ApplicationListener{
public final static int maxSnapshotSize = 2047;
public final static int maxSnapshotSize = 430;
public final static boolean debugSnapshots = false;
public final static float maxSnapshotDelay = 200;
@@ -226,45 +225,6 @@ public class NetServer implements ApplicationListener{
});
}
/** Sends a raw byte[] snapshot to a client, splitting up into chunks when needed.*/
private static void sendSplitSnapshot(int userid, byte[] bytes, int snapshotID, int uncompressedLength){
if(bytes.length < maxSnapshotSize){
scheduleSnapshot(() -> Call.onSnapshot(userid, bytes, snapshotID, (short) 0, bytes.length, uncompressedLength));
}else{
int remaining = bytes.length;
int offset = 0;
int chunkid = 0;
while(remaining > 0){
int used = Math.min(remaining, maxSnapshotSize);
byte[] toSend;
//re-use sent byte arrays when possible
if(used == maxSnapshotSize && !debugSnapshots){
toSend = reusableSnapArray;
System.arraycopy(bytes, offset, toSend, 0, Math.min(offset + maxSnapshotSize, bytes.length) - offset);
}else{
toSend = Arrays.copyOfRange(bytes, offset, Math.min(offset + maxSnapshotSize, bytes.length));
}
short fchunk = (short)chunkid;
scheduleSnapshot(() -> Call.onSnapshot(userid, toSend, snapshotID, fchunk, bytes.length, uncompressedLength));
remaining -= used;
offset += used;
chunkid++;
}
}
}
private static void scheduleSnapshot(Runnable r){
if(debugSnapshots){
if(!Mathf.chance(snapshotDropchance)){
Time.run(maxSnapshotDelay / 1000f * 60f, r);
}
}else{
r.run();
}
}
public void sendWorldData(Player player, int clientID){
ByteArrayOutputStream stream = new ByteArrayOutputStream();
DeflaterOutputStream def = new DeflaterOutputStream(stream);
@@ -481,35 +441,24 @@ public class NetServer implements ApplicationListener{
admins.save();
}
public void writeSnapshot(Player player, DataOutputStream dataStream) throws IOException{
viewport.setSize(player.con.viewWidth, player.con.viewHeight).setCenter(player.con.viewX, player.con.viewY);
//write wave datas
dataStream.writeFloat(state.wavetime);
dataStream.writeInt(state.wave);
dataStream.writeInt(state.enemies());
public void writeSnapshot(Player player) throws IOException{
syncStream.reset();
ObjectSet<Tile> cores = state.teams.get(player.getTeam()).cores;
dataStream.writeByte(cores.size);
//write all core inventory data
for(Tile tile : cores){
dataStream.writeInt(tile.pos());
tile.entity.items.write(dataStream);
}
//write timestamp
dataStream.writeLong(Time.millis());
dataStream.close();
byte[] stateBytes = syncStream.toByteArray();
int totalGroups = 0;
//write basic state data.
Call.onStateSnapshot(player.con.id, state.wavetime, state.wave, state.enemies, (short)stateBytes.length, Net.compressSnapshot(stateBytes));
for(EntityGroup<?> group : Entities.getAllGroups()){
if(!group.isEmpty() && (group.all().get(0) instanceof SyncTrait)) totalGroups++;
}
//write total amount of serializable groups
dataStream.writeByte(totalGroups);
viewport.setSize(player.con.viewWidth, player.con.viewHeight).setCenter(player.con.viewX, player.con.viewY);
//check for syncable groups
for(EntityGroup<?> group : Entities.getAllGroups()){
@@ -537,15 +486,32 @@ public class NetServer implements ApplicationListener{
}
}
//write group ID + group size
dataStream.writeByte(group.getID());
dataStream.writeShort(returnArray.size);
syncStream.reset();
int sent = 0;
for(Entity entity : returnArray){
//write all entities now
dataStream.writeInt(entity.getID()); //write id
dataStream.writeByte(((SyncTrait) entity).getTypeID()); //write type ID
((SyncTrait) entity).write(dataStream); //write entity
sent ++;
if(syncStream.position() > maxSnapshotSize){
dataStream.close();
byte[] syncBytes = syncStream.toByteArray();
Call.onEntitySnapshot(player.con.id, (byte)group.getID(), (short)sent, (short)syncBytes.length, Net.compressSnapshot(syncBytes));
sent = 0;
syncStream.reset();
}
}
if(sent > 0){
dataStream.close();
byte[] syncBytes = syncStream.toByteArray();
Call.onEntitySnapshot(player.con.id, (byte)group.getID(), (short)sent, (short)syncBytes.length, Net.compressSnapshot(syncBytes));
}
}
}
@@ -619,21 +585,7 @@ public class NetServer implements ApplicationListener{
if(!player.timer.get(Player.timerSync, serverSyncTime) || !connection.hasConnected) continue;
//reset stream to begin writing
Time.mark();
syncStream.reset();
writeSnapshot(player, dataStream);
dataStream.close();
byte[] bytes = syncStream.toByteArray();
int uncompressed = bytes.length;
bytes = Net.compressSnapshot(bytes);
int snapid = connection.lastSentSnapshotID ++;
if(debugSnapshots) Log.info("Sent snapshot: {0} bytes.", bytes.length);
sendSplitSnapshot(connection.id, bytes, snapid, uncompressed);
writeSnapshot(player);
}
}catch(IOException e){

View File

@@ -864,7 +864,7 @@ public class Player extends Unit implements BuilderTrait, CarryTrait, ShooterTra
}
@Override
public void read(DataInput buffer, long time) throws IOException{
public void read(DataInput buffer) throws IOException{
float lastx = x, lasty = y, lastrot = rotation;
super.readSave(buffer);
name = TypeIO.readStringData(buffer);
@@ -880,7 +880,7 @@ public class Player extends Unit implements BuilderTrait, CarryTrait, ShooterTra
readBuilding(buffer, !isLocal);
interpolator.read(lastx, lasty, x, y, time, rotation, baseRotation);
interpolator.read(lastx, lasty, x, y, rotation, baseRotation);
rotation = lastrot;
if(isLocal){

View File

@@ -158,7 +158,7 @@ public class Bullet extends BulletEntity<BulletType> implements TeamTrait, SyncT
}
@Override
public void read(DataInput data, long time) throws IOException{
public void read(DataInput data) throws IOException{
x = data.readFloat();
y = data.readFloat();
velocity.x = data.readFloat();

View File

@@ -177,7 +177,7 @@ public class Fire extends TimedEntity implements SaveTrait, SyncTrait, Poolable{
}
@Override
public void read(DataInput data, long time) throws IOException{
public void read(DataInput data) throws IOException{
x = data.readFloat();
y = data.readFloat();
}

View File

@@ -108,7 +108,7 @@ public class Lightning extends TimedEntity implements DrawTrait, SyncTrait, Time
public void write(DataOutput data){}
@Override
public void read(DataInput data, long time){}
public void read(DataInput data){}
@Override
public float lifetime(){

View File

@@ -301,7 +301,7 @@ public class Puddle extends SolidEntity implements SaveTrait, Poolable, DrawTrai
}
@Override
public void read(DataInput data, long time) throws IOException{
public void read(DataInput data) throws IOException{
x = data.readFloat();
y = data.readFloat();
liquid = content.liquid(data.readByte());

View File

@@ -66,5 +66,5 @@ public interface SyncTrait extends Entity, TypeTrait{
//Read and write sync data, usually position
void write(DataOutput data) throws IOException;
void read(DataInput data, long time) throws IOException;
void read(DataInput data) throws IOException;
}

View File

@@ -13,7 +13,6 @@ import io.anuke.arc.math.geom.Geometry;
import io.anuke.arc.math.geom.Rectangle;
import io.anuke.arc.util.Interval;
import io.anuke.arc.util.Time;
import io.anuke.arc.util.Timer;
import io.anuke.mindustry.Vars;
import io.anuke.mindustry.content.fx.ExplosionFx;
import io.anuke.mindustry.entities.Damage;
@@ -398,12 +397,12 @@ public abstract class BaseUnit extends Unit implements ShooterTrait{
}
@Override
public void read(DataInput data, long time) throws IOException{
public void read(DataInput data) throws IOException{
float lastx = x, lasty = y, lastrot = rotation;
super.readSave(data);
this.type = content.getByID(ContentType.unit, data.readByte());
interpolator.read(lastx, lasty, x, y, time, rotation);
interpolator.read(lastx, lasty, x, y, rotation);
rotation = lastrot;
}

View File

@@ -219,8 +219,8 @@ public abstract class GroundUnit extends BaseUnit{
}
@Override
public void read(DataInput data, long time) throws IOException{
super.read(data, time);
public void read(DataInput data) throws IOException{
super.read(data);
weapon = content.getByID(ContentType.weapon, data.readByte());
}

View File

@@ -99,8 +99,8 @@ public class AlphaDrone extends FlyingUnit {
}
@Override
public void read(DataInput stream, long time) throws IOException {
super.read(stream, time);
public void read(DataInput stream) throws IOException {
super.read(stream);
leader = Vars.playerGroup.getByID(stream.readInt());
}

View File

@@ -362,8 +362,8 @@ public class Drone extends FlyingUnit implements BuilderTrait{
}
@Override
public void read(DataInput data, long time) throws IOException{
super.read(data, time);
public void read(DataInput data) throws IOException{
super.read(data);
int mined = data.readInt();
int repairing = data.readInt();

View File

@@ -15,7 +15,7 @@ public class Interpolator{
public Vector2 pos = new Vector2();
public float[] values = {};
public void read(float cx, float cy, float x, float y, long sent, float... target1ds){
public void read(float cx, float cy, float x, float y, float... target1ds){
if(lastUpdated != 0) updateSpacing = Time.timeSinceMillis(lastUpdated);
lastUpdated = Time.millis();

View File

@@ -21,6 +21,8 @@ import io.anuke.mindustry.net.Packets.StreamChunk;
import io.anuke.mindustry.net.Streamable.StreamBuilder;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import static io.anuke.mindustry.Vars.*;
@@ -58,7 +60,9 @@ public class Net{
String error = t.getMessage() == null ? "" : t.getMessage().toLowerCase();
String type = t.getClass().toString().toLowerCase();
if(error.equals("mismatch")){
if(e instanceof BufferUnderflowException || e instanceof BufferOverflowException){
error = Core.bundle.get("text.error.io");
}else if(error.equals("mismatch")){
error = Core.bundle.get("text.error.mismatch");
}else if(error.contains("port out of range") || error.contains("invalid argument") || (error.contains("invalid") && error.contains("address"))){
error = Core.bundle.get("text.error.invalidaddress");

View File

@@ -111,13 +111,6 @@ public class NetworkIO{
}
}
//now write a snapshot.
player.con.viewX = world.width() * tilesize/2f;
player.con.viewY = world.height() * tilesize/2f;
player.con.viewWidth = world.width() * tilesize;
player.con.viewHeight = world.height() * tilesize;
netServer.writeSnapshot(player, stream);
}catch(IOException e){
throw new RuntimeException(e);
}
@@ -163,7 +156,7 @@ public class NetworkIO{
Entities.clear();
int id = stream.readInt();
player.resetNoAdd();
player.read(stream, Time.millis());
player.read(stream);
player.resetID(id);
player.add();
@@ -252,9 +245,6 @@ public class NetworkIO{
world.endMapLoad();
//read raw snapshot
netClient.readSnapshot(stream);
}catch(IOException e){
throw new RuntimeException(e);
}