[freenet-cvs] r16959 - trunk/freenet/src/freenet/node

robert at freenetproject.org robert at freenetproject.org
Mon Jan 7 20:07:30 UTC 2008


Author: robert
Date: 2008-01-07 20:07:30 +0000 (Mon, 07 Jan 2008)
New Revision: 16959

Modified:
   trunk/freenet/src/freenet/node/MessageItem.java
   trunk/freenet/src/freenet/node/PeerNode.java
   trunk/freenet/src/freenet/node/RequestSender.java
Log:
implement conditionalSend: sendSync-with-timeout, aborts message send


Modified: trunk/freenet/src/freenet/node/MessageItem.java
===================================================================
--- trunk/freenet/src/freenet/node/MessageItem.java	2008-01-07 19:31:24 UTC (rev 16958)
+++ trunk/freenet/src/freenet/node/MessageItem.java	2008-01-07 20:07:30 UTC (rev 16959)
@@ -72,4 +72,8 @@
 			}
 		}
 	}
+	
+	public boolean isForMessage(Message msg) { // true when this item's own msg field equals() the given message
+		return this.msg.equals(msg);
+	}
 }

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java	2008-01-07 19:31:24 UTC (rev 16958)
+++ trunk/freenet/src/freenet/node/PeerNode.java	2008-01-07 20:07:30 UTC (rev 16959)
@@ -16,6 +16,7 @@
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.ListIterator;
 import java.util.Vector;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
@@ -968,6 +969,21 @@
 		// It will wake up before the maximum coalescing delay (100ms) because
 		// it wakes up every 100ms *anyway*.
 	}
+	
+	private boolean maybeRemoveMessageFromQueue(Message removeMe) { // returns true iff a queued item for removeMe was found and removed
+		Logger.normal(this, "attempting to remove message from send-queue: "+removeMe);
+		synchronized (messagesToSendNow) { // hold the queue's monitor for the whole scan-and-remove
+			ListIterator i=messagesToSendNow.listIterator();
+			while (i.hasNext()) {
+				MessageItem it=(MessageItem)i.next();
+				if (it.isForMessage(removeMe)) {
+					i.remove(); // remove through the iterator that is walking the list
+					return true; // stops at the first match; any later matching items stay queued
+				}
+			}
+		}
+		return false; // nothing in messagesToSendNow matched removeMe
+	}
 
 	public long getMessageQueueLengthBytes() {
 		long x = 0;
@@ -1396,6 +1412,36 @@
 	}
 
 	/**
+	 * Conceptually, send a message to this node IF it can be done within 'timeout', returing true
+	 * only after the message was sent (similiar to sendSync), and false if the message cannot be
+	 * sent to the node in that time period. As an optimization, however, this function may return
+	 * immediately if it is determined that the message would not leave the node within the timeout
+	 * period.
+	 */
+	public boolean conditionalSend(Message req, ByteCounter ctr, long timeout) throws NotConnectedException {
+		if (timeout<=0)
+			return false;
+		if (getMessageQueueLengthBytes()/(getThrottle().getBandwidth()+1.0) > timeout) {
+			Logger.normal(this, "conditionalSend; pre-emptively not sending message ("+timeout+"ms): "+req);
+			return false;
+		}
+		SyncMessageCallback cb = new SyncMessageCallback();
+		sendAsync(req, cb, 0, ctr);
+		cb.waitForSend(timeout);
+		if (cb.done) {
+			return true;
+		} else {
+			//best-effort: remove the message from the send queue it is ok if we can't prematurely
+			//remove the item (i.e. race condition / now it is sent), but it will generate unclaimed messages, etc.
+			if (!maybeRemoveMessageFromQueue(req))
+				Logger.error(this, "unable to stop transmition of request: "+req);
+			else
+				Logger.normal(this, "removed request from queue for timeout: "+req);
+			return false;
+		}
+	}
+	
+	/**
 	* Enqueue a message to be sent to this node and wait up to a minute for it to be transmitted.
 	*/
 	public void sendSync(Message req, ByteCounter ctr) throws NotConnectedException {
@@ -1426,7 +1472,7 @@
 				}
 			}
 			if(isConnected())
-				Logger.error(this, "Waited too long for a blocking send on " + this + " for " + PeerNode.this, new Exception("error"));
+				Logger.normal(this, "Waited too long for a blocking send on " + this + " for " + PeerNode.this, new Exception("error"));
 		}
 
 		public void acknowledged() {

Modified: trunk/freenet/src/freenet/node/RequestSender.java
===================================================================
--- trunk/freenet/src/freenet/node/RequestSender.java	2008-01-07 19:31:24 UTC (rev 16958)
+++ trunk/freenet/src/freenet/node/RequestSender.java	2008-01-07 20:07:30 UTC (rev 16959)
@@ -4,6 +4,7 @@
 package freenet.node;
 
 import java.util.HashSet;
+import java.util.ArrayList;
 
 import freenet.crypt.CryptFormatException;
 import freenet.crypt.DSAPublicKey;
@@ -46,6 +47,8 @@
 public final class RequestSender implements Runnable, ByteCounter {
 
     // Constants
+	//SEND_TIMEOUT is not a hard timeout, shoot low for low latency (250-500ms?).
+	static final int SEND_TIMEOUT = 1000;
     static final int ACCEPTED_TIMEOUT = 5000;
     static final int FETCH_TIMEOUT = 120000;
     /** Wait up to this long to get a path folding reply */
@@ -141,6 +144,7 @@
 		int rejectOverloads=0;
         HashSet nodesRoutedTo = new HashSet();
         HashSet nodesNotIgnored = new HashSet();
+		ArrayList busyPeers = new ArrayList();
         while(true) {
             if(logMINOR) Logger.minor(this, "htl="+htl);
             if(htl == 0) {
@@ -159,9 +163,24 @@
 			routeAttempts++;
             
             // Route it
+			long sendTimeout = SEND_TIMEOUT;
+			boolean usingBusyPeer=false;
             PeerNode next;
             next = node.peers.closerPeer(source, nodesRoutedTo, nodesNotIgnored, target, true, node.isAdvancedModeEnabled(), -1, null);
             
+			if (next == null && !busyPeers.isEmpty()) {
+				next = (PeerNode)busyPeers.remove(0);
+				usingBusyPeer=true;
+				if (logMINOR) Logger.minor(this, "trying previously-found busy peer: "+next);
+				//NOTE: if we are at this point, it is already presumed that the message cannot even make it off the node to this peer in SEND_TIMEOUT, use all the timeout we have left.
+				sendTimeout = FETCH_TIMEOUT-(System.currentTimeMillis()-startTime);
+				//Edge case, local request & we are running w/o any time left.
+				if (sendTimeout < SEND_TIMEOUT && source==null) {
+					if (logMINOR) Logger.minor(this, "increasing timeout for local request");
+					sendTimeout = 2*SEND_TIMEOUT;
+				}
+			}
+			
             if(next == null) {
 				if (logMINOR && rejectOverloads>0)
 					Logger.minor(this, "no more peers, but overloads ("+rejectOverloads+"/"+routeAttempts+" overloaded)");
@@ -187,11 +206,18 @@
             // So take it from when we first started to try to send the request.
             // See comments below when handling FNPRecentlyFailed for why we need this.
             long timeSentRequest = System.currentTimeMillis();
-            
+			
             try {
             	//This is the first contact to this node
             	//async is preferred, but makes ACCEPTED_TIMEOUT much more likely for long send queues.
-            	next.sendAsync(req, null, 0, this);
+				//using conditionalSend this way might actually approximate Q-routing load balancing across the network.
+            	if (!next.conditionalSend(req, this, sendTimeout)) {
+					if (usingBusyPeer)
+						continue;
+					Logger.normal(this, "will try this peer later if no others are available");
+					busyPeers.add(next);
+					continue;
+				}
             } catch (NotConnectedException e) {
             	Logger.minor(this, "Not connected");
             	continue;




More information about the cvs mailing list