Thread I2CP lookups

until we have a nonblocking I2CP lookup API.
Most of the time, this won't be needed, as our cache
will have it, but if additional announces come in over
the lifetime, we'll have to do a I2CP lookup.
Even then, the I2CP cache may have it,
or the router should have the LS, so it should be quick.
This commit is contained in:
zzz
2025-05-01 18:40:29 -04:00
parent 02328bd5d4
commit 5c08658360
3 changed files with 137 additions and 20 deletions

View File

@ -36,6 +36,13 @@ public class Peer {
hash = address.calculateHash();
}
/**
* @since 0.20.0
*/
public Peer(byte[] id, Hash h) {
hash = h;
}
public void setLeft(long l) {
bytesLeft = l;
lastSeen = System.currentTimeMillis();

View File

@ -22,6 +22,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
@ -57,6 +63,9 @@ public class UDPHandler implements I2PSessionMuxedListener {
private final Map<Hash, Destination> _destCache;
private final AtomicInteger _announces = new AtomicInteger();
private volatile boolean _running;
private ThreadPoolExecutor _executor;
/** how long to wait before dropping an idle thread */
private static final long HANDLER_KEEPALIVE_MS = 2*60*1000;
// The listen port.
public final int PORT;
@ -70,8 +79,9 @@ public class UDPHandler implements I2PSessionMuxedListener {
private static final int EVENT_COMPLETED = 1;
private static final int EVENT_STARTED = 2;
private static final int EVENT_STOPPED = 3;
// keep it short, we should have the leaseset
private final long LOOKUP_TIMEOUT = 1000;
// keep it short, we should have the leaseset,
// if a new ratchet session was created
private final long LOOKUP_TIMEOUT = 2000;
private final long CLEAN_TIME;
private final long STAT_TIME = 2*60*1000;
private static final byte[] INVALID = DataHelper.getUTF8("Invalid connection ID");
@ -93,8 +103,10 @@ public class UDPHandler implements I2PSessionMuxedListener {
_destCache = new LHMCache<Hash, Destination>(1024);
}
public void start() {
public synchronized void start() {
_running = true;
_executor = new CustomThreadPoolExecutor();
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
(new I2PAppThread(new Waiter(), "ZzzOT UDP startup", true)).start();
long[] r = new long[] { 5*60*1000 };
_context.statManager().createRequiredRateStat("plugin.zzzot.announces.udp", "UDP announces per minute", "Plugins", r);
@ -103,8 +115,11 @@ public class UDPHandler implements I2PSessionMuxedListener {
/**
* @since 0.20.0
*/
public void stop() {
public synchronized void stop() {
_running = false;
_executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
_executor.shutdownNow();
_executor = null;
_cleaner.cancel();
_context.statManager().removeRateStat("plugin.zzzot.announces.udp");
_announces.set(0);
@ -269,11 +284,6 @@ public class UDPHandler implements I2PSessionMuxedListener {
return;
}
// TODO use a waiter
Destination from = lookup(session, fromHash);
if (from == null)
return;
// parse packet
byte[] bih = new byte[InfoHash.LENGTH];
System.arraycopy(data, 16, bih, 0, InfoHash.LENGTH);
@ -316,7 +326,7 @@ public class UDPHandler implements I2PSessionMuxedListener {
} else {
Peer p = peers.get(pid);
if (p == null) {
p = new Peer(pid.getData(), from);
p = new Peer(pid.getData(), fromHash);
Peer p2 = peers.putIfAbsent(pid, p);
if (p2 != null)
p = p2;
@ -360,6 +370,17 @@ public class UDPHandler implements I2PSessionMuxedListener {
}
}
Destination from = lookupCache(fromHash);
if (from == null) {
try {
_executor.execute(new Lookup(session, fromHash, fromPort, resp));
} catch (RejectedExecutionException ree) {
if (_log.shouldWarn())
_log.warn("error sending announce reply - thread pool full");
}
return;
}
try {
session.sendMessage(from, resp, I2PSession.PROTO_DATAGRAM_RAW, PORT, fromPort);
if (_log.shouldDebug())
@ -375,10 +396,13 @@ public class UDPHandler implements I2PSessionMuxedListener {
* @param msg non-null
*/
private void sendError(I2PSession session, Hash toHash, int toPort, long transID, byte[] msg) {
// TODO use a waiter
Destination to = lookup(session, toHash);
if (to == null)
Destination to = lookupCache(toHash);
if (to == null) {
if (_log.shouldInfo())
_log.info("don't have cached dest to send error to " + toHash.toBase32());
return;
}
// don't bother looking up via I2CP
sendError(session, to, toPort, transID, msg);
}
@ -406,16 +430,37 @@ public class UDPHandler implements I2PSessionMuxedListener {
* @return null on failure
*/
private Destination lookup(I2PSession session, Hash hash) {
Destination rv;
synchronized(_destCache) {
rv = _destCache.get(hash);
}
Destination rv = lookupCache(hash);
if (rv != null)
return rv;
// TODO use a waiter
return lookupI2CP(session, hash);
}
/**
* Nonblocking.
* @return null on failure
*/
private Destination lookupCache(Hash hash) {
// Test deferred
//if (true) return null;
synchronized(_destCache) {
return _destCache.get(hash);
}
}
/**
* Blocking.
* @return null on failure
*/
private Destination lookupI2CP(I2PSession session, Hash hash) {
Destination rv;
try {
rv = session.lookupDest(hash, LOOKUP_TIMEOUT);
} catch (I2PSessionException ise) {}
} catch (I2PSessionException ise) {
if (_log.shouldWarn())
_log.warn("lookup error", ise);
return null;
}
if (rv == null) {
if (_log.shouldWarn())
_log.warn("lookup failed for response to " + hash.toBase32());
@ -458,4 +503,69 @@ public class UDPHandler implements I2PSessionMuxedListener {
schedule(STAT_TIME);
}
}
/**
* Until we have a nonblocking lookup API in I2CP
*
* @since 0.20.0
*/
private class Lookup implements Runnable {
private final I2PSession _session;
private final Hash _hash;
private final int _port;
private final byte[] _msg;
public Lookup(I2PSession sess, Hash h, int port, byte[] msg) {
_session = sess;
_hash = h;
_port = port;
_msg = msg;
}
public void run() {
// blocking
Destination d = lookupI2CP(_session, _hash);
if (d == null) {
if (_log.shouldWarn())
_log.warn("deferred lookup failed for " + _hash.toBase32());
return;
}
try {
_session.sendMessage(d, _msg, I2PSession.PROTO_DATAGRAM_RAW, PORT, _port);
if (_log.shouldDebug())
_log.debug("sent deferred reply to " + _hash.toBase32());
} catch (I2PSessionException ise) {
if (_log.shouldWarn())
_log.warn("error sending deferred reply", ise);
}
}
}
/**
* Until we have a nonblocking lookup API in I2CP
*
* @since 0.20.0
*/
private static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor() {
super(0, 25, HANDLER_KEEPALIVE_MS, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new CustomThreadFactory());
}
}
/**
* Just to set the name and set Daemon
*
* @since 0.20.0
*/
private static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger _executorThreadCount = new AtomicInteger();
public Thread newThread(Runnable r) {
Thread rv = Executors.defaultThreadFactory().newThread(r);
rv.setName("ZzzOT lookup " + _executorThreadCount.incrementAndGet());
rv.setDaemon(true);
return rv;
}
}
}

View File

@ -82,7 +82,7 @@ public class ZzzOTController implements ClientApp {
private static final String NAME = "ZzzOT";
private static final String DEFAULT_SITENAME = "ZZZOT";
private static final String PROP_SITENAME = "sitename";
private static final String VERSION = "0.20.0-beta";
private static final String VERSION = "0.20.0-beta2";
private static final String DEFAULT_SHOWFOOTER = "true";
private static final String PROP_SHOWFOOTER = "showfooter";
private static final String DEFAULT_FOOTERTEXT = "Running <a href=\"http://git.idk.i2p/i2p-hackers/i2p.plugins.zzzot\" target=\"_blank\">ZZZOT</a> " + VERSION;