[freenet-cvs] r17479 - in trunk/freenet/src/freenet: io/comm node

toad at freenetproject.org toad at freenetproject.org
Sat Feb 2 20:05:30 UTC 2008


Author: toad
Date: 2008-02-02 20:05:30 +0000 (Sat, 02 Feb 2008)
New Revision: 17479

Modified:
   trunk/freenet/src/freenet/io/comm/DMT.java
   trunk/freenet/src/freenet/node/CHKInsertHandler.java
   trunk/freenet/src/freenet/node/FailureTable.java
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeClientCore.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/NodeStats.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestHandler.java
   trunk/freenet/src/freenet/node/RequestSender.java
   trunk/freenet/src/freenet/node/RequestStarter.java
   trunk/freenet/src/freenet/node/SSKInsertHandler.java
Log:
Another big chunk of ULPRs.

Modified: trunk/freenet/src/freenet/io/comm/DMT.java
===================================================================
--- trunk/freenet/src/freenet/io/comm/DMT.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/io/comm/DMT.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -858,6 +858,39 @@
 		return msg;
 	}
 	
+	// Short timeout so PRIORITY_HIGH
+	public static MessageType FNPGetOfferedKey = new MessageType("FNPGetOfferedKey", PRIORITY_HIGH) {{
+		addField(KEY, Key.class);
+		addField(OFFER_AUTHENTICATOR, ShortBuffer.class);
+		addField(NEED_PUB_KEY, Boolean.class);
+		addField(UID, Long.class);
+	}};
+	
+	public static Message createFNPGetOfferedKey(Key key, byte[] authenticator, boolean needPubkey, long uid) {
+		Message msg = new Message(FNPGetOfferedKey);
+		msg.set(KEY, key);
+		msg.set(OFFER_AUTHENTICATOR, new ShortBuffer(authenticator));
+		msg.set(NEED_PUB_KEY, needPubkey);
+		msg.set(UID, uid);
+		return msg;
+	}
+	
+	// Permanently rejected. RejectedOverload means temporarily rejected.
+	public static MessageType FNPGetOfferedKeyInvalid = new MessageType("FNPGetOfferedKeyInvalid", PRIORITY_HIGH) {{ // short timeout
+		addField(UID, Long.class);
+		addField(REASON, Short.class);
+	}};
+	
+	public static Message createFNPGetOfferedKeyInvalid(long uid, short reason) {
+		Message msg = new Message(FNPGetOfferedKeyInvalid);
+		msg.set(UID, uid);
+		msg.set(REASON, reason);
+		return msg;
+	}
+	
+	public static short GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR = 1;
+	public static short GET_OFFERED_KEY_REJECTED_NO_KEY = 2;
+	
 	public static final MessageType FNPPing = new MessageType("FNPPing", PRIORITY_HIGH) {{
 		addField(PING_SEQNO, Integer.class);
 	}};

Modified: trunk/freenet/src/freenet/node/CHKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/CHKInsertHandler.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/CHKInsertHandler.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -80,7 +80,7 @@
             Logger.error(this, "Caught in run() "+t, t);
         } finally {
         	if(logMINOR) Logger.minor(this, "Exiting CHKInsertHandler.run() for "+uid);
-            node.unlockUID(uid, false, true, false);
+            node.unlockUID(uid, false, true, false, false);
         }
     }
 

Modified: trunk/freenet/src/freenet/node/FailureTable.java
===================================================================
--- trunk/freenet/src/freenet/node/FailureTable.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/FailureTable.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -4,10 +4,19 @@
 package freenet.node;
 
 import java.lang.ref.WeakReference;
+import java.util.Vector;
 
+import freenet.io.comm.DMT;
+import freenet.io.comm.Message;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.xfer.BlockTransmitter;
+import freenet.io.xfer.PartiallyReceivedBlock;
+import freenet.keys.CHKBlock;
 import freenet.keys.Key;
 import freenet.keys.KeyBlock;
 import freenet.keys.NodeCHK;
+import freenet.keys.NodeSSK;
+import freenet.keys.SSKBlock;
 import freenet.support.LRUHashtable;
 
 // FIXME it is ESSENTIAL that we delete the ULPR data on requestors etc once we have found the key.
@@ -140,7 +149,7 @@
 			this.offers = new BlockOffer[] { offer };
 		}
 
