[freenet-cvs] r16190 - trunk/freenet/src/freenet/node

toad at freenetproject.org toad at freenetproject.org
Sat Dec 1 17:14:09 UTC 2007


Author: toad
Date: 2007-12-01 17:14:09 +0000 (Sat, 01 Dec 2007)
New Revision: 16190

Added:
   trunk/freenet/src/freenet/node/AnnounceSender.java
Modified:
   trunk/freenet/src/freenet/node/Node.java
   trunk/freenet/src/freenet/node/NodeDispatcher.java
   trunk/freenet/src/freenet/node/OpennetManager.java
   trunk/freenet/src/freenet/node/PeerNode.java
Log:
Opennet announcement. Untested.

Added: trunk/freenet/src/freenet/node/AnnounceSender.java
===================================================================
--- trunk/freenet/src/freenet/node/AnnounceSender.java	                        (rev 0)
+++ trunk/freenet/src/freenet/node/AnnounceSender.java	2007-12-01 17:14:09 UTC (rev 16190)
@@ -0,0 +1,392 @@
+package freenet.node;
+
+import java.util.HashSet;
+
+import freenet.io.comm.ByteCounter;
+import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
+import freenet.io.comm.NotConnectedException;
+import freenet.io.comm.PeerParseException;
+import freenet.io.comm.ReferenceSignatureVerificationException;
+import freenet.support.Logger;
+import freenet.support.SimpleFieldSet;
+
+public class AnnounceSender implements Runnable, ByteCounter {
+
+    // Constants
+    // Timeout handed to MessageFilter.setTimeout() while waiting for the next
+    // hop to accept or reject the forwarded announcement (presumably
+    // milliseconds - confirm against MessageFilter).
+    static final int ACCEPTED_TIMEOUT = 5000;
+    // Timeout used once the next hop has accepted, while waiting for replies
+    // or completion.
+    static final int ANNOUNCE_TIMEOUT = 240000; // longer than a regular request as have to transfer noderefs hop by hop etc
+	
+	// Peer the announcement request arrived from; status messages go back to it.
+	private final PeerNode source;
+	// Identifier of this announcement, used in every message and filter.
+	private final long uid;
+	private final OpennetManager om;
+	private final Node node;
+	// The incoming request message; transferNoderef() reads the transfer fields from it.
+	private Message msg;
+	// Noderef received from the source by transferNoderef() and forwarded by sendTo().
+	private byte[] noderefBuf;
+	// Value of the NODEREF_LENGTH field of the incoming request.
+	private int noderefLength;
+	// Remaining hops; read from the request and adjusted while routing.
+	private short htl;
+	// NOTE(review): never initialised from the incoming message in this class,
+	// so it starts at 0.0 until realRun() overwrites it - confirm that is intended.
+	private double nearestLoc;
+	// Location the announcement is being routed towards.
+	private double target;
+	// Cached result of Logger.shouldLog(MINOR); refreshed by the constructor.
+	private static boolean logMINOR;
+	
+	/**
+	 * Create a sender for an announcement request received from a peer.
+	 * @param m The incoming request; HTL and target location are read from it.
+	 * @param uid The identifier of the announcement.
+	 * @param source The peer the request came from.
+	 * @param om The opennet manager used for noderef transfers.
+	 * @param node The local node.
+	 */
+	public AnnounceSender(Message m, long uid, PeerNode source, OpennetManager om, Node node) {
+		this.msg = m;
+		this.uid = uid;
+		this.source = source;
+		this.node = node;
+		this.om = om;
+		this.htl = m.getShort(DMT.HTL);
+		this.target = m.getDouble(DMT.TARGET_LOCATION); // FIXME validate
+		logMINOR = Logger.shouldLog(Logger.MINOR, this);
+	}
+
+	/**
+	 * Runnable entry point. Runs the announcement, logs anything it throws,
+	 * and always tells the source peer and the node that this uid is done.
+	 */
+	public void run() {
+		try {
+			realRun();
+		} catch (Throwable thrown) {
+			final String failure = "Caught "+thrown+" announcing "+uid+" from "+source;
+			Logger.error(this, failure, thrown);
+		} finally {
+			// Bookkeeping must happen whether or not realRun() succeeded.
+			source.completedAnnounce(uid);
+			source.node.completed(uid);
+		}
+	}
+
+	/**
+	 * Do the actual work of the announcement:
+	 * <ol>
+	 * <li>Send FNPAccepted to the source and receive its noderef.</li>
+	 * <li>Reset the HTL if we are closer to the target than nearestLoc,
+	 * otherwise decrement it.</li>
+	 * <li>Repeatedly pick a next hop via closerPeer(), forward the
+	 * announcement, and wait first for acceptance and then for replies,
+	 * until it completes, times out, runs out of HTL or runs out of peers.</li>
+	 * </ol>
+	 * The outcome is reported to the source via complete(), rnf() or timedOut().
+	 */
+	private void realRun() {
+		boolean hasForwarded = false;
+		try {
+			source.sendAsync(DMT.createFNPAccepted(uid), null, 0, null);
+		} catch (NotConnectedException e) {
+			// The requester has gone away; there is nobody to serve.
+			return;
+		}
+		// NOTE(review): source has already been dereferenced above, so the
+		// null checks below are currently redundant - confirm whether a
+		// null source (locally originated announcement) is meant to be supported.
+		if(source != null) {
+			if(!transferNoderef()) return;
+		}
+
+		double myLoc = node.lm.getLocation();
+		if(Location.distance(target, myLoc) < Location.distance(target, nearestLoc)) {
+			// We are closer to the target than the best location so far: reset the HTL.
+			nearestLoc = myLoc;
+			htl = node.maxHTL();
+		} else {
+			if(source != null)
+				htl = node.decrementHTL(source, htl);
+		}
+
+		// Now route it.
+
+		HashSet nodesRoutedTo = new HashSet();
+		HashSet nodesNotIgnored = new HashSet();
+		while(true) {
+			if(logMINOR) Logger.minor(this, "htl="+htl);
+			if(htl == 0) {
+				// No more nodes.
+				complete();
+				return;
+			}
+
+			// Route it
+			PeerNode next = node.peers.closerPeer(source, nodesRoutedTo, nodesNotIgnored, target, true, node.isAdvancedModeEnabled(), -1, null);
+
+			if(next == null) {
+				// Backtrack
+				rnf();
+				return;
+			}
+			if(logMINOR) Logger.minor(this, "Routing request to "+next);
+			nodesRoutedTo.add(next);
+
+			// Every hop after the first one we tried costs another HTL.
+			if(hasForwarded)
+				htl = node.decrementHTL(source, htl);
+
+			if(!sendTo(next)) continue;
+
+			hasForwarded = true;
+
+			// Named "reply" so that it does not shadow the msg field (the original request).
+			Message reply = null;
+
+			while(true) {
+
+				/**
+				 * What are we waiting for?
+				 * FNPAccepted - continue
+				 * FNPRejectedLoop - go to another node
+				 * FNPRejectedOverload - go to another node
+				 * FNPOpennetDisabled - go to another node
+				 * FNPOpennetNoderefRejected - go to another node
+				 */
+
+				MessageFilter mfAccepted = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPAccepted);
+				MessageFilter mfRejectedLoop = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedLoop);
+				MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPRejectedOverload);
+				MessageFilter mfOpennetDisabled = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPOpennetDisabled);
+				MessageFilter mfOpennetNoderefRejected = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ACCEPTED_TIMEOUT).setType(DMT.FNPOpennetNoderefRejected);
+
+				// NOTE(review): an earlier comment here required mfRejectedOverload
+				// to be last in the or-chain so that it could be reused. It is
+				// neither last nor reused in this method; every filter is
+				// rebuilt on each pass of the loop.
+				MessageFilter mf = mfAccepted.or(mfRejectedLoop.or(mfRejectedOverload.or(mfOpennetDisabled.or(mfOpennetNoderefRejected))));
+
+				try {
+					reply = node.usm.waitFor(mf, this);
+					if(logMINOR) Logger.minor(this, "first part got "+reply);
+				} catch (DisconnectedException e) {
+					Logger.normal(this, "Disconnected from "+next+" while waiting for Accepted on "+uid);
+					break;
+				}
+
+				if(reply == null) {
+					if(logMINOR) Logger.minor(this, "Timeout waiting for Accepted");
+					// Try next node
+					break;
+				}
+
+				if(reply.getSpec() == DMT.FNPRejectedLoop) {
+					if(logMINOR) Logger.minor(this, "Rejected loop");
+					// Find another node to route to
+					reply = null;
+					break;
+				}
+
+				if(reply.getSpec() == DMT.FNPRejectedOverload) {
+					if(logMINOR) Logger.minor(this, "Rejected: overload");
+					// Give up on this one, try another
+					reply = null;
+					break;
+				}
+
+				if(reply.getSpec() == DMT.FNPOpennetDisabled) {
+					// It is the next hop, not our source, that reported opennet as disabled.
+					next.setOpennetDisabled();
+					reply = null;
+					break;
+				}
+
+				if(reply.getSpec() == DMT.FNPOpennetNoderefRejected) {
+					int reason = reply.getInt(DMT.REJECT_CODE);
+					// The rejection came from the next hop, so name that peer in the log.
+					Logger.normal(this, "Announce rejected by "+next+" : "+DMT.getOpennetRejectedCode(reason));
+					reply = null;
+					break;
+				}
+
+				if(reply.getSpec() != DMT.FNPAccepted) {
+					Logger.error(this, "Unrecognized message: "+reply);
+					continue;
+				}
+
+				break;
+			}
+
+			if((reply == null) || (reply.getSpec() != DMT.FNPAccepted)) {
+				// Try another node
+				continue;
+			}
+
+			if(logMINOR) Logger.minor(this, "Got Accepted");
+
+			// Otherwise, must be Accepted
+
+			// So wait...
+
+			while(true) {
+
+				MessageFilter mfAnnounceCompleted = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetAnnounceCompleted);
+				MessageFilter mfRouteNotFound = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPRouteNotFound);
+				MessageFilter mfRejectedOverload = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPRejectedOverload);
+				MessageFilter mfAnnounceReply = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetAnnounceReply);
+				MessageFilter mfOpennetDisabled = MessageFilter.create().setSource(next).setField(DMT.UID, uid).setTimeout(ANNOUNCE_TIMEOUT).setType(DMT.FNPOpennetDisabled);
+				MessageFilter mf = mfAnnounceCompleted.or(mfRouteNotFound.or(mfRejectedOverload.or(mfAnnounceReply.or(mfOpennetDisabled))));
+
+				try {
+					reply = node.usm.waitFor(mf, this);
+				} catch (DisconnectedException e) {
+					Logger.normal(this, "Disconnected from "+next+" while waiting for announcement");
+					break;
+				}
+
+				if(logMINOR) Logger.minor(this, "second part got "+reply);
+
+				if(reply == null) {
+					// Fatal timeout
+					timedOut();
+					return;
+				}
+
+				if(reply.getSpec() == DMT.FNPOpennetAnnounceCompleted) {
+					complete();
+					return;
+				}
+
+				if(reply.getSpec() == DMT.FNPRouteNotFound) {
+					// Backtrack within available hops
+					short newHtl = reply.getShort(DMT.HTL);
+					if(newHtl < htl) htl = newHtl;
+					break;
+				}
+
+				if(reply.getSpec() == DMT.FNPRejectedOverload) {
+					// Give up on this one, try another
+					break;
+				}
+
+				if(reply.getSpec() == DMT.FNPOpennetDisabled) {
+					// As above: the flag belongs on the peer that sent the message.
+					next.setOpennetDisabled();
+					break;
+				}
+
+				if(reply.getSpec() == DMT.FNPOpennetAnnounceReply) {
+					validateForwardReply(reply, next);
+					continue; // There may be more
+				}
+
+				Logger.error(this, "Unexpected message: "+reply);
+			}
+		}
+	}
+
+	/**
+	 * Validate a reply received from the node we forwarded the announcement
+	 * to, and relay it back to the source of the announcement.
+	 * <p>
+	 * The second parameter used to be called "source", which shadowed the
+	 * field of the same name. As a result the reply was sent back to the peer
+	 * it had just come from instead of to the request source.
+	 * @param reply The FNPOpennetAnnounceReply message.
+	 * @param next The peer that sent the reply; the noderef is received from
+	 * it, and any rejection is sent to it.
+	 * @return True unless we lost the connection to our request source.
+	 */
+	private boolean validateForwardReply(Message reply, PeerNode next) {
+		long xferUID = reply.getLong(DMT.TRANSFER_UID);
+		int replyRefLength = reply.getInt(DMT.NODEREF_LENGTH);
+		int paddedLength = reply.getInt(DMT.PADDED_LENGTH);
+		// Local names differ from the noderefBuf/noderefLength fields, which
+		// hold the announcer's own noderef and must not be confused with this one.
+		byte[] replyRef = om.innerWaitForOpennetNoderef(xferUID, paddedLength, replyRefLength, next, false, uid, true, this);
+		if(replyRef == null) {
+			return true; // Don't relay
+		}
+		SimpleFieldSet fs = om.validateNoderef(replyRef, 0, replyRefLength, next);
+		if(fs == null) {
+			om.rejectRef(uid, next, DMT.NODEREF_REJECTED_INVALID, this);
+			return true; // Don't relay
+		}
+		// Now relay it to the node the announcement came from (the source field).
+		try {
+			om.sendAnnouncementReply(uid, source, replyRef, this);
+		} catch (NotConnectedException e) {
+			// Hmmm...!
+			return false;
+		}
+		return true;
+	}
+
+	/**
+	 * Forward the announcement request, with the stored noderef, target, HTL
+	 * and nearest location, to another peer.
+	 * @param next The node to send the announcement to.
+	 * @return True if the announcement was sent, false if the peer was not
+	 * connected.
+	 */
+	private boolean sendTo(PeerNode next) {
+		boolean sent;
+		try {
+			om.sendAnnouncementRequest(uid, next, noderefBuf, this, target, htl, nearestLoc);
+			sent = true;
+		} catch (NotConnectedException e) {
+			if(logMINOR) Logger.minor(this, "Disconnected");
+			sent = false;
+		}
+		return sent;
+	}
+
+	/**
+	 * Tell the source that the announcement timed out, by sending it an
+	 * FNPRejectedOverload. Best effort: a disconnected source is ignored.
+	 */
+	private void timedOut() {
+		try {
+			source.sendAsync(DMT.createFNPRejectedOverload(uid, false), null, 0, this);
+		} catch (NotConnectedException e) {
+			// Ok
+		}
+	}
+
+	/**
+	 * Tell the source that no further route was found, passing back the
+	 * current HTL. Best effort: a disconnected source is ignored.
+	 */
+	private void rnf() {
+		try {
+			source.sendAsync(DMT.createFNPRouteNotFound(uid, htl), null, 0, this);
+		} catch (NotConnectedException e) {
+			// Ok
+		}
+	}
+
+	/**
+	 * Tell the source that the announcement has completed. Best effort: a
+	 * disconnected source is ignored.
+	 */
+	private void complete() {
+		try {
+			source.sendAsync(DMT.createFNPOpennetAnnounceCompleted(uid), null, 0, this);
+		} catch (NotConnectedException e) {
+			// Oh well.
+		}
+	}
+
+	/**
+	 * Receive the announcer's noderef from the source, validate it, and offer
+	 * it to the opennet manager as a new peer. The received buffer and its
+	 * length are kept in noderefBuf / noderefLength for forwarding.
+	 * @return False if the transfer failed or the noderef was rejected as
+	 * invalid (in which case a rejection is sent to the source), true otherwise.
+	 */
+	private boolean transferNoderef() {
+		final long xferUID = msg.getLong(DMT.TRANSFER_UID);
+		noderefLength = msg.getInt(DMT.NODEREF_LENGTH);
+		final int paddedLength = msg.getInt(DMT.PADDED_LENGTH);
+		noderefBuf = om.innerWaitForOpennetNoderef(xferUID, paddedLength, noderefLength, source, false, uid, true, this);
+		if(noderefBuf == null) {
+			return false;
+		}
+		SimpleFieldSet fs = om.validateNoderef(noderefBuf, 0, noderefLength, source);
+		boolean valid = (fs != null);
+		if(valid) {
+			try {
+				// If we want it, add it and send our own ref back.
+				// Otherwise we just route the announcement onwards.
+				if(om.addNewOpennetNode(fs)) {
+					sendOurRef();
+				}
+			} catch (FSParseException e) {
+				valid = false;
+			} catch (PeerParseException e) {
+				valid = false;
+			} catch (ReferenceSignatureVerificationException e) {
+				valid = false;
+			}
+		}
+		if(!valid) {
+			// Every failure mode is reported to the source the same way.
+			om.rejectRef(uid, source, DMT.NODEREF_REJECTED_INVALID, this);
+			return false;
+		}
+		return true;
+	}
+
+	/**
+	 * Placeholder, called by transferNoderef() after the announcer's noderef
+	 * has been added as a new opennet node. Currently does nothing.
+	 */
+	private void sendOurRef() {
+		// FIXME transmit our noderef back to the node
+		// TODO Auto-generated method stub
+		
+	}
+
+	/**
+	 * Lock guarding both byte counters. It is final, not volatile: a monitor
+	 * object must never be replaced, otherwise threads could end up
+	 * synchronizing on different objects.
+	 */
+	private final Object totalBytesSync = new Object();
+	/** Bytes sent on behalf of this announcement; guarded by totalBytesSync. */
+	private int totalBytesSent;
+	
+	/** ByteCounter callback: add x to the number of bytes sent. */
+	public void sentBytes(int x) {
+		synchronized(totalBytesSync) {
+			totalBytesSent += x;
+		}
+	}
+	
+	/** @return The number of bytes reported through sentBytes() so far. */
+	public int getTotalSentBytes() {
+		synchronized(totalBytesSync) {
+			return totalBytesSent;
+		}
+	}
+	
+	/** Bytes received on behalf of this announcement; guarded by totalBytesSync. */
+	private int totalBytesReceived;
+	
+	/** ByteCounter callback: add x to the number of bytes received. */
+	public void receivedBytes(int x) {
+		synchronized(totalBytesSync) {
+			totalBytesReceived += x;
+		}
+	}
+	
+	/** @return The number of bytes reported through receivedBytes() so far. */
+	public int getTotalReceivedBytes() {
+		synchronized(totalBytesSync) {
+			return totalBytesReceived;
+		}
+	}
+
+	/** ByteCounter callback for payload bytes; deliberately ignored here. */
+	public void sentPayload(int x) {
+		// Doesn't count.
+	}
+	
+}

Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java	2007-12-01 17:00:49 UTC (rev 16189)
+++ trunk/freenet/src/freenet/node/Node.java	2007-12-01 17:14:09 UTC (rev 16190)
@@ -2224,7 +2224,7 @@
 	/**
 	 * A request completed (regardless of success).
 	 */
