Added websocket support for no good reason

This commit is contained in:
Anuken
2018-01-17 17:28:33 -05:00
parent b266516fad
commit 1b4a8c83ae
14 changed files with 564 additions and 99 deletions

View File

@@ -0,0 +1,147 @@
package io.anuke.kryonet;
import com.badlogic.gdx.utils.Array;
import com.badlogic.gdx.utils.Base64Coder;
import com.badlogic.gdx.utils.reflect.ClassReflection;
import io.anuke.mindustry.Vars;
import io.anuke.mindustry.net.Host;
import io.anuke.mindustry.net.Net;
import io.anuke.mindustry.net.Net.ClientProvider;
import io.anuke.mindustry.net.Net.SendMode;
import io.anuke.mindustry.net.Packet;
import io.anuke.mindustry.net.Packets.Connect;
import io.anuke.mindustry.net.Packets.Disconnect;
import io.anuke.mindustry.net.Registrator;
import io.anuke.ucore.UCore;
import io.anuke.ucore.function.Consumer;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
public class JavaWebsocketClient implements ClientProvider {
WebSocketClient socket;
ByteBuffer buffer = ByteBuffer.allocate(1024);
boolean debug = false;
@Override
public void connect(String ip, int port) throws IOException {
try {
URI i = new URI("ws://" + ip + ":" + Vars.webPort);
UCore.log("Connecting: " + i);
socket = new WebSocketClient(i, new Draft_6455(), null, 5000) {
Thread thread;
@Override
public void connect() {
if(thread != null )
throw new IllegalStateException( "WebSocketClient objects are not reuseable" );
thread = new Thread(this);
thread.setDaemon(true);
thread.start();
}
@Override
public void onOpen(ServerHandshake handshakedata) {
UCore.log("Connected!");
Connect connect = new Connect();
Net.handleClientReceived(connect);
}
@Override
public void onMessage(String message) {
if(debug) UCore.log("Got message: " + message);
try {
byte[] bytes = Base64Coder.decode(message);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
byte id = buffer.get();
if (id == -2) {
//this is a framework message... do nothing yet?
} else {
Class<?> type = Registrator.getByID(id);
if(debug) UCore.log("Got class ID: " + type);
Packet packet = (Packet) ClassReflection.newInstance(type);
packet.read(buffer);
Net.handleClientReceived(packet);
}
} catch (Exception e) {
e.printStackTrace();
//throw new RuntimeException(e);
}
}
@Override
public void onClose(int code, String reason, boolean remote) {
if(debug) UCore.log("Closed.");
Disconnect disconnect = new Disconnect();
Net.handleClientReceived(disconnect);
}
@Override
public void onError(Exception ex) {
onClose(0, null, true);
ex.printStackTrace();
}
};
socket.connect();
}catch (URISyntaxException e){
throw new IOException(e);
}
}
@Override
public void send(Object object, SendMode mode) {
if(!(object instanceof Packet)) throw new RuntimeException("All sent objects must be packets!");
Packet p = (Packet)object;
buffer.position(0);
buffer.put(Registrator.getID(object.getClass()));
p.write(buffer);
int pos = buffer.position();
buffer.position(0);
byte[] out = new byte[pos];
buffer.get(out);
String string = new String(Base64Coder.encode(out));
if(debug) UCore.log("Sending string: " + string);
socket.send(string);
}
@Override
public void updatePing() {
}
@Override
public int getPing() {
return 0;
}
@Override
public void disconnect() {
socket.close();
}
@Override
public Array<Host> discover() {
return new Array<>();
}
@Override
public void pingHost(String address, int port, Consumer<Host> valid, Consumer<IOException> failed) {
failed.accept(new IOException());
}
@Override
public void register(Class<?>... types) { }
@Override
public void dispose() {
if(socket != null) socket.close();
for(Thread thread : Thread.getAllStackTraces().keySet()){
if(thread.getName().equals("WebsocketWriteThread")) thread.interrupt();
}
}
}