-		public long expires() {
+		public synchronized long expires() {
 			long last = 0;
 			for(int i=0;i<offers.length;i++) {
 				if(offers[i].offeredTime > last) last = offers[i].offeredTime;
@@ -148,26 +157,62 @@
 			return last + OFFER_EXPIRY_TIME;
 		}
 
-		public boolean isEmpty(long now) {
+		public synchronized boolean isEmpty(long now) {
 			for(int i=0;i<offers.length;i++) {
 				if(offers[i].offeredTime > now) return false;
 			}
 			return true;
 		}
+
+		public synchronized void deleteOffer(BlockOffer offer) {
+			int idx = -1;
+			for(int i=0;i<offers.length;i++) {
+				if(offers[i] == offer) idx = i;
+			}
+			if(idx == -1) return;
+			BlockOffer[] newOffers = new BlockOffer[offers.length-1];
+			if(idx > 0)
+				System.arraycopy(offers, 0, newOffers, 0, idx);
+			if(idx < newOffers.length)
+				System.arraycopy(offers, idx+1, newOffers, idx, offers.length-idx);
+			offers = newOffers;
+		}
+
+		public synchronized void addOffer(BlockOffer offer) {
+			BlockOffer[] newOffers = new BlockOffer[offers.length+1];
+			System.arraycopy(offers, 0, newOffers, 0, offers.length);
+			newOffers[offers.length] = offer;
+			offers = newOffers;
+		}
 	}
 	
-	private final class BlockOffer {
+	final class BlockOffer {
 		final long offeredTime;
 		/** Either offered by or offered to this node */
 		final WeakReference nodeRef;
 		/** Authenticator */
 		final byte[] authenticator;
+		/** Boot ID when the offer was made */
+		final long bootID;
 		
-		BlockOffer(PeerNode pn, long now, byte[] authenticator) {
+		BlockOffer(PeerNode pn, long now, byte[] authenticator, long bootID) {
 			this.nodeRef = pn.myRef;
 			this.offeredTime = now;
 			this.authenticator = authenticator;
+			this.bootID = bootID;
 		}
+
+		public PeerNode getPeerNode() {
+			return (PeerNode) nodeRef.get();
+		}
+
+		public boolean isExpired(long now) {
+			return now > (offeredTime + OFFER_EXPIRY_TIME);
+		}
+
+		public boolean isExpired() {
+			return isExpired(System.currentTimeMillis());
+		}
 	}
 	
 	/**
@@ -181,6 +226,7 @@
 			entry = (FailureTableEntry) entriesByKey.get(key);
 			if(entry == null) return; // Nobody cares
 			entriesByKey.removeKey(key);
+			blockOfferListByKey.removeKey(key);
 		}
 		entry.offer();
 	}
@@ -250,9 +296,11 @@
 			// Add to offers list
 			
 			BlockOfferList bl = (BlockOfferList) blockOfferListByKey.get(key);
-			BlockOffer offer = new BlockOffer(peer, now, authenticator);
+			BlockOffer offer = new BlockOffer(peer, now, authenticator, peer.getBootID());
 			if(bl == null) {
 				bl = new BlockOfferList(entry, offer);
+			} else {
+				bl.addOffer(offer);
 			}
 			blockOfferListByKey.push(key, offer);
 			trimOffersList(now);
@@ -276,4 +324,116 @@
 			}
 		}
 	}
+
+	/**
+	 * We offered a key, a node has responded to the offer. Note that this runs on the incoming
+	 * packets thread so should allocate a new thread if it does anything heavy.
+	 * @param key The key to send.
+	 * @param isSSK Whether it is an SSK.
+	 * @param uid The UID.
+	 * @param source The node that asked for the key.
+	 * @throws NotConnectedException If the sender ceases to be connected.
+	 */
+	public void sendOfferedKey(Key key, boolean isSSK, boolean needPubKey, long uid, PeerNode source) throws NotConnectedException {
+		if(isSSK) {
+			SSKBlock block = node.fetch((NodeSSK)key, false);
+			if(block == null) {
+				// Don't have the key
+				source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, null);
+				return;
+			}
+			Message df = DMT.createFNPSSKDataFound(uid, block.getRawHeaders(), block.getRawData());
+			source.sendAsync(df, null, 0, null);
+			if(needPubKey) {
+				Message pk = DMT.createFNPSSKPubKey(uid, block.getPubKey());
+				source.sendAsync(pk, null, 0, null);
+			}
+		} else {
+			CHKBlock block = node.fetch((NodeCHK)key, false);
+			if(block == null) {
+				// Don't have the key
+				source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, DMT.GET_OFFERED_KEY_REJECTED_NO_KEY), null, 0, null);
+				return;
+			}
+			Message df = DMT.createFNPCHKDataFound(uid, block.getRawHeaders());
+			source.sendAsync(df, null, 0, null);
+        	PartiallyReceivedBlock prb =
+        		new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE, block.getRawData());
+        	BlockTransmitter bt =
+        		new BlockTransmitter(node.usm, source, uid, prb, node.outputThrottle, null);
+        	bt.sendAsync(node.executor);
+		}
+	}
+	
+	class OfferList {
+
+		OfferList(BlockOfferList offerList) {
+			this.offerList = offerList;
+			recentOffers = new Vector();
+			expiredOffers = new Vector();
+			long now = System.currentTimeMillis();
+			BlockOffer[] offers = offerList.offers;
+			for(int i=0;i<offers.length;i++) {
+				if(!offers[i].isExpired(now))
+					recentOffers.add(offers[i]);
+				else
+					expiredOffers.add(offers[i]);
+			}
+		}
+		
+		private final BlockOfferList offerList;
+		
+		private final Vector recentOffers;
+		private final Vector expiredOffers;
+		
+		/** The last offer we returned */
+		private BlockOffer lastOffer;
+		
+		public BlockOffer getFirstOffer() {
+			if(lastOffer != null) {
+				throw new IllegalStateException("Last offer not dealt with");
+			}
+			if(!recentOffers.isEmpty()) {
+				int x = node.random.nextInt(recentOffers.size());
+				return lastOffer = (BlockOffer) recentOffers.remove(x);
+			}
+			if(!expiredOffers.isEmpty()) {
+				int x = node.random.nextInt(expiredOffers.size());
+				return lastOffer = (BlockOffer) expiredOffers.remove(x);
+			}
+			// No more offers.
+			return null;
+		}
+		
+		/**
+		 * Delete the last offer - we have used it, successfully or not.
+		 */
+		public void deleteLastOffer() {
+			offerList.deleteOffer(lastOffer);
+			lastOffer = null;
+		}
+
+		/**
+		 * Keep the last offer - we weren't able to use it e.g. because of RejectedOverload.
+		 * Maybe it will be useful again in the future.
+		 */
+		public void keepLastOffer() {
+			lastOffer = null;
+		}
+		
+	}
+
+	public OfferList getOffers(Key key) {
+		BlockOfferList bl;
+		synchronized(this) {
+			bl = (BlockOfferList) blockOfferListByKey.get(key);
+			if(bl == null) return null;
+		}
+		return new OfferList(bl);
+	}
+
+	/** Called when a node disconnects */
+	public void onDisconnect(final PeerNode pn) {
+		// FIXME do something (off thread if expensive)
+	}
 }

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/Node.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -319,6 +319,8 @@
 	private final HashSet runningSSKGetUIDs;
 	private final HashSet runningCHKPutUIDs;
 	private final HashSet runningSSKPutUIDs;
+	private final HashSet runningCHKOfferReplyUIDs;
+	private final HashSet runningSSKOfferReplyUIDs;
 	
 	/** Semi-unique ID for swap requests. Used to identify us so that the
 	 * topology can be reconstructed. */
@@ -613,6 +615,8 @@
 		runningSSKGetUIDs = new HashSet();
 		runningCHKPutUIDs = new HashSet();
 		runningSSKPutUIDs = new HashSet();
+		runningCHKOfferReplyUIDs = new HashSet();
+		runningSSKOfferReplyUIDs = new HashSet();
 		
 		// Directory for node-related files other than store
 		
@@ -2179,10 +2183,10 @@
 		return is;
 	}
 	
-	public boolean lockUID(long uid, boolean ssk, boolean insert) {
+	public boolean lockUID(long uid, boolean ssk, boolean insert, boolean offerReply) {
 		if(logMINOR) Logger.minor(this, "Locking "+uid);
 		Long l = new Long(uid);
-		HashSet set = getUIDTracker(ssk, insert);
+		HashSet set = getUIDTracker(ssk, insert, offerReply);
 		synchronized(set) {
 			set.add(l);
 		}
@@ -2193,11 +2197,11 @@
 		}
 	}
 	
-	public void unlockUID(long uid, boolean ssk, boolean insert, boolean canFail) {
+	public void unlockUID(long uid, boolean ssk, boolean insert, boolean canFail, boolean offerReply) {
 		if(logMINOR) Logger.minor(this, "Unlocking "+uid);
 		Long l = new Long(uid);
 		completed(uid);
-		HashSet set = getUIDTracker(ssk, insert);
+		HashSet set = getUIDTracker(ssk, insert, offerReply);
 		synchronized(set) {
 			set.remove(l);
 		}
@@ -2207,10 +2211,14 @@
 		}
 	}
 