-	private synchronized void completed(long id) {
+	synchronized void completed(long id) {
 		recentlyCompletedIDs.push(new Long(id));
 		while(recentlyCompletedIDs.size() > MAX_RECENTLY_COMPLETED_IDS)
 			recentlyCompletedIDs.pop();

Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java	2007-12-01 17:00:49 UTC (rev 16189)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java	2007-12-01 17:14:09 UTC (rev 16190)
@@ -97,6 +97,8 @@
 			return node.nodeUpdater.uom.handleRequestMain(m, source);
 		} else if(spec == DMT.UOMSendingMain) {
 			return node.nodeUpdater.uom.handleSendingMain(m, source);
+		} else if(spec == DMT.FNPOpennetAnnounceRequest) {
+			return handleAnnounceRequest(m, source);
 		}
 
 		if(!source.isRoutable()) return false;
@@ -141,7 +143,7 @@
 //			return handleProbeRejected(m, source);
 //		} else if(spec == DMT.FNPProbeTrace) {
 //			return handleProbeTrace(m, source);
-		}
+		} 
 		return false;
 	}
 
@@ -265,6 +267,49 @@
 		return true;
 	}
 
+	/**
+	 * Handle an incoming FNPOpennetAnnounceRequest.
+	 * <ul>
+	 * <li>If opennet is not enabled locally, reply FNPOpennetDisabled.</li>
+	 * <li>If the uid was recently completed, reply FNPRejectedLoop.</li>
+	 * <li>If the source peer will not accept another announcement right now,
+	 * mark the uid completed and reply FNPRejectedOverload.</li>
+	 * <li>Otherwise start an AnnounceSender on the executor.</li>
+	 * </ul>
+	 * Replies are best effort: a disconnected source is ignored.
+	 * @param m The request message.
+	 * @param source The peer that sent it.
+	 * @return Always true (the message has been dealt with).
+	 */
+	private boolean handleAnnounceRequest(Message m, PeerNode source) {
+		long uid = m.getLong(DMT.UID);
+		OpennetManager om = node.getOpennet();
+		if(om == null) {
+			Message msg = DMT.createFNPOpennetDisabled(uid);
+			try {
+				source.sendAsync(msg, null, 0, null);
+			} catch (NotConnectedException e) {
+				// Ok
+			}
+			return true;
+		}
+		if(node.recentlyCompleted(uid)) {
+			Message msg = DMT.createFNPRejectedLoop(uid);
+			try {
+				source.sendAsync(msg, null, 0, null);
+			} catch (NotConnectedException e) {
+				// Ok
+			}
+			return true;
+		}
+		if(!source.shouldAcceptAnnounce(uid)) {
+			// Rejected: no announcement slot was claimed on the peer, so there
+			// is nothing to release and completedAnnounce() must not be called.
+			node.completed(uid);
+			Message msg = DMT.createFNPRejectedOverload(uid, true);
+			try {
+				source.sendAsync(msg, null, 0, null);
+			} catch (NotConnectedException e) {
+				// Ok
+			}
+			return true;
+		}
+		// From here on the peer has registered the uid as running. If we fail
+		// to hand it to a sender, release it again; otherwise
+		// AnnounceSender.run() releases it when it finishes.
+		boolean success = false;
+		try {
+			AnnounceSender sender = new AnnounceSender(m, uid, source, om, node);
+			node.executor.execute(sender, "Announcement sender for "+uid);
+			success = true;
+			return true;
+		} finally {
+			if(!success)
+				source.completedAnnounce(uid);
+		}
+	}
+
 	final Hashtable routedContexts = new Hashtable();
 
 	static class RoutedContext {

Modified: trunk/freenet/src/freenet/node/OpennetManager.java
===================================================================
--- trunk/freenet/src/freenet/node/OpennetManager.java	2007-12-01 17:00:49 UTC (rev 16189)
+++ trunk/freenet/src/freenet/node/OpennetManager.java	2007-12-01 17:14:09 UTC (rev 16190)
@@ -518,6 +518,21 @@
 		innerSendOpennetRef(xferUID, padded, peer);
 	}
 	
+	/**
+	 * Send an announcement reply carrying a noderef to a peer: pad the noderef
+	 * to PADDED_NODEREF_SIZE, send an FNPOpennetAnnounceReply naming a fresh
+	 * random transfer uid, then transfer the padded noderef.
+	 * A noderef larger than the padded size is logged as an error and not sent.
+	 * @param uid The announcement uid.
+	 * @param peer The peer to send the reply to.
+	 * @param noderef The unpadded noderef.
+	 * @param ctr Byte counter passed to the message send.
+	 * @throws NotConnectedException If the peer is not connected.
+	 */
+	public void sendAnnouncementReply(long uid, PeerNode peer, byte[] noderef, ByteCounter ctr) 
+	throws NotConnectedException {
+		byte[] paddedRef = new byte[PADDED_NODEREF_SIZE];
+		final int refLength = noderef.length;
+		if(refLength > paddedRef.length) {
+			Logger.error(this, "Noderef too big: "+noderef.length+" bytes");
+			return;
+		}
+		System.arraycopy(noderef, 0, paddedRef, 0, refLength);
+		final long xferUID = node.random.nextLong();
+		Message announceReply =
+			DMT.createFNPOpennetAnnounceReply(uid, xferUID, refLength, paddedRef.length);
+		peer.sendAsync(announceReply, null, 0, ctr);
+		innerSendOpennetRef(xferUID, paddedRef, peer);
+	}
+	
 	/**
 	 * Wait for an opennet noderef.
 	 * @param isReply If true, wait for an FNPOpennetConnectReply[New], if false wait for an FNPOpennetConnectDestination[New].

Modified: trunk/freenet/src/freenet/node/PeerNode.java
===================================================================
--- trunk/freenet/src/freenet/node/PeerNode.java	2007-12-01 17:00:49 UTC (rev 16189)
+++ trunk/freenet/src/freenet/node/PeerNode.java	2007-12-01 17:14:09 UTC (rev 16190)
@@ -266,6 +266,7 @@
 	final WeakReference myRef;
 	/** The node is being disconnected, but it may take a while. */
 	private boolean disconnecting;
+	
 	/**
 	 * For FNP link setup:
 	 *  The initiator has to ensure that nonces send back by the
@@ -3038,4 +3039,47 @@
 	synchronized boolean manyPacketsClaimedSentNotReceived() {
 		return manyPacketsClaimedSentNotReceived;
 	}
+	
+	/** Upper bound on the length of runningAnnounceUIDs checked by shouldAcceptAnnounce(). */
+	static final int MAX_SIMULTANEOUS_ANNOUNCEMENTS = 1;
+	/**
+	 * shouldAcceptAnnounce() only accepts when more than this much time has
+	 * passed since the last accepted announcement, so despite the name it
+	 * acts as a minimum interval. Compared against
+	 * System.currentTimeMillis() differences, i.e. milliseconds.
+	 */
+	static final int MAX_ANNOUNCE_DELAY = 1000;
+	/** System.currentTimeMillis() value recorded when an announcement was last accepted. */
+	private long timeLastAcceptedAnnouncement;
+	/** UIDs of the announcements from this peer that are currently running. */
+	private long[] runningAnnounceUIDs = new long[0];
+	
+	/**
+	 * Decide whether to accept an announcement from this peer and, if so,
+	 * register it as running.
+	 * <p>
+	 * An announcement is accepted only if fewer than
+	 * MAX_SIMULTANEOUS_ANNOUNCEMENTS are running and more than
+	 * MAX_ANNOUNCE_DELAY ms have passed since the last one was accepted.
+	 * Previously the extended uid list was built but never stored, so the
+	 * simultaneous-announcement limit was never enforced.
+	 * @param uid The uid of the announcement.
+	 * @return True if accepted; the caller must then eventually call
+	 * completedAnnounce(uid). False if rejected; nothing was registered.
+	 */
+	public synchronized boolean shouldAcceptAnnounce(long uid) {
+		long now = System.currentTimeMillis();
+		if(runningAnnounceUIDs.length >= MAX_SIMULTANEOUS_ANNOUNCEMENTS)
+			return false;
+		if(now - timeLastAcceptedAnnouncement <= MAX_ANNOUNCE_DELAY)
+			return false;
+		long[] newList = new long[runningAnnounceUIDs.length + 1];
+		System.arraycopy(runningAnnounceUIDs, 0, newList, 0, runningAnnounceUIDs.length);
+		newList[runningAnnounceUIDs.length] = uid;
+		// Actually record the announcement as running.
+		runningAnnounceUIDs = newList;
+		timeLastAcceptedAnnouncement = now;
+		return true;
+	}
+	
+	/**
+	 * Remove an announcement from the list of running announcements.
+	 * <p>
+	 * Safe to call for a uid that is not registered. Previously that case
+	 * wrote past the end of a list sized length - 1 and threw
+	 * ArrayIndexOutOfBoundsException whenever the list was non-empty.
+	 * @param uid The uid of the finished announcement.
+	 * @return True if the uid was registered and has been removed (all
+	 * occurrences), false if it was not found.
+	 */
+	public synchronized boolean completedAnnounce(long uid) {
+		int total = runningAnnounceUIDs.length;
+		// Sized for the worst case (uid absent) so copying can never overflow.
+		long[] kept = new long[total];
+		int x = 0;
+		for(int i=0;i<total;i++) {
+			long l = runningAnnounceUIDs[i];
+			if(l == uid) continue;
+			kept[x++] = l;
+		}
+		if(x == total) return false; // Not found (also covers the empty list)
+		long[] newList = new long[x];
+		System.arraycopy(kept, 0, newList, 0, x);
+		runningAnnounceUIDs = newList;
+		return true;
+	}
+
+	/**
+	 * Called by AnnounceSender when an FNPOpennetDisabled message is received.
+	 * Not implemented yet: currently does nothing.
+	 */
+	public void setOpennetDisabled() {
+		// FIXME
+	}
 }




More information about the cvs mailing list