View File

@@ -1,16 +1,19 @@
package io.anuke.kryonet;
import com.badlogic.gdx.Gdx;
import com.badlogic.gdx.utils.IntArray;
import com.badlogic.gdx.utils.Array;
import com.badlogic.gdx.utils.Base64Coder;
import com.esotericsoftware.kryonet.Connection;
import com.esotericsoftware.kryonet.FrameworkMessage;
import com.esotericsoftware.kryonet.Listener;
import com.esotericsoftware.kryonet.Listener.LagListener;
import com.esotericsoftware.kryonet.Server;
import com.esotericsoftware.kryonet.util.InputStreamSender;
import io.anuke.mindustry.Vars;
import io.anuke.mindustry.net.Net;
import io.anuke.mindustry.net.Net.SendMode;
import io.anuke.mindustry.net.Net.ServerProvider;
import io.anuke.mindustry.net.NetConnection;
import io.anuke.mindustry.net.Packets.Connect;
import io.anuke.mindustry.net.Packets.Disconnect;
import io.anuke.mindustry.net.Packets.KickPacket;
@@ -21,13 +24,25 @@ import io.anuke.mindustry.net.Streamable.StreamBegin;
import io.anuke.mindustry.net.Streamable.StreamChunk;
import io.anuke.ucore.UCore;
import io.anuke.ucore.core.Timers;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CopyOnWriteArrayList;
public class KryoServer implements ServerProvider {
Server server;
IntArray connections = new IntArray();
final Server server;
final SocketServer webServer;
final ByteSerializer serializer = new ByteSerializer();
final ByteBuffer buffer = ByteBuffer.allocate(4096);
final CopyOnWriteArrayList<KryoConnection> connections = new CopyOnWriteArrayList<>();
final Array<KryoConnection> array = new Array<>();
int lastconnection = 0;
public KryoServer(){
server = new Server(4096*2, 2048, connection -> new ByteSerializer()); //TODO tweak
@@ -38,18 +53,21 @@ public class KryoServer implements ServerProvider {
datagramChannel.send(buffer, fromAddress);
return true;
});
webServer = new SocketServer(Vars.webPort);
Listener listener = new Listener(){
@Override
public void connected (Connection connection) {
KryoConnection kn = new KryoConnection(lastconnection ++, connection.getRemoteAddressTCP().toString(), connection);
Connect c = new Connect();
c.id = connection.getID();
c.id = kn.id;
c.addressTCP = connection.getRemoteAddressTCP().toString();
try {
Net.handleServerReceived(c, c.id);
connections.add(c.id);
Net.handleServerReceived(c, kn.id);
connections.add(kn);
}catch (Exception e){
Gdx.app.postRunnable(() -> {throw new RuntimeException(e);});
}
@@ -57,10 +75,12 @@ public class KryoServer implements ServerProvider {
@Override
public void disconnected (Connection connection) {
connections.removeValue(connection.getID());
KryoConnection k = getByKryoID(connection.getID());
if(k == null) return;
connections.remove(k);
Disconnect c = new Disconnect();
c.id = connection.getID();
c.id = k.id;
try{
Net.handleServerReceived(c, c.id);
@@ -71,14 +91,13 @@ public class KryoServer implements ServerProvider {
@Override
public void received (Connection connection, Object object) {
if(object instanceof FrameworkMessage) return;
KryoConnection k = getByKryoID(connection.getID());
if(object instanceof FrameworkMessage || k == null) return;
try{
Net.handleServerReceived(object, connection.getID());
Net.handleServerReceived(object, k.id);
}catch (Exception e){
//...do absolutely nothing.
e.printStackTrace();
//Gdx.app.postRunnable(() -> {throw new RuntimeException(e);});
}
}
};
@@ -93,33 +112,30 @@ public class KryoServer implements ServerProvider {
}
@Override
public IntArray getConnections() {
return connections;
public Array<KryoConnection> getConnections() {
array.clear();
for(KryoConnection c : connections){
array.add(c);
}
return array;
}
@Override
public void kick(int connection) {
Connection conn = getByID(connection);
if(conn == null){
connections.removeValue(connection);
return;
}
KryoConnection con = getByID(connection);
KickPacket p = new KickPacket();
p.reason = KickReason.kick;
conn.sendTCP(p);
Timers.runTask(1f, () -> {
if(conn.isConnected()){
conn.close();
}
});
con.send(p, SendMode.tcp);
Timers.runTask(1f, con::close);
}
@Override
public void host(int port) throws IOException {
lastconnection = 0;
server.bind(port, port);
webServer.start();
Thread thread = new Thread(() -> {
try{
@@ -136,65 +152,91 @@ public class KryoServer implements ServerProvider {
public void close() {
UCore.setPrivate(server, "shutdown", true);
new Thread(() -> server.close()).run();
new Thread(() ->{
try {
server.close();
webServer.stop();
}catch (Exception e){
Gdx.app.postRunnable(() -> {throw new RuntimeException(e);});
}
}).run();
}
@Override
public void sendStream(int id, Streamable stream) {
Connection connection = getByID(id);
KryoConnection connection = getByID(id);
if(connection == null) return;
try {
connection.addListener(new InputStreamSender(stream.stream, 512) {
int id;
if (connection.connection != null) {
protected void start () {
//send an object so the receiving side knows how to handle the following chunks
connection.connection.addListener(new InputStreamSender(stream.stream, 512) {
int id;
protected void start() {
//send an object so the receiving side knows how to handle the following chunks
StreamBegin begin = new StreamBegin();
begin.total = stream.stream.available();
begin.type = stream.getClass();
connection.connection.sendTCP(begin);
id = begin.id;
}
protected Object next(byte[] bytes) {
StreamChunk chunk = new StreamChunk();
chunk.id = id;
chunk.data = bytes;
return chunk; //wrap the byte[] with an object so the receiving side knows how to handle it.
}
});
} else {
int cid;
StreamBegin begin = new StreamBegin();
begin.total = stream.stream.available();
begin.type = stream.getClass();
connection.sendTCP(begin);
id = begin.id;
}
connection.send(begin, SendMode.tcp);
cid = begin.id;
protected Object next (byte[] bytes) {
StreamChunk chunk = new StreamChunk();
chunk.id = id;
chunk.data = bytes;
return chunk; //wrap the byte[] with an object so the receiving side knows how to handle it.
while (stream.stream.available() > 0) {
byte[] bytes = new byte[Math.min(512, stream.stream.available())];
stream.stream.read(bytes);
StreamChunk chunk = new StreamChunk();
chunk.id = cid;
chunk.data = bytes;
connection.send(chunk, SendMode.tcp);
}
}
});
}catch (IOException e){
throw new RuntimeException(e);
}
}
@Override
public void send(Object object, SendMode mode) {
if(mode == SendMode.tcp){
server.sendToAllTCP(object);
}else{
server.sendToAllUDP(object);
for(int i = 0; i < connections.size(); i ++){
connections.get(i).send(object, mode);
}
}
@Override
public void sendTo(int id, Object object, SendMode mode) {
if(mode == SendMode.tcp){
server.sendToTCP(id, object);
}else{
server.sendToUDP(id, object);
}
NetConnection conn = getByID(id);
conn.send(object, mode);
}
@Override
public void sendExcept(int id, Object object, SendMode mode) {
if(mode == SendMode.tcp){
server.sendToAllExceptTCP(id, object);
}else{
server.sendToAllExceptUDP(id, object);
for(int i = 0; i < connections.size(); i ++){
KryoConnection conn = connections.get(i);
if(conn.id != id) conn.send(object, mode);
}
}
@Override
public int getPingFor(int connection) {
return getByID(connection).getReturnTripTime();
public int getPingFor(NetConnection con) {
KryoConnection k = (KryoConnection)con;
return k.connection == null ? 0 : k.connection.getReturnTripTime();
}
@Override
@@ -204,7 +246,8 @@ public class KryoServer implements ServerProvider {
public void dispose(){
try {
server.dispose();
}catch (IOException e){
webServer.stop();
}catch (Exception e){
throw new RuntimeException(e);
}
}
@@ -213,13 +256,146 @@ public class KryoServer implements ServerProvider {
Gdx.app.postRunnable(() -> { throw new RuntimeException(e);});
}
Connection getByID(int id){
for(Connection con : server.getConnections()){
if(con.getID() == id){
KryoConnection getByID(int id){
for(int i = 0; i < connections.size(); i ++){
KryoConnection con = connections.get(i);
if(con.id == id){
return con;
}
}
return null;
}
KryoConnection getByKryoID(int id){
for(int i = 0; i < connections.size(); i ++){
KryoConnection con = connections.get(i);
if(con.connection != null && con.connection.getID() == id){
return con;
}
}
return null;
}
KryoConnection getBySocket(WebSocket socket){
for(int i = 0; i < connections.size(); i ++){
KryoConnection con = connections.get(i);
if(con.socket == socket){
return con;
}
}
return null;
}
class KryoConnection extends NetConnection{
public final WebSocket socket;
public final Connection connection;
public KryoConnection(int id, String address, WebSocket socket) {
super(id, address);
this.socket = socket;
this.connection = null;
}
public KryoConnection(int id, String address, Connection connection) {
super(id, address);
this.socket = null;
this.connection = connection;
}
@Override
public void send(Object object, SendMode mode){
if(socket != null){
try {
synchronized (buffer) {
buffer.position(0);
UCore.log("Sending object with ID " + Registrator.getID(object.getClass()));
serializer.write(buffer, object);
int pos = buffer.position();
buffer.position(0);
byte[] out = new byte[pos];
buffer.get(out);
String string = new String(Base64Coder.encode(out));
UCore.log("Sending string: " + string);
socket.send(string);
}
}catch (Exception e){
e.printStackTrace();
connections.remove(this);
}
}else if (connection != null) {
if (mode == SendMode.tcp) {
connection.sendTCP(object);
} else {
connection.sendUDP(object);
}
}
}
@Override
public void close(){
if(socket != null){
if(socket.isOpen()) socket.close();
}else if (connection != null) {
if(connection.isConnected()) connection.close();
}
}
}
class SocketServer extends WebSocketServer {
public SocketServer(int port) {
super(new InetSocketAddress(port));
}
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
Connect connect = new Connect();
connect.addressTCP = conn.getRemoteSocketAddress().toString();
UCore.log("Websocket connection recieved: " + connect.addressTCP);
KryoConnection kn = new KryoConnection(lastconnection ++, connect.addressTCP, conn);
connections.add(kn);
}
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
if (conn == null) return;
Disconnect disconnect = new Disconnect();
KryoConnection k = getBySocket(conn);
if(k != null) Net.handleServerReceived(disconnect, k.id);
}
@Override
public void onMessage(WebSocket conn, String message) {
try {
UCore.log("Got message: " + message);
KryoConnection k = getBySocket(conn);
if (k == null) return;
byte[] out = Base64Coder.decode(message);
UCore.log("Decoded: " + Arrays.toString(out));
ByteBuffer buffer = ByteBuffer.wrap(out);
Object o = serializer.read(buffer);
Net.handleServerReceived(o, k.id);
}catch (Exception e){
UCore.log("Error reading message!");
e.printStackTrace();
}
}
@Override
public void onError(WebSocket conn, Exception ex) {
UCore.log("WS error:");
ex.printStackTrace();
}
@Override
public void onStart() {
UCore.log("Web server started.");
}
}
}