-	HashSet getUIDTracker(boolean ssk, boolean insert) {
+	HashSet getUIDTracker(boolean ssk, boolean insert, boolean offerReply) {
 		if(ssk) {
+			if(offerReply)
+				return runningSSKOfferReplyUIDs;
 			return insert ? runningSSKPutUIDs : runningSSKGetUIDs;
 		} else {
+			if(offerReply)
+				return runningCHKOfferReplyUIDs;
 			return insert ? runningCHKPutUIDs : runningCHKGetUIDs;
 		}
 	}
@@ -2289,6 +2297,14 @@
 		return runningCHKPutUIDs.size();
 	}
 	
+	public int getNumSSKOfferReplies() {
+		return runningSSKOfferReplyUIDs.size();
+	}
+	
+	public int getNumCHKOfferReplies() {
+		return runningCHKOfferReplyUIDs.size();
+	}
+	
 	public int getNumTransferringRequestSenders() {
 		synchronized(transferringRequestSenders) {
 			return transferringRequestSenders.size();

Modified: trunk/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeClientCore.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/NodeClientCore.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -450,7 +450,7 @@
 		logMINOR = Logger.shouldLog(Logger.MINOR, this);
 		long startTime = System.currentTimeMillis();
 		long uid = random.nextLong();
-		if(!node.lockUID(uid, false, false)) {
+		if(!node.lockUID(uid, false, false, false)) {
 			Logger.error(this, "Could not lock UID just randomly generated: "+uid+" - probably indicates broken PRNG");
 			throw new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
 		}
@@ -507,7 +507,8 @@
 						(status == RequestSender.RECENTLY_FAILED) ||
 						(status == RequestSender.SUCCESS) ||
 						(status == RequestSender.ROUTE_NOT_FOUND) ||
-						(status == RequestSender.VERIFY_FAILURE))) {
+						(status == RequestSender.VERIFY_FAILURE) ||
+						(status == RequestSender.GET_OFFER_VERIFY_FAILURE))) {
 					long rtt = System.currentTimeMillis() - startTime;
 					if(!rejectedOverload)
 						requestStarters.requestCompleted(false, false);
@@ -539,8 +540,10 @@
 				case RequestSender.ROUTE_NOT_FOUND:
 					throw new LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
 				case RequestSender.TRANSFER_FAILED:
+				case RequestSender.GET_OFFER_TRANSFER_FAILED:
 					throw new LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
 				case RequestSender.VERIFY_FAILURE:
+				case RequestSender.GET_OFFER_VERIFY_FAILURE:
 					throw new LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
 				case RequestSender.GENERATED_REJECTED_OVERLOAD:
 				case RequestSender.TIMED_OUT:
@@ -554,7 +557,7 @@
 			}
 		}
 		} finally {
-			node.unlockUID(uid, false, false, true);
+			node.unlockUID(uid, false, false, true, false);
 		}
 	}
 
@@ -562,7 +565,7 @@
 		logMINOR = Logger.shouldLog(Logger.MINOR, this);
 		long startTime = System.currentTimeMillis();
 		long uid = random.nextLong();
-		if(!node.lockUID(uid, true, false)) {
+		if(!node.lockUID(uid, true, false, false)) {
 			Logger.error(this, "Could not lock UID just randomly generated: "+uid+" - probably indicates broken PRNG");
 			throw new LowLevelGetException(LowLevelGetException.INTERNAL_ERROR);
 		}
@@ -620,7 +623,8 @@
 						(status == RequestSender.RECENTLY_FAILED) ||
 						(status == RequestSender.SUCCESS) ||
 						(status == RequestSender.ROUTE_NOT_FOUND) ||
-						(status == RequestSender.VERIFY_FAILURE))) {
+						(status == RequestSender.VERIFY_FAILURE) ||
+						(status == RequestSender.GET_OFFER_VERIFY_FAILURE))) {
 					long rtt = System.currentTimeMillis() - startTime;
 					
 					if(!rejectedOverload)
@@ -651,9 +655,11 @@
 				case RequestSender.ROUTE_NOT_FOUND:
 					throw new LowLevelGetException(LowLevelGetException.ROUTE_NOT_FOUND);
 				case RequestSender.TRANSFER_FAILED:
+				case RequestSender.GET_OFFER_TRANSFER_FAILED:
 					Logger.error(this, "WTF? Transfer failed on an SSK? on "+uid);
 					throw new LowLevelGetException(LowLevelGetException.TRANSFER_FAILED);
 				case RequestSender.VERIFY_FAILURE:
+				case RequestSender.GET_OFFER_VERIFY_FAILURE:
 					throw new LowLevelGetException(LowLevelGetException.VERIFY_FAILED);
 				case RequestSender.GENERATED_REJECTED_OVERLOAD:
 				case RequestSender.TIMED_OUT:
@@ -666,7 +672,7 @@
 			}
 		}
 		} finally {
-			node.unlockUID(uid, true, false, true);
+			node.unlockUID(uid, true, false, true, false);
 		}
 	}
 
@@ -686,7 +692,7 @@
 		PartiallyReceivedBlock prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE, data);
 		CHKInsertSender is;
 		long uid = random.nextLong();
-		if(!node.lockUID(uid, false, true)) {
+		if(!node.lockUID(uid, false, true, false)) {
 			Logger.error(this, "Could not lock UID just randomly generated: "+uid+" - probably indicates broken PRNG");
 			throw new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
 		}
@@ -795,7 +801,7 @@
 			}
 		}
 		} finally {
-			node.unlockUID(uid, false, true, true);
+			node.unlockUID(uid, false, true, true, false);
 		}
 	}
 
@@ -803,7 +809,7 @@
 		logMINOR = Logger.shouldLog(Logger.MINOR, this);
 		SSKInsertSender is;
 		long uid = random.nextLong();
-		if(!node.lockUID(uid, true, true)) {
+		if(!node.lockUID(uid, true, true, false)) {
 			Logger.error(this, "Could not lock UID just randomly generated: "+uid+" - probably indicates broken PRNG");
 			throw new LowLevelPutException(LowLevelPutException.INTERNAL_ERROR);
 		}
@@ -921,7 +927,7 @@
 			}
 		}
 		} finally {
-			node.unlockUID(uid, true, true, true);
+			node.unlockUID(uid, true, true, true, false);
 		}
 	}
 

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -15,6 +15,7 @@
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.Peer;
 import freenet.keys.Key;
+import freenet.keys.NodeSSK;
 import freenet.support.Fields;
 import freenet.support.Logger;
 import freenet.support.ShortBuffer;
@@ -160,7 +161,9 @@
 //			return handleProbeTrace(m, source);
 		} else if(spec == DMT.FNPOfferKey) {
 			return handleOfferKey(m, source);
-		} 
+		} else if(spec == DMT.FNPGetOfferedKey) {
+			return handleGetOfferedKey(m, source);
+		}
 		return false;
 	}
 
@@ -171,6 +174,52 @@
 		return true;
 	}
 
+	private boolean handleGetOfferedKey(Message m, PeerNode source) {
+		Key key = (Key) m.getObject(DMT.KEY);
+		byte[] authenticator = ((ShortBuffer) m.getObject(DMT.OFFER_AUTHENTICATOR)).getData();
+		long uid = m.getLong(DMT.UID);
+		HMAC hash = new HMAC(SHA256.getInstance());
+		if(!hash.verify(node.failureTable.offerAuthenticatorKey, key.getFullKey(), authenticator)) {
+			Logger.error(this, "Invalid offer from "+source+" : authenticator did not verify");
+			try {
+				source.sendAsync(DMT.createFNPGetOfferedKeyInvalid(uid, DMT.GET_OFFERED_KEY_REJECTED_BAD_AUTHENTICATOR), null, 0, null);
+			} catch (NotConnectedException e) {
+				// Too bad.
+			}
+			return true;
+		}
+		if(logMINOR) Logger.minor(this, "Valid GetOfferedKey for "+key+" from "+source);
+		
+		// Do we want it? We can RejectOverload if we don't have the bandwidth...
+		boolean isSSK = key instanceof NodeSSK;
+		boolean needPubKey = m.getBoolean(DMT.NEED_PUB_KEY);
+		if(isSSK) {
+			
+		}
+		String reject = 
+			nodeStats.shouldRejectRequest(true, false, isSSK, false, true, source);
+		if(reject != null) {
+			Logger.normal(this, "Rejecting FNPGetOfferedKey from "+source+" for "+key+" : "+reject);
+			Message rejected = DMT.createFNPRejectedOverload(uid, true);
+			try {
+				source.sendAsync(rejected, null, 0, null);
+			} catch (NotConnectedException e) {
+				Logger.normal(this, "Rejecting (overload) data request from "+source.getPeer()+": "+e);
+			}
+			node.unlockUID(uid, isSSK, false, false, false);
+			return true;
+		}
+		
+		// Accept it.
+		
+		try {
+			node.failureTable.sendOfferedKey(key, isSSK, needPubKey, uid, source);
+		} catch (NotConnectedException e) {
+			// Too bad.
+		}
+		return true;
+	}
+
 	private void handleDisconnect(final Message m, final PeerNode source) {
 		// Must run ON the packet sender thread as it sends a packet directly
 		node.getTicker().queueTimedJob(new FastRunnable() {
@@ -229,7 +278,7 @@
 			}
 			return true;
 		}
-		if(!node.lockUID(id, isSSK, false)) {
+		if(!node.lockUID(id, isSSK, false, false)) {
 			if(logMINOR) Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already running)");
 			Message rejected = DMT.createFNPRejectedLoop(id);
 			try {
@@ -241,7 +290,7 @@
 		} else {
 			if(logMINOR) Logger.minor(this, "Locked "+id);
 		}
-		String rejectReason = nodeStats.shouldRejectRequest(!isSSK, false, isSSK, false, source);
+		String rejectReason = nodeStats.shouldRejectRequest(!isSSK, false, isSSK, false, false, source);
 		if(rejectReason != null) {
 			// can accept 1 CHK request every so often, but not with SSKs because they aren't throttled so won't sort out bwlimitDelayTime, which was the whole reason for accepting them when overloaded...
 			Logger.normal(this, "Rejecting request from "+source.getPeer()+" preemptively because "+rejectReason);
@@ -251,7 +300,7 @@
 			} catch (NotConnectedException e) {
 				Logger.normal(this, "Rejecting (overload) data request from "+source.getPeer()+": "+e);
 			}
-			node.unlockUID(id, isSSK, false, false);
+			node.unlockUID(id, isSSK, false, false, false);
 			return true;
 		}
 		//if(!node.lockUID(id)) return false;
@@ -271,7 +320,7 @@
 			}
 			return true;
 		}
-		if(!node.lockUID(id, isSSK, true)) {
+		if(!node.lockUID(id, isSSK, true, false)) {
 			if(logMINOR) Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already running)");
 			Message rejected = DMT.createFNPRejectedLoop(id);
 			try {
@@ -282,7 +331,7 @@
 			return true;
 		}
 		// SSKs don't fix bwlimitDelayTime so shouldn't be accepted when overloaded.
-		String rejectReason = nodeStats.shouldRejectRequest(!isSSK, true, isSSK, false, source);
+		String rejectReason = nodeStats.shouldRejectRequest(!isSSK, true, isSSK, false, false, source);
 		if(rejectReason != null) {
 			Logger.normal(this, "Rejecting insert from "+source.getPeer()+" preemptively because "+rejectReason);
 			Message rejected = DMT.createFNPRejectedOverload(id, true);
@@ -291,7 +340,7 @@
 			} catch (NotConnectedException e) {
 				Logger.normal(this, "Rejecting (overload) insert request from "+source.getPeer()+": "+e);
 			}
-			node.unlockUID(id, isSSK, true, false);
+			node.unlockUID(id, isSSK, true, false, false);
 			return true;
 		}
 		long now = System.currentTimeMillis();

Modified: trunk/freenet/src/freenet/node/NodeStats.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeStats.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/NodeStats.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -113,10 +113,14 @@
 	final TimeDecayingRunningAverage successfulSskFetchBytesSentAverage;
 	final TimeDecayingRunningAverage successfulChkInsertBytesSentAverage;
 	final TimeDecayingRunningAverage successfulSskInsertBytesSentAverage;
+	final TimeDecayingRunningAverage successfulChkOfferReplyBytesSentAverage;
+	final TimeDecayingRunningAverage successfulSskOfferReplyBytesSentAverage;
 	final TimeDecayingRunningAverage successfulChkFetchBytesReceivedAverage;
 	final TimeDecayingRunningAverage successfulSskFetchBytesReceivedAverage;
 	final TimeDecayingRunningAverage successfulChkInsertBytesReceivedAverage;
 	final TimeDecayingRunningAverage successfulSskInsertBytesReceivedAverage;
+	final TimeDecayingRunningAverage successfulChkOfferReplyBytesReceivedAverage;
+	final TimeDecayingRunningAverage successfulSskOfferReplyBytesReceivedAverage;
 	
 	final TrivialRunningAverage globalFetchPSuccess;
 	final TrivialRunningAverage chkFetchPSuccess;
@@ -312,10 +316,14 @@
 		successfulSskFetchBytesSentAverage = new TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("SuccessfulSskFetchBytesSentAverage"), node);
 		successfulChkInsertBytesSentAverage = new TimeDecayingRunningAverage(32768+32768+1024, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("SuccessfulChkInsertBytesSentAverage"), node);
 		successfulSskInsertBytesSentAverage = new TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("SuccessfulSskInsertBytesSentAverage"), node);
+		successfulChkOfferReplyBytesSentAverage = new TimeDecayingRunningAverage(32768+500, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("successfulChkOfferReplyBytesSentAverage"), node);
+		successfulSskOfferReplyBytesSentAverage = new TimeDecayingRunningAverage(3072, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("successfulSskOfferReplyBytesSentAverage"), node);
 		successfulChkFetchBytesReceivedAverage = new TimeDecayingRunningAverage(32768+1024+500+2048/*path folding*/, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("SuccessfulChkFetchBytesReceivedAverage"), node);
 		successfulSskFetchBytesReceivedAverage = new TimeDecayingRunningAverage(2048+500, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("SuccessfulSskFetchBytesReceivedAverage"), node);
 		successfulChkInsertBytesReceivedAverage = new TimeDecayingRunningAverage(32768+1024+500, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("SuccessfulChkInsertBytesReceivedAverage"), node);
 		successfulSskInsertBytesReceivedAverage = new TimeDecayingRunningAverage(1024+1024+500, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("SuccessfulSskInsertBytesReceivedAverage"), node);
+		successfulChkOfferReplyBytesReceivedAverage = new TimeDecayingRunningAverage(32768+500, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("successfulChkOfferReplyBytesReceivedAverage"), node);
+		successfulSskOfferReplyBytesReceivedAverage = new TimeDecayingRunningAverage(3072, 180000, 0.0, 1024*1024*1024, throttleFS == null ? null : throttleFS.subset("successfulSskOfferReplyBytesReceivedAverage"), node);
 		
 		globalFetchPSuccess = new TrivialRunningAverage();
 		chkFetchPSuccess = new TrivialRunningAverage();
@@ -390,7 +398,7 @@
 	};
 	
 	/* return reject reason as string if should reject, otherwise return null */
-	public String shouldRejectRequest(boolean canAcceptAnyway, boolean isInsert, boolean isSSK, boolean isLocal, PeerNode source) {
+	public String shouldRejectRequest(boolean canAcceptAnyway, boolean isInsert, boolean isSSK, boolean isLocal, boolean isOfferReply, PeerNode source) {
 		logMINOR = Logger.shouldLog(Logger.MINOR, this);
 		if(logMINOR) dumpByteCostAverages();
 		
@@ -460,6 +468,8 @@
 		int numSSKRequests = node.getNumSSKRequests() + 1;
 		int numCHKInserts = node.getNumCHKInserts() + 1;
 		int numSSKInserts = node.getNumSSKInserts() + 1;
+		int numCHKOfferReplies = node.getNumCHKOfferReplies() + 1;
+		int numSSKOfferReplies = node.getNumSSKOfferReplies() + 1;
 		if(logMINOR)
 			Logger.minor(this, "Running (adjusted): CHK fetch "+numCHKRequests+" SSK fetch "+numSSKRequests+" CHK insert "+numCHKInserts+" SSK insert "+numSSKInserts);
 		
@@ -612,7 +622,9 @@
 				" CHK insert "+successfulChkInsertBytesSentAverage.currentValue()+ '/' +successfulChkInsertBytesReceivedAverage.currentValue()+
 				" SSK insert "+successfulSskInsertBytesSentAverage.currentValue()+ '/' +successfulSskInsertBytesReceivedAverage.currentValue()+
 				" CHK fetch "+successfulChkFetchBytesSentAverage.currentValue()+ '/' +successfulChkFetchBytesReceivedAverage.currentValue()+
-				" SSK fetch "+successfulSskFetchBytesSentAverage.currentValue()+ '/' +successfulSskFetchBytesReceivedAverage.currentValue());
+				" SSK fetch "+successfulSskFetchBytesSentAverage.currentValue()+ '/' +successfulSskFetchBytesReceivedAverage.currentValue()+
+				" CHK offer reply "+successfulChkOfferReplyBytesSentAverage.currentValue()+ '/' +successfulChkOfferReplyBytesReceivedAverage.currentValue()+
+				" SSK offer reply "+successfulSskOfferReplyBytesSentAverage.currentValue()+ '/' +successfulSskOfferReplyBytesReceivedAverage.currentValue());
 		
 	}
 
@@ -693,10 +705,14 @@
 		fs.put("SuccessfulSskFetchBytesSentAverage", successfulSskFetchBytesSentAverage.exportFieldSet(true));
 		fs.put("SuccessfulChkInsertBytesSentAverage", successfulChkInsertBytesSentAverage.exportFieldSet(true));
 		fs.put("SuccessfulSskInsertBytesSentAverage", successfulSskInsertBytesSentAverage.exportFieldSet(true));
+		fs.put("SuccessfulChkOfferReplyBytesSentAverage", successfulChkOfferReplyBytesSentAverage.exportFieldSet(true));
+		fs.put("SuccessfulSskOfferReplyBytesSentAverage", successfulSskOfferReplyBytesSentAverage.exportFieldSet(true));		
 		fs.put("SuccessfulChkFetchBytesReceivedAverage", successfulChkFetchBytesReceivedAverage.exportFieldSet(true));
 		fs.put("SuccessfulSskFetchBytesReceivedAverage", successfulSskFetchBytesReceivedAverage.exportFieldSet(true));
 		fs.put("SuccessfulChkInsertBytesReceivedAverage", successfulChkInsertBytesReceivedAverage.exportFieldSet(true));
 		fs.put("SuccessfulSskInsertBytesReceivedAverage", successfulSskInsertBytesReceivedAverage.exportFieldSet(true));
+		fs.put("SuccessfulChkOfferReplyBytesReceivedAverage", successfulChkOfferReplyBytesReceivedAverage.exportFieldSet(true));
+		fs.put("SuccessfulSskOfferReplyBytesReceivedAverage", successfulSskOfferReplyBytesReceivedAverage.exportFieldSet(true));		
 		
 		//These are not really part of the 'throttling' data, but are also running averages which should be persisted
 		fs.put("AverageCacheLocation", avgCacheLocation.exportFieldSet(true));

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/PeerNode.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -1116,6 +1116,7 @@
 		final long now = System.currentTimeMillis();
 		Logger.normal(this, "Disconnected " + this);
 		node.usm.onDisconnect(this);
+		node.failureTable.onDisconnect(this);
 		node.peers.disconnected(this);
 		boolean ret;
 		synchronized(this) {

Modified: trunk/freenet/src/freenet/node/RequestHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestHandler.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/RequestHandler.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -84,11 +84,11 @@
         } catch (NotConnectedException e) {
         	Logger.normal(this, "requestor gone, could not start request handler wait");
 			node.removeTransferringRequestHandler(uid);
-            node.unlockUID(uid, key instanceof NodeSSK, false, false);
+            node.unlockUID(uid, key instanceof NodeSSK, false, false, false);
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
 			node.removeTransferringRequestHandler(uid);
-            node.unlockUID(uid, key instanceof NodeSSK, false, false);
+            node.unlockUID(uid, key instanceof NodeSSK, false, false, false);
         }
     }
     
@@ -288,6 +288,7 @@
             		}
 					return;
             	case RequestSender.VERIFY_FAILURE:
+            	case RequestSender.GET_OFFER_VERIFY_FAILURE:
             		if(key instanceof NodeCHK) {
 						if(bt == null) {
             				// Bug! This is impossible!
@@ -305,6 +306,7 @@
             		sendTerminal(reject);
             		return;
             	case RequestSender.TRANSFER_FAILED:
+            	case RequestSender.GET_OFFER_TRANSFER_FAILED:
             		if(key instanceof NodeCHK) {
             			if(bt == null) {
             				// Bug! This is impossible!
@@ -372,7 +374,7 @@
 
 	private void unregisterRequestHandlerWithNode() {
 		node.removeTransferringRequestHandler(uid);
-		node.unlockUID(uid, key instanceof NodeSSK, false, false);
+		node.unlockUID(uid, key instanceof NodeSSK, false, false, false);
 	}
 	
 	/**

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/RequestSender.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -28,6 +28,8 @@
 import freenet.keys.NodeSSK;
 import freenet.keys.SSKBlock;
 import freenet.keys.SSKVerifyException;
+import freenet.node.FailureTable.BlockOffer;
+import freenet.node.FailureTable.OfferList;
 import freenet.store.KeyCollisionException;
 import freenet.support.Logger;
 import freenet.support.ShortBuffer;
@@ -49,6 +51,7 @@
 
     // Constants
     static final int ACCEPTED_TIMEOUT = 5000;
+    static final int GET_OFFER_TIMEOUT = 10000;
     static final int FETCH_TIMEOUT = 120000;
     /** Wait up to this long to get a path folding reply */
     static final int OPENNET_TIMEOUT = 120000;
@@ -72,7 +75,10 @@
     private byte[] sskData;
     private SSKBlock block;
     private boolean hasForwarded;
-	
+    
+    /** If true, only try to fetch the key from nodes which have offered it */
+    private boolean tryOffersOnly;
+    
 	private ArrayList listeners=new ArrayList();
     
     // Terminal status
@@ -89,6 +95,8 @@
     static final int GENERATED_REJECTED_OVERLOAD = 7;
     static final int INTERNAL_ERROR = 8;
     static final int RECENTLY_FAILED = 9;
+    static final int GET_OFFER_VERIFY_FAILURE = 10;
+    static final int GET_OFFER_TRANSFER_FAILED = 11;
     private PeerNode successFrom;
     
     static String getStatusString(int status) {
@@ -103,8 +111,12 @@
     		return "DATA NOT FOUND";
     	case TRANSFER_FAILED:
     		return "TRANSFER FAILED";
+    	case GET_OFFER_TRANSFER_FAILED:
+    		return "GET OFFER TRANSFER FAILED";
     	case VERIFY_FAILURE:
     		return "VERIFY FAILURE";
+    	case GET_OFFER_VERIFY_FAILURE:
+    		return "GET OFFER VERIFY FAILURE";
     	case TIMED_OUT:
     		return "TIMED OUT";
     	case GENERATED_REJECTED_OVERLOAD:
@@ -158,7 +170,7 @@
         	realRun();
         } catch (Throwable t) {
             Logger.error(this, "Caught "+t, t);
-            finish(INTERNAL_ERROR, null);
+            finish(INTERNAL_ERROR, null, false);
         } finally {
         	if(logMINOR) Logger.minor(this, "Leaving RequestSender.run() for "+uid);
             node.removeRequestSender(key, origHTL, this);
@@ -171,6 +183,191 @@
         	pubKey = ((NodeSSK)key).getPubKey();
         }
         
+        // First ask any nodes that have offered the data
+        
+        OfferList offers = node.failureTable.getOffers(key);
+        
+        while(true) {
+        	// Fetches valid offers, then expired ones. Expired offers don't count towards failures,
+        	// but they're still worth trying.
+        	BlockOffer offer = offers.getFirstOffer();
+        	if(offer == null) break;
+        	PeerNode pn = offer.getPeerNode();
+        	if(pn == null) {
+        		offers.deleteLastOffer();
+        		continue;
+        	}
+        	if(pn.getBootID() != offer.bootID) {
+        		offers.deleteLastOffer();
+        		continue;
+        	}
+        	Message msg = DMT.createFNPGetOfferedKey(key, offer.authenticator, pubKey == null, uid);
+        	try {
+				pn.sendAsync(msg, null, 0, this);
+			} catch (NotConnectedException e2) {
+				if(logMINOR)
+					Logger.minor(this, "Disconnected: "+pn+" getting offer for "+key);
+				offers.deleteLastOffer();
+				continue;
+			}
+        	MessageFilter mfRO = MessageFilter.create().setSource(pn).setField(DMT.UID, uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPRejectedOverload);
+        	MessageFilter mfGetInvalid = MessageFilter.create().setSource(pn).setField(DMT.UID, uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPGetOfferedKeyInvalid);
+        	// Wait for a response.
+        	if(key instanceof NodeCHK) {
+        		// Headers first, then block transfer.
+        		MessageFilter mfDF = MessageFilter.create().setSource(pn).setField(DMT.UID, uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPCHKDataFound);
+        		Message reply;
+				try {
+					reply = node.usm.waitFor(mfDF.or(mfRO.or(mfGetInvalid)), this);
+				} catch (DisconnectedException e2) {
+					if(logMINOR)
+						Logger.minor(this, "Disconnected: "+pn+" getting offer for "+key);
+					offers.deleteLastOffer();
+					continue;
+				}
+        		if(reply == null) {
+        			// We gave it a chance, don't give it another.
+        			offers.deleteLastOffer();
+        			continue;
+        		} else if(reply.getSpec() == DMT.FNPRejectedOverload) {
+        			// Non-fatal, keep it.
+        			if(logMINOR)
+        				Logger.minor(this, "Node "+pn+" rejected FNPGetOfferedKey for "+key+" (expired="+offer.isExpired());
+        			offers.keepLastOffer();
+        			continue;
+        		} else if(reply.getSpec() == DMT.FNPGetOfferedKeyInvalid) {
+        			// Fatal, delete it.
+        			if(logMINOR)
+        				Logger.minor(this, "Node "+pn+" rejected FNPGetOfferedKey as invalid with reason "+reply.getShort(DMT.REASON));
+        			offers.deleteLastOffer();
+        			continue;
+        		} else if(reply.getSpec() == DMT.FNPCHKDataFound) {
+        			headers = ((ShortBuffer)reply.getObject(DMT.BLOCK_HEADERS)).getData();
+        			// Receive the data
+        			
+                	// FIXME: Validate headers
+                	
+                	node.addTransferringSender((NodeCHK)key, this);
+                	
+                	try {
+                		
+                		prb = new PartiallyReceivedBlock(Node.PACKETS_IN_BLOCK, Node.PACKET_SIZE);
+                		
+                		synchronized(this) {
+                			notifyAll();
+                		}
+                		fireCHKTransferBegins();
+						
+                		BlockReceiver br = new BlockReceiver(node.usm, pn, uid, prb, this);
+                		
+                		try {
+                			if(logMINOR) Logger.minor(this, "Receiving data");
+                			byte[] data = br.receive();
+                			if(logMINOR) Logger.minor(this, "Received data");
+                			// Received data
+                			try {
+                				verifyAndCommit(data);
+                			} catch (KeyVerifyException e1) {
+                				Logger.normal(this, "Got data but verify failed: "+e1, e1);
+                				finish(GET_OFFER_VERIFY_FAILURE, pn, true);
+                        		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, pn, -1, System.currentTimeMillis());
+                        		offers.deleteLastOffer();
+                				return;
+                			}
+                			finish(SUCCESS, pn, true);
+                			return;
+                		} catch (RetrievalException e) {
+							if (e.getReason()==RetrievalException.SENDER_DISCONNECTED)
+								Logger.normal(this, "Transfer failed (disconnect): "+e, e);
+							else
+								Logger.error(this, "Transfer failed ("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e, e);
+                			finish(GET_OFFER_TRANSFER_FAILED, pn, true);
+                    		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, pn, -1, System.currentTimeMillis());
+                    		offers.deleteLastOffer();
+                			return;
+                		}
+                	} finally {
+                		node.removeTransferringSender((NodeCHK)key, this);
+                	}
+        		}
+        	} else {
+        		// Data, possibly followed by pubkey
+        		MessageFilter mfDF = MessageFilter.create().setSource(pn).setField(DMT.UID, uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPSSKDataFound);
+        		Message reply;
+				try {
+					reply = node.usm.waitFor(mfDF.or(mfRO.or(mfGetInvalid)), this);
+				} catch (DisconnectedException e) {
+					if(logMINOR)
+						Logger.minor(this, "Disconnected: "+pn+" getting offer for "+key);
+					offers.deleteLastOffer();
+					continue;
+				}
+        		if(reply == null) {
+            		offers.deleteLastOffer();
+        			continue;
+        		} else if(reply.getSpec() == DMT.FNPRejectedOverload) {
+        			// Non-fatal, keep it.
+        			if(logMINOR)
+        				Logger.minor(this, "Node "+pn+" rejected FNPGetOfferedKey for "+key+" (expired="+offer.isExpired());
+        			offers.keepLastOffer();
+        			continue;
+        		} else if(reply.getSpec() == DMT.FNPGetOfferedKeyInvalid) {
+        			// Fatal, delete it.
+        			if(logMINOR)
+        				Logger.minor(this, "Node "+pn+" rejected FNPGetOfferedKey as invalid with reason "+reply.getShort(DMT.REASON));
+        			offers.deleteLastOffer();
+        			continue;
+        		} else if(reply.getSpec() == DMT.FNPSSKDataFound) {
+        			// Receive the data
+        			headers = ((ShortBuffer) reply.getObject(DMT.BLOCK_HEADERS)).getData();
+        			sskData = ((ShortBuffer) reply.getObject(DMT.DATA)).getData();
+        			if(pubKey != null) {
+        				MessageFilter mfPK = MessageFilter.create().setSource(pn).setField(DMT.UID, uid).setTimeout(GET_OFFER_TIMEOUT).setType(DMT.FNPSSKPubKey);
+        				Message pk;
+						try {
+							pk = node.usm.waitFor(mfPK, this);
+						} catch (DisconnectedException e) {
+							if(logMINOR)
+								Logger.minor(this, "Disconnected: "+pn+" getting pubkey for offer for "+key);
+							offers.deleteLastOffer();
+							continue;
+						}
+        				if(pk == null) {
+        					Logger.error(this, "Got data but not pubkey from "+pn+" for offer for "+key);
+        					offers.deleteLastOffer();
+        					continue;
+        				}
+        				try {
+							pubKey = DSAPublicKey.create(((ShortBuffer)pk.getObject(DMT.PUBKEY_AS_BYTES)).getData());
+						} catch (CryptFormatException e) {
+							Logger.error(this, "Bogus pubkey from "+pn+" for offer for "+key+" : "+e, e);
+        					offers.deleteLastOffer();
+							continue;
+						}
+        			}
+        			
+        			try {
+						((NodeSSK)key).setPubKey(pubKey);
+					} catch (SSKVerifyException e) {
+						Logger.error(this, "Bogus SSK data from "+pn+" for offer for "+key+" : "+e, e);
+    					offers.deleteLastOffer();
+						continue;
+					}
+        			
+        			if(finishSSKFromGetOffer(pn)) {
+        				if(logMINOR) Logger.minor(this, "Successfully fetched SSK from offer from "+pn+" for "+key);
+        				return;
+        			} else {
+                		offers.deleteLastOffer();
+        				continue;
+        			}
+        		}
+        	}
+        	// RejectedOverload is possible - but we need to include it in the statistics.
+        	// We don't remove the offer in that case. Otherwise we do, even if it fails.
+        	// FNPGetOfferedKeyInvalid is also possible.
+        }
+        
 		int routeAttempts=0;
 		int rejectOverloads=0;
         HashSet nodesRoutedTo = new HashSet();
@@ -180,7 +377,7 @@
             if(htl == 0) {
             	// This used to be RNF, I dunno why
 				//???: finish(GENERATED_REJECTED_OVERLOAD, null);
-                finish(DATA_NOT_FOUND, null);
+                finish(DATA_NOT_FOUND, null, false);
         		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, null, FailureTable.REJECT_TIME, System.currentTimeMillis());
                 return;
             }
@@ -195,7 +392,7 @@
 				if (logMINOR && rejectOverloads>0)
 					Logger.minor(this, "no more peers, but overloads ("+rejectOverloads+"/"+routeAttempts+" overloaded)");
                 // Backtrack
-                finish(ROUTE_NOT_FOUND, null);
+                finish(ROUTE_NOT_FOUND, null, false);
         		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, null, -1, System.currentTimeMillis());
                 return;
             }
@@ -347,7 +544,7 @@
             		// Fatal timeout
             		next.localRejectedOverload("FatalTimeout");
             		forwardRejectedOverload();
-            		finish(TIMED_OUT, next);
+            		finish(TIMED_OUT, next, false);
             		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, next, -1, System.currentTimeMillis());
             		return;
             	}
@@ -358,7 +555,7 @@
             	
             	if(msg.getSpec() == DMT.FNPDataNotFound) {
             		next.successNotOverload();
-            		finish(DATA_NOT_FOUND, next);
+            		finish(DATA_NOT_FOUND, next, false);
             		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, next, FailureTable.REJECT_TIME, System.currentTimeMillis());
             		return;
             	}
@@ -420,7 +617,7 @@
            			// Kill the request, regardless of whether there is timeout left.
             		// If there is, we will avoid sending requests for the specified period.
             		// FIXME we need to create the FT entry.
-           			finish(RECENTLY_FAILED, next);
+           			finish(RECENTLY_FAILED, next, false);
             		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, next, timeLeft, System.currentTimeMillis());
             		return;
             	}
@@ -486,18 +683,18 @@
                 				verifyAndCommit(data);
                 			} catch (KeyVerifyException e1) {
                 				Logger.normal(this, "Got data but verify failed: "+e1, e1);
-                				finish(VERIFY_FAILURE, next);
+                				finish(VERIFY_FAILURE, next, false);
                         		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, next, -1, System.currentTimeMillis());
                 				return;
                 			}
-                			finish(SUCCESS, next);
+                			finish(SUCCESS, next, false);
                 			return;
                 		} catch (RetrievalException e) {
 							if (e.getReason()==RetrievalException.SENDER_DISCONNECTED)
 								Logger.normal(this, "Transfer failed (disconnect): "+e, e);
 							else
 								Logger.error(this, "Transfer failed ("+e.getReason()+"/"+RetrievalException.getErrString(e.getReason())+"): "+e, e);
-                			finish(TRANSFER_FAILED, next);
+                			finish(TRANSFER_FAILED, next, false);
                     		node.failureTable.onFailure(key, htl, new PeerNode[] { source }, next, -1, System.currentTimeMillis());
                 			return;
                 		}
@@ -570,18 +767,41 @@
 			node.storeShallow(block);
 			if(node.random.nextInt(RANDOM_REINSERT_INTERVAL) == 0)
 				node.queueRandomReinsert(block);
-			finish(SUCCESS, next);
+			finish(SUCCESS, next, false);
 		} catch (SSKVerifyException e) {
 			Logger.error(this, "Failed to verify: "+e+" from "+next, e);
-			finish(VERIFY_FAILURE, next);
+			finish(VERIFY_FAILURE, next, false);
 			return;
 		} catch (KeyCollisionException e) {
 			Logger.normal(this, "Collision on "+this);
-			finish(SUCCESS, next);
+			finish(SUCCESS, next, false);
 		}
 	}
 
     /**
+     * Finish fetching an SSK. We must have received the data, the headers and the pubkey by this point.
+     * @param next The node we received the data from.
+     * @return True if the request has completed. False if we need to look elsewhere.
+     */
+	private boolean finishSSKFromGetOffer(PeerNode next) {
+    	try {
+			block = new SSKBlock(sskData, headers, (NodeSSK)key, false);
+			node.storeShallow(block);
+			if(node.random.nextInt(RANDOM_REINSERT_INTERVAL) == 0)
+				node.queueRandomReinsert(block);
+			finish(SUCCESS, next, true);
+			return true;
+		} catch (SSKVerifyException e) {
+			Logger.error(this, "Failed to verify (from get offer): "+e+" from "+next, e);
+			return false;
+		} catch (KeyCollisionException e) {
+			Logger.normal(this, "Collision (from get offer) on "+this);
+			finish(SUCCESS, next, true);
+			return false;
+		}
+	}
+
+    /**
      * Note that this must be first on the list.
      */
 	private MessageFilter makeDataFoundFilter(PeerNode next) {
@@ -696,7 +916,7 @@
         }            
     }
     
-    private void finish(int code, PeerNode next) {
+    private void finish(int code, PeerNode next, boolean fromOfferedKey) {
     	if(logMINOR) Logger.minor(this, "finish("+code+ ')');
         
         synchronized(this) {
@@ -710,18 +930,21 @@
         	if(next != null) {
         		next.onSuccess(false, key instanceof NodeSSK);
         	}
-        	node.nodeStats.requestCompleted(true, source != null, key instanceof NodeSSK);
+        	// FIXME should this be called when fromOfferedKey??
+       		node.nodeStats.requestCompleted(true, source != null, key instanceof NodeSSK);
         	
 			//NOTE: because of the requesthandler implementation, this will block and wait
 			//      for downstream transfers on a CHK. The opennet stuff introduces
 			//      a delay of it's own if we don't get the expected message.
 			fireRequestSenderFinished(code);
 			
-        	if(key instanceof NodeCHK && next != null && 
-        			(next.isOpennet() || node.passOpennetRefsThroughDarknet()) ) {
-        		finishOpennet(next);
-        	} else
-        		finishOpennetNull(next);
+			if(fromOfferedKey) {
+				if(key instanceof NodeCHK && next != null && 
+						(next.isOpennet() || node.passOpennetRefsThroughDarknet()) ) {
+					finishOpennet(next);
+				} else
+					finishOpennetNull(next);
+			}
         } else {
         	node.nodeStats.requestCompleted(false, source != null, key instanceof NodeSSK);
 			fireRequestSenderFinished(code);

Modified: trunk/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestStarter.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/RequestStarter.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -117,7 +117,7 @@
 				} while(now < sleepUntil);
 				String reason;
 				if(LOCAL_REQUESTS_COMPETE_FAIRLY) {
-					if((reason = stats.shouldRejectRequest(true, isInsert, isSSK, true, null)) != null) {
+					if((reason = stats.shouldRejectRequest(true, isInsert, isSSK, true, false, null)) != null) {
 						if(logMINOR)
 							Logger.minor(this, "Not sending local request: "+reason);
 						// Wait one throttle-delay before trying again

Modified: trunk/freenet/src/freenet/node/SSKInsertHandler.java
===================================================================
--- trunk/freenet/src/freenet/node/SSKInsertHandler.java	2008-02-02 17:34:22 UTC (rev 17478)
+++ trunk/freenet/src/freenet/node/SSKInsertHandler.java	2008-02-02 20:05:30 UTC (rev 17479)
@@ -84,7 +84,7 @@
             Logger.error(this, "Caught "+t, t);
         } finally {
             if(logMINOR) Logger.minor(this, "Exiting InsertHandler.run() for "+uid);
-            node.unlockUID(uid, true, true, false);
+            node.unlockUID(uid, true, true, false, false);
         }
     }
 




More information about the cvs mailing list