Big refactor to reduce memory usage

- Remove support for non-compact responses
- Store peers as hashes, not dests
- Remove UDP connection table
- Generate and validate UDP connection IDs cryptographically
- Add support for sending UDP errors
This commit is contained in:
zzz
2025-04-27 17:04:46 -04:00
parent 0b6d22da3b
commit 1acaef9514
3 changed files with 129 additions and 76 deletions

View File

@ -16,12 +16,10 @@ package net.i2p.zzzot;
*
*/
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.concurrent.ConcurrentMap;
import net.i2p.crypto.SHA256Generator;
import net.i2p.data.Base64;
import net.i2p.data.Base32;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
@ -45,7 +43,7 @@ public class Peer extends HashMap<String, Object> {
put("peer id", id);
put("port", PORT);
// cache the 520-byte address strings
String dest = address.toBase64() + ".i2p";
String dest = address.toBase32().substring(0, 52);
String oldDest = destCache.putIfAbsent(dest, dest);
if (oldDest != null)
dest = oldDest;
@ -65,20 +63,11 @@ public class Peer extends HashMap<String, Object> {
return lastSeen;
}
/** convert b64.i2p to a Hash, then to a binary string */
/* or should we just store it in the constructor? cache it? */
public String getHash() {
try {
return new String(getHashObject().getData(), "ISO-8859-1");
} catch (UnsupportedEncodingException uee) { return null; }
}
/**
* @since 0.19
* @since 0.20
*/
public Hash getHashObject() {
public byte[] getHashBytes() {
String ip = (String) get("ip");
byte[] b = Base64.decode(ip.substring(0, ip.length() - 4));
return SHA256Generator.getInstance().calculateHash(b);
return Base32.decode(ip);
}
}

View File

@ -21,7 +21,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import net.i2p.I2PAppContext;
@ -29,8 +28,11 @@ import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionMuxedListener;
import net.i2p.client.datagram.Datagram2;
import net.i2p.client.datagram.Datagram3;
import net.i2p.crypto.SipHashInline;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Hash;
import net.i2p.i2ptunnel.I2PTunnel;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
@ -48,9 +50,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
private final Log _log;
private final I2PTunnel _tunnel;
private final ZzzOT _zzzot;
// conn ID to dest and time added
private final Map<Long, DestAndTime> _connectCache;
private final Cleaner _cleaner;
private final long sipk0, sipk1;
private volatile boolean _running;
// The listen port.
@ -65,18 +65,22 @@ public class UDPHandler implements I2PSessionMuxedListener {
private static final int EVENT_COMPLETED = 1;
private static final int EVENT_STARTED = 2;
private static final int EVENT_STOPPED = 3;
// keep it short, we should have the leaseset
private final long LOOKUP_TIMEOUT = 1000;
private final long CLEAN_TIME;
private static final byte[] INVALID = DataHelper.getUTF8("Invalid connection ID");
private static final byte[] PROTOCOL = DataHelper.getUTF8("Bad protocol");
private static final byte[] SCRAPE = DataHelper.getUTF8("Scrape unsupported");
public UDPHandler(I2PAppContext ctx, I2PTunnel tunnel, ZzzOT zzzot, int port) {
_context = ctx;
_log = ctx.logManager().getLog(UDPHandler.class);
_tunnel = tunnel;
_zzzot = zzzot;
_connectCache = new ConcurrentHashMap<Long, DestAndTime>();
CLEAN_TIME = (zzzot.getTorrents().getUDPLifetime() + 60) * 1000;
PORT = port;
_cleaner = new Cleaner();
sipk0 = ctx.random().nextLong();
sipk1 = ctx.random().nextLong();
}
public void start() {
@ -89,7 +93,6 @@ public class UDPHandler implements I2PSessionMuxedListener {
*/
public void stop() {
_running = false;
_cleaner.cancel();
}
private class Waiter implements Runnable {
@ -103,8 +106,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
}
I2PSession session = sessions.get(0);
session.addMuxedSessionListener(UDPHandler.this, I2PSession.PROTO_DATAGRAM2, PORT);
session.addMuxedSessionListener(UDPHandler.this, I2PSession.PROTO_DATAGRAM_RAW, PORT);
_cleaner.schedule(CLEAN_TIME);
session.addMuxedSessionListener(UDPHandler.this, I2PSession.PROTO_DATAGRAM3, PORT);
if (_log.shouldInfo())
_log.info("got session");
break;
@ -130,9 +132,10 @@ public class UDPHandler implements I2PSessionMuxedListener {
if (proto == I2PSession.PROTO_DATAGRAM2) {
// load datagram into it
Datagram2 dg = Datagram2.load(_context, session, msg);
handle(session, dg.getSender(), fromPort, dg.getPayload());
} else if (proto == I2PSession.PROTO_DATAGRAM_RAW) {
handle(session, null, fromPort, msg);
handle(session, dg.getSender(), null, fromPort, dg.getPayload());
} else if (proto == I2PSession.PROTO_DATAGRAM3) {
Datagram3 dg = Datagram3.load(_context, session, msg);
handle(session, null, dg.getSender(), fromPort, dg.getPayload());
} else {
if (_log.shouldWarn())
_log.warn("dropping message with unknown protocol " + proto);
@ -146,8 +149,6 @@ public class UDPHandler implements I2PSessionMuxedListener {
public void reportAbuse(I2PSession arg0, int arg1) {}
public void disconnected(I2PSession arg0) {
_cleaner.cancel();
_connectCache.clear();
}
public void errorOccurred(I2PSession arg0, String arg1, Throwable arg2) {
@ -156,7 +157,12 @@ public class UDPHandler implements I2PSessionMuxedListener {
/// end listener methods ///
private void handle(I2PSession session, Destination from, int fromPort, byte[] data) {
/**
* One of from or fromHash non-null
* @param from non-null for connect request
* @param fromHash non-null for announce request
*/
private void handle(I2PSession session, Destination from, Hash fromHash, int fromPort, byte[] data) {
int sz = data.length;
if (sz < 16) {
if (_log.shouldWarn())
@ -173,22 +179,29 @@ public class UDPHandler implements I2PSessionMuxedListener {
}
if (from == null) {
if (_log.shouldWarn())
_log.warn("dropping raw connect");
_log.warn("dropping dg3 connect");
int transID = (int) DataHelper.fromLong(data, 12, 4);
sendError(session, fromHash, fromPort, transID, PROTOCOL);
return;
}
handleConnect(session, from, fromPort, data);
} else if (action == ACTION_ANNOUNCE) {
if (from != null) {
if (fromHash == null) {
if (_log.shouldWarn())
_log.warn("dropping repliable announce");
// TODO send error?
_log.warn("dropping dg2 announce");
int transID = (int) DataHelper.fromLong(data, 12, 4);
sendError(session, from, fromPort, transID, PROTOCOL);
return;
}
handleAnnounce(session, connID, fromPort, data);
handleAnnounce(session, connID, fromHash, fromPort, data);
} else if (action == ACTION_SCRAPE) {
if (_log.shouldWarn())
_log.warn("got unsupported scrape");
// TODO send error?
int transID = (int) DataHelper.fromLong(data, 12, 4);
if (from != null)
sendError(session, from, fromPort, transID, SCRAPE);
else
sendError(session, fromHash, fromPort, transID, SCRAPE);
} else {
if (_log.shouldWarn())
_log.warn("dropping bad action " + action);
@ -201,7 +214,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
*/
private void handleConnect(I2PSession session, Destination from, int fromPort, byte[] data) {
int transID = (int) DataHelper.fromLong(data, 12, 4);
long connID = _context.random().nextLong();
long connID = generateCID(from.calculateHash());
byte[] resp = new byte[18];
DataHelper.toLong(resp, 4, 4, transID);
DataHelper.toLong8(resp, 8, connID);
@ -211,7 +224,6 @@ public class UDPHandler implements I2PSessionMuxedListener {
session.sendMessage(from, resp, I2PSession.PROTO_DATAGRAM_RAW, PORT, fromPort);
if (_log.shouldDebug())
_log.debug("sent connect reply with conn ID " + connID + " to " + from.toBase32());
_connectCache.put(Long.valueOf(connID), new DestAndTime(from, _context.clock().now()));
} catch (I2PSessionException ise) {
if (_log.shouldWarn())
_log.warn("error sending connect reply", ise);
@ -221,24 +233,28 @@ public class UDPHandler implements I2PSessionMuxedListener {
/**
* @param from may be null
*/
private void handleAnnounce(I2PSession session, long connID, int fromPort, byte[] data) {
private void handleAnnounce(I2PSession session, long connID, Hash fromHash, int fromPort, byte[] data) {
int sz = data.length;
if (sz < 96) {
if (_log.shouldWarn())
_log.warn("dropping short announce length " + sz);
return;
}
DestAndTime dat = _connectCache.get(Long.valueOf(connID));
if (dat == null) {
int transID = (int) DataHelper.fromLong(data, 12, 4);
boolean ok = validateCID(fromHash, connID);
if (!ok) {
if (_log.shouldWarn())
_log.warn("conn ID not found: " + connID);
// TODO send error?
_log.warn("conn ID invalid: " + connID);
sendError(session, fromHash, fromPort, transID, INVALID);
return;
}
Destination from = dat.dest;
// TODO use a waiter
Destination from = lookup(session, fromHash);
if (from == null)
return;
// parse packet
int transID = (int) DataHelper.fromLong(data, 12, 4);
byte[] bih = new byte[InfoHash.LENGTH];
System.arraycopy(data, 16, bih, 0, InfoHash.LENGTH);
InfoHash ih = new InfoHash(bih);
@ -320,7 +336,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
DataHelper.toLong(resp, 20, 2, count);
if (peerlist != null) {
for (int i = 0; i < count; i++) {
System.arraycopy(peerlist.get(i).getHashObject().getData(), 0, resp, 22 + (i * 32), 32);
System.arraycopy(peerlist.get(i).getHashBytes(), 0, resp, 22 + (i * 32), 32);
}
}
@ -334,30 +350,75 @@ public class UDPHandler implements I2PSessionMuxedListener {
}
}
private static class DestAndTime {
public final Destination dest;
public final long time;
/**
* @param from non-null
* @param msg non-null
*/
private void sendError(I2PSession session, Hash toHash, int toPort, long transID, byte[] msg) {
// TODO use a waiter
Destination to = lookup(session, toHash);
if (to == null)
return;
sendError(session, to, toPort, transID, msg);
}
public DestAndTime(Destination d, long t) {
dest = d;
time = t;
/**
* @param from non-null
* @param msg non-null
*/
private void sendError(I2PSession session, Destination to, int toPort, long transID, byte[] msg) {
byte[] resp = new byte[8 + msg.length];
DataHelper.toLong(resp, 0, 4, ACTION_ERROR);
DataHelper.toLong(resp, 4, 4, transID);
System.arraycopy(msg, 0, resp, 8, msg.length);
try {
session.sendMessage(to, resp, I2PSession.PROTO_DATAGRAM_RAW, PORT, toPort);
if (_log.shouldDebug())
_log.debug("sent error to " + to.toBase32());
} catch (I2PSessionException ise) {
if (_log.shouldWarn())
_log.warn("error sending connect reply", ise);
}
}
private class Cleaner extends SimpleTimer2.TimedEvent {
public Cleaner() { super(_context.simpleTimer2()); }
public void timeReached() {
if (!_connectCache.isEmpty()) {
long exp = _context.clock().now() - CLEAN_TIME;
for (Iterator<DestAndTime> iter = _connectCache.values().iterator(); iter.hasNext(); ) {
DestAndTime dat = iter.next();
if (dat.time < exp)
iter.remove();
}
}
schedule(CLEAN_TIME);
/**
* Blocking.
* @return null on failure
*/
private Destination lookup(I2PSession session, Hash hash) {
// TODO use a waiter
Destination rv = null;
try {
rv = session.lookupDest(hash, LOOKUP_TIMEOUT);
} catch (I2PSessionException ise) {}
if (rv == null) {
if (_log.shouldWarn())
_log.warn("lookup failed for response to " + hash.toBase32());
}
return rv;
}
private long generateCID(Hash hash) {
byte[] buf = new byte[40];
System.arraycopy(hash.getData(), 0, buf, 0, 32);
long time = _context.clock().now() / CLEAN_TIME;
DataHelper.toLong8(buf, 32, time);
return SipHashInline.hash24(sipk0, sipk1, buf);
}
private boolean validateCID(Hash hash, long cid) {
byte[] buf = new byte[40];
System.arraycopy(hash.getData(), 0, buf, 0, 32);
// current epoch
long time = _context.clock().now() / CLEAN_TIME;
DataHelper.toLong8(buf, 32, time);
long c = SipHashInline.hash24(sipk0, sipk1, buf);
if (cid == c)
return true;
// previous epoch
time--;
DataHelper.toLong8(buf, 32, time);
c = SipHashInline.hash24(sipk0, sipk1, buf);
return cid == c;
}
}

View File

@ -29,6 +29,7 @@
final int MAX_RESPONSES = 25;
final boolean ALLOW_IP_MISMATCH = false;
final boolean ALLOW_COMPACT_RESPONSE = true;
final boolean ALLOW_NONCOMPACT_RESPONSE = false;
// so the chars will turn into bytes correctly
request.setCharacterEncoding("ISO-8859-1");
@ -68,6 +69,11 @@
response.setStatus(403);
}
if (!compact && !ALLOW_NONCOMPACT_RESPONSE && !fail) {
fail = true;
msg = "non-compact responses unsupported";
}
if (info_hash == null && !fail) {
fail = true;
msg = "no info hash";
@ -236,18 +242,15 @@
}
}
if (compact) {
// old experimental way - list of hashes
//List<String> peerhashes = new ArrayList(peerlist.size());
//for (Peer pe : peerlist) {
// peerhashes.add(pe.getHash());
//}
// new way - one big string
// one big string
byte[] peerhashes = new byte[32 * peerlist.size()];
for (int i = 0; i < peerlist.size(); i++)
System.arraycopy(peerlist.get(i).getHash().getBytes("ISO-8859-1"), 0, peerhashes, i * 32, 32);
System.arraycopy(peerlist.get(i).getHashBytes(), 0, peerhashes, i * 32, 32);
m.put("peers", peerhashes);
} else {
} else if (ALLOW_NONCOMPACT_RESPONSE) {
m.put("peers", peerlist);
} else {
// won't get here
}
}
}