[freenet-cvs] r11270 - trunk/freenet/src/freenet/node

nextgens at freenetproject.org nextgens at freenetproject.org
Wed Dec 6 10:43:44 UTC 2006


Author: nextgens
Date: 2006-12-06 10:43:42 +0000 (Wed, 06 Dec 2006)
New Revision: 11270

Modified:
   trunk/freenet/src/freenet/node/NodeDispatcher.java
Log:
indent


Modified: trunk/freenet/src/freenet/node/NodeDispatcher.java
===================================================================
--- trunk/freenet/src/freenet/node/NodeDispatcher.java	2006-12-06 06:40:50 UTC (rev 11269)
+++ trunk/freenet/src/freenet/node/NodeDispatcher.java	2006-12-06 10:43:42 UTC (rev 11270)
@@ -37,390 +37,390 @@
 public class NodeDispatcher implements Dispatcher {
 
 	private static boolean logMINOR;
-    final Node node;
-    
-    NodeDispatcher(Node node) {
-        this.node = node;
-        logMINOR = Logger.shouldLog(Logger.MINOR, this);
-    }
-    
-    public boolean handleMessage(Message m) {
-        logMINOR = Logger.shouldLog(Logger.MINOR, this);
-        PeerNode source = (PeerNode)m.getSource();
-        if(logMINOR) Logger.minor(this, "Dispatching "+m+" from "+source);
-        MessageType spec = m.getSpec();
-        if(spec == DMT.FNPPing) {
-            // Send an FNPPong
-            Message reply = DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
-            try {
-                ((PeerNode)m.getSource()).sendAsync(reply, null, 0, null); // nothing we can do if can't contact source
-            } catch (NotConnectedException e) {
-            	if(logMINOR) Logger.minor(this, "Lost connection replying to "+m);
-            }
-            return true;
-        }else if(spec == DMT.FNPLinkPing) {
-        	long id = m.getLong(DMT.PING_SEQNO);
-        	Message msg = DMT.createFNPLinkPong(id);
-        	try {
+	final Node node;
+
+	NodeDispatcher(Node node) {
+		this.node = node;
+		logMINOR = Logger.shouldLog(Logger.MINOR, this);
+	}
+
+	public boolean handleMessage(Message m) {
+		logMINOR = Logger.shouldLog(Logger.MINOR, this);
+		PeerNode source = (PeerNode)m.getSource();
+		if(logMINOR) Logger.minor(this, "Dispatching "+m+" from "+source);
+		MessageType spec = m.getSpec();
+		if(spec == DMT.FNPPing) {
+			// Send an FNPPong
+			Message reply = DMT.createFNPPong(m.getInt(DMT.PING_SEQNO));
+			try {
+				((PeerNode)m.getSource()).sendAsync(reply, null, 0, null); // nothing we can do if can't contact source
+			} catch (NotConnectedException e) {
+				if(logMINOR) Logger.minor(this, "Lost connection replying to "+m);
+			}
+			return true;
+		}else if(spec == DMT.FNPLinkPing) {
+			long id = m.getLong(DMT.PING_SEQNO);
+			Message msg = DMT.createFNPLinkPong(id);
+			try {
 				source.sendAsync(msg, null, 0, null);
 			} catch (NotConnectedException e) {
 				// Ignore
 			}
-        	return true;
-        } else if(spec == DMT.FNPLinkPong) {
-        	long id = m.getLong(DMT.PING_SEQNO);
-        	source.receivedLinkPong(id);
-        	return true;
-        } else if(spec == DMT.FNPDetectedIPAddress) {
-        	Peer p = (Peer) m.getObject(DMT.EXTERNAL_ADDRESS);
-        	source.setRemoteDetectedPeer(p);
-        	node.ipDetector.redetectAddress();
-        	return true;
-        } else if(spec == DMT.FNPVoid) {
-        	return true;
-        } else if(spec == DMT.nodeToNodeMessage) {
-        	node.receivedNodeToNodeMessage(m);
-        	return true;
-        } else if(spec == DMT.nodeToNodeTextMessage) {
-        	node.receivedNodeToNodeTextMessage(m);
-        	return true;
-        }
-        
-        if(!source.isRoutable()) return false;
-        
-        if(spec == DMT.FNPLocChangeNotification) {
-            double newLoc = m.getDouble(DMT.LOCATION);
-            source.updateLocation(newLoc);
-            return true;
-        } else if(spec == DMT.FNPSwapRequest) {
-            return node.lm.handleSwapRequest(m);
-        } else if(spec == DMT.FNPSwapReply) {
-            return node.lm.handleSwapReply(m);
-        } else if(spec == DMT.FNPSwapRejected) {
-            return node.lm.handleSwapRejected(m);
-        } else if(spec == DMT.FNPSwapCommit) {
-            return node.lm.handleSwapCommit(m);
-        } else if(spec == DMT.FNPSwapComplete) {
-            return node.lm.handleSwapComplete(m);
-        } else if(spec == DMT.FNPCHKDataRequest) {
-        	return handleDataRequest(m, false);
-        } else if(spec == DMT.FNPSSKDataRequest) {
-            return handleDataRequest(m, true);
-        } else if(spec == DMT.FNPInsertRequest) {
-        	return handleInsertRequest(m, false);
-        } else if(spec == DMT.FNPSSKInsertRequest) {
-            return handleInsertRequest(m, true);
-        } else if(spec == DMT.FNPRoutedPing) {
-            return handleRouted(m);
-        } else if(spec == DMT.FNPRoutedPong) {
-            return handleRoutedReply(m);
-        } else if(spec == DMT.FNPRoutedRejected) {
-            return handleRoutedRejected(m);
-        } else if(spec == DMT.FNPProbeRequest) {
-        	return handleProbeRequest(m, source);
-        } else if(spec == DMT.FNPProbeReply) {
-        	return handleProbeReply(m, source);
-        } else if(spec == DMT.FNPProbeRejected) {
-        	return handleProbeRejected(m, source);
-        }
-        return false;
-    }
+			return true;
+		} else if(spec == DMT.FNPLinkPong) {
+			long id = m.getLong(DMT.PING_SEQNO);
+			source.receivedLinkPong(id);
+			return true;
+		} else if(spec == DMT.FNPDetectedIPAddress) {
+			Peer p = (Peer) m.getObject(DMT.EXTERNAL_ADDRESS);
+			source.setRemoteDetectedPeer(p);
+			node.ipDetector.redetectAddress();
+			return true;
+		} else if(spec == DMT.FNPVoid) {
+			return true;
+		} else if(spec == DMT.nodeToNodeMessage) {
+			node.receivedNodeToNodeMessage(m);
+			return true;
+		} else if(spec == DMT.nodeToNodeTextMessage) {
+			node.receivedNodeToNodeTextMessage(m);
+			return true;
+		}
 
+		if(!source.isRoutable()) return false;
+
+		if(spec == DMT.FNPLocChangeNotification) {
+			double newLoc = m.getDouble(DMT.LOCATION);
+			source.updateLocation(newLoc);
+			return true;
+		} else if(spec == DMT.FNPSwapRequest) {
+			return node.lm.handleSwapRequest(m);
+		} else if(spec == DMT.FNPSwapReply) {
+			return node.lm.handleSwapReply(m);
+		} else if(spec == DMT.FNPSwapRejected) {
+			return node.lm.handleSwapRejected(m);
+		} else if(spec == DMT.FNPSwapCommit) {
+			return node.lm.handleSwapCommit(m);
+		} else if(spec == DMT.FNPSwapComplete) {
+			return node.lm.handleSwapComplete(m);
+		} else if(spec == DMT.FNPCHKDataRequest) {
+			return handleDataRequest(m, false);
+		} else if(spec == DMT.FNPSSKDataRequest) {
+			return handleDataRequest(m, true);
+		} else if(spec == DMT.FNPInsertRequest) {
+			return handleInsertRequest(m, false);
+		} else if(spec == DMT.FNPSSKInsertRequest) {
+			return handleInsertRequest(m, true);
+		} else if(spec == DMT.FNPRoutedPing) {
+			return handleRouted(m);
+		} else if(spec == DMT.FNPRoutedPong) {
+			return handleRoutedReply(m);
+		} else if(spec == DMT.FNPRoutedRejected) {
+			return handleRoutedRejected(m);
+		} else if(spec == DMT.FNPProbeRequest) {
+			return handleProbeRequest(m, source);
+		} else if(spec == DMT.FNPProbeReply) {
+			return handleProbeReply(m, source);
+		} else if(spec == DMT.FNPProbeRejected) {
+			return handleProbeRejected(m, source);
+		}
+		return false;
+	}
+
 	/**
-     * Handle an incoming FNPDataRequest.
-     */
-    private boolean handleDataRequest(Message m, boolean isSSK) {
-        long id = m.getLong(DMT.UID);
-        if(node.recentlyCompleted(id)) {
-            Message rejected = DMT.createFNPRejectedLoop(id);
-            try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting data request (loop, finished): "+e);
-            }
-            return true;
-        }
-        String rejectReason = node.shouldRejectRequest(!isSSK, false, isSSK);
-        if(rejectReason != null) {
-        	// can accept 1 CHK request every so often, but not with SSKs because they aren't throttled so won't sort out bwlimitDelayTime, which was the whole reason for accepting them when overloaded...
-        	Logger.normal(this, "Rejecting request from "+m.getSource().getPeer()+" preemptively because "+rejectReason);
-        	Message rejected = DMT.createFNPRejectedOverload(id, true);
-        	try {
-        		((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting (overload) data request from "+m.getSource().getPeer()+": "+e);
-        	}
-            node.completed(id);
-            return true;
-        }
-        if(!node.lockUID(id)) {
-        	if(logMINOR) Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already running)");
-            Message rejected = DMT.createFNPRejectedLoop(id);
-            try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting insert request from "+m.getSource().getPeer()+": "+e);
-            }
-            return true;
-        } else {
-        	if(logMINOR) Logger.minor(this, "Locked "+id);
-        }
-        //if(!node.lockUID(id)) return false;
-        RequestHandler rh = new RequestHandler(m, id, node);
-        Thread t = new Thread(rh, "RequestHandler for UID "+id);
-        t.setDaemon(true);
-        t.start();
-        return true;
-    }
-    
-    private boolean handleInsertRequest(Message m, boolean isSSK) {
-        long id = m.getLong(DMT.UID);
-        if(node.recentlyCompleted(id)) {
-            Message rejected = DMT.createFNPRejectedLoop(id);
-            try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting insert request from "+m.getSource().getPeer()+": "+e);
-            }
-            return true;
-        }
-        // SSKs don't fix bwlimitDelayTime so shouldn't be accepted when overloaded.
-        String rejectReason = node.shouldRejectRequest(!isSSK, true, isSSK);
-        if(rejectReason != null) {
-        	Logger.normal(this, "Rejecting insert from "+m.getSource().getPeer()+" preemptively because "+rejectReason);
-        	Message rejected = DMT.createFNPRejectedOverload(id, true);
-        	try {
-        		((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting (overload) insert request from "+m.getSource().getPeer()+": "+e);
-        	}
-            node.completed(id);
-            return true;
-        }
-        if(!node.lockUID(id)) {
-        	if(logMINOR) Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already running)");
-            Message rejected = DMT.createFNPRejectedLoop(id);
-            try {
-                ((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
-            } catch (NotConnectedException e) {
-                Logger.normal(this, "Rejecting insert request from "+m.getSource().getPeer()+": "+e);
-            }
-            return true;
-        }
-        long now = System.currentTimeMillis();
-        if(m.getSpec().equals(DMT.FNPSSKInsertRequest)) {
-        	SSKInsertHandler rh = new SSKInsertHandler(m, id, node, now);
-            Thread t = new Thread(rh, "InsertHandler for "+id+" on "+node.portNumber);
-            t.setDaemon(true);
-            t.start();
-        } else {
-        	InsertHandler rh = new InsertHandler(m, id, node, now);
-        	Thread t = new Thread(rh, "InsertHandler for "+id+" on "+node.portNumber);
-        	t.setDaemon(true);
-        	t.start();
-        }
-        if(logMINOR) Logger.minor(this, "Started InsertHandler for "+id);
-        return true;
-    }
+	 * Handle an incoming FNPDataRequest.
+	 */
+	private boolean handleDataRequest(Message m, boolean isSSK) {
+		long id = m.getLong(DMT.UID);
+		if(node.recentlyCompleted(id)) {
+			Message rejected = DMT.createFNPRejectedLoop(id);
+			try {
+				((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
+			} catch (NotConnectedException e) {
+				Logger.normal(this, "Rejecting data request (loop, finished): "+e);
+			}
+			return true;
+		}
+		String rejectReason = node.shouldRejectRequest(!isSSK, false, isSSK);
+		if(rejectReason != null) {
+			// can accept 1 CHK request every so often, but not with SSKs because they aren't throttled so won't sort out bwlimitDelayTime, which was the whole reason for accepting them when overloaded...
+			Logger.normal(this, "Rejecting request from "+m.getSource().getPeer()+" preemptively because "+rejectReason);
+			Message rejected = DMT.createFNPRejectedOverload(id, true);
+			try {
+				((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
+			} catch (NotConnectedException e) {
+				Logger.normal(this, "Rejecting (overload) data request from "+m.getSource().getPeer()+": "+e);
+			}
+			node.completed(id);
+			return true;
+		}
+		if(!node.lockUID(id)) {
+			if(logMINOR) Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already running)");
+			Message rejected = DMT.createFNPRejectedLoop(id);
+			try {
+				((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
+			} catch (NotConnectedException e) {
+				Logger.normal(this, "Rejecting insert request from "+m.getSource().getPeer()+": "+e);
+			}
+			return true;
+		} else {
+			if(logMINOR) Logger.minor(this, "Locked "+id);
+		}
+		//if(!node.lockUID(id)) return false;
+		RequestHandler rh = new RequestHandler(m, id, node);
+		Thread t = new Thread(rh, "RequestHandler for UID "+id);
+		t.setDaemon(true);
+		t.start();
+		return true;
+	}
 
-    final Hashtable routedContexts = new Hashtable();
-    
-    static class RoutedContext {
-        long createdTime;
-        long accessTime;
-        PeerNode source;
-        final HashSet routedTo;
-        final HashSet notIgnored;
-        Message msg;
-        short lastHtl;
-        
-        RoutedContext(Message msg) {
-            createdTime = accessTime = System.currentTimeMillis();
-            source = (PeerNode)msg.getSource();
-            routedTo = new HashSet();
-            notIgnored = new HashSet();
-            this.msg = msg;
-            lastHtl = msg.getShort(DMT.HTL);
-        }
-        
-        void addSent(PeerNode n) {
-            routedTo.add(n);
-        }
-    }
-    
-    /**
-     * Handle an FNPRoutedRejected message.
-     */
-    private boolean handleRoutedRejected(Message m) {
-        long id = m.getLong(DMT.UID);
-        Long lid = new Long(id);
-        RoutedContext rc = (RoutedContext) routedContexts.get(lid);
-        if(rc == null) {
-            // Gah
-            Logger.error(this, "Unrecognized FNPRoutedRejected");
-            return false; // locally originated??
-        }
-        short htl = rc.lastHtl;
-        if(rc.source != null)
-            htl = rc.source.decrementHTL(htl);
-        short ohtl = m.getShort(DMT.HTL);
-        if(ohtl < htl) htl = ohtl;
-        // Try routing to the next node
-        forward(rc.msg, id, rc.source, htl, rc.msg.getDouble(DMT.TARGET_LOCATION), rc);
-        return true;
-    }
+	private boolean handleInsertRequest(Message m, boolean isSSK) {
+		long id = m.getLong(DMT.UID);
+		if(node.recentlyCompleted(id)) {
+			Message rejected = DMT.createFNPRejectedLoop(id);
+			try {
+				((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
+			} catch (NotConnectedException e) {
+				Logger.normal(this, "Rejecting insert request from "+m.getSource().getPeer()+": "+e);
+			}
+			return true;
+		}
+		// SSKs don't fix bwlimitDelayTime so shouldn't be accepted when overloaded.
+		String rejectReason = node.shouldRejectRequest(!isSSK, true, isSSK);
+		if(rejectReason != null) {
+			Logger.normal(this, "Rejecting insert from "+m.getSource().getPeer()+" preemptively because "+rejectReason);
+			Message rejected = DMT.createFNPRejectedOverload(id, true);
+			try {
+				((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
+			} catch (NotConnectedException e) {
+				Logger.normal(this, "Rejecting (overload) insert request from "+m.getSource().getPeer()+": "+e);
+			}
+			node.completed(id);
+			return true;
+		}
+		if(!node.lockUID(id)) {
+			if(logMINOR) Logger.minor(this, "Could not lock ID "+id+" -> rejecting (already running)");
+			Message rejected = DMT.createFNPRejectedLoop(id);
+			try {
+				((PeerNode)(m.getSource())).sendAsync(rejected, null, 0, null);
+			} catch (NotConnectedException e) {
+				Logger.normal(this, "Rejecting insert request from "+m.getSource().getPeer()+": "+e);
+			}
+			return true;
+		}
+		long now = System.currentTimeMillis();
+		if(m.getSpec().equals(DMT.FNPSSKInsertRequest)) {
+			SSKInsertHandler rh = new SSKInsertHandler(m, id, node, now);
+			Thread t = new Thread(rh, "InsertHandler for "+id+" on "+node.portNumber);
+			t.setDaemon(true);
+			t.start();
+		} else {
+			InsertHandler rh = new InsertHandler(m, id, node, now);
+			Thread t = new Thread(rh, "InsertHandler for "+id+" on "+node.portNumber);
+			t.setDaemon(true);
+			t.start();
+		}
+		if(logMINOR) Logger.minor(this, "Started InsertHandler for "+id);
+		return true;
+	}
 
-    /**
-     * Handle a routed-to-a-specific-node message.
-     * @param m
-     * @return False if we want the message put back on the queue.
-     */
-    boolean handleRouted(Message m) {
-    	if(logMINOR) Logger.minor(this, "handleRouted("+m+ ')');
-        if((m.getSource() != null) && (!(m.getSource() instanceof PeerNode))) {
-            Logger.error(this, "Routed message but source "+m.getSource()+" not a PeerNode!");
-            return true;
-        }
-        long id = m.getLong(DMT.UID);
-        Long lid = new Long(id);
-        PeerNode pn = (PeerNode) (m.getSource());
-        short htl = m.getShort(DMT.HTL);
-        if(pn != null) htl = pn.decrementHTL(htl);
-        RoutedContext ctx;
-        ctx = (RoutedContext)routedContexts.get(lid);
-        if(ctx != null) {
-            try {
-                ((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id, (short)(htl-1)), null, 0, null);
-            } catch (NotConnectedException e) {
-            	if(logMINOR) Logger.minor(this, "Lost connection rejecting "+m);
-            }
-            return true;
-        }
-        ctx = new RoutedContext(m);
-        routedContexts.put(lid, ctx);
-        // pn == null => originated locally, keep full htl
-        double target = m.getDouble(DMT.TARGET_LOCATION);
-        if(logMINOR) Logger.minor(this, "id "+id+" from "+pn+" htl "+htl+" target "+target);
-        if(Math.abs(node.lm.getLocation().getValue() - target) <= Double.MIN_VALUE) {
-        	if(logMINOR) Logger.minor(this, "Dispatching "+m.getSpec()+" on "+node.portNumber);
-            // Handle locally
-            // Message type specific processing
-            dispatchRoutedMessage(m, pn, id);
-            return true;
-        } else if(htl == 0) {
-            Message reject = DMT.createFNPRoutedRejected(id, (short)0);
-            if(pn != null) try {
-                pn.sendAsync(reject, null, 0, null);
-            } catch (NotConnectedException e) {
-            	if(logMINOR) Logger.minor(this, "Lost connection rejecting "+m);
-            }
-            return true;
-        } else {
-            return forward(m, id, pn, htl, target, ctx);
-        }
-    }
+	final Hashtable routedContexts = new Hashtable();
 
-    boolean handleRoutedReply(Message m) {
-        long id = m.getLong(DMT.UID);
-        if(logMINOR) Logger.minor(this, "Got reply: "+m);
-        Long lid = new Long(id);
-        RoutedContext ctx = (RoutedContext) routedContexts.get(lid);
-        if(ctx == null) {
-            Logger.error(this, "Unrecognized routed reply: "+m);
-            return false;
-        }
-        PeerNode pn = ctx.source;
-        if(pn == null) return false;
-        try {
-            pn.sendAsync(m, null, 0, null);
-        } catch (NotConnectedException e) {
-        	if(logMINOR) Logger.minor(this, "Lost connection forwarding "+m+" to "+pn);
-        }
-        return true;
-    }
-    
-    private boolean forward(Message m, long id, PeerNode pn, short htl, double target, RoutedContext ctx) {
-    	if(logMINOR) Logger.minor(this, "Should forward");
-        // Forward
-        m = preForward(m, htl);
-        while(true) {
-            PeerNode next = node.peers.closerPeer(pn, ctx.routedTo, ctx.notIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
-            if(logMINOR) Logger.minor(this, "Next: "+next+" message: "+m);
-            if(next != null) {
-            	// next is connected, or at least has been => next.getPeer() CANNOT be null.
-            	if(logMINOR) Logger.minor(this, "Forwarding "+m.getSpec()+" to "+next.getPeer().getPort());
-                ctx.addSent(next);
-                try {
-                    next.sendAsync(m, null, 0, null);
-                } catch (NotConnectedException e) {
-                    continue;
-                }
-            } else {
-            	if(logMINOR) Logger.minor(this, "Reached dead end for "+m.getSpec()+" on "+node.portNumber);
-                // Reached a dead end...
-                Message reject = DMT.createFNPRoutedRejected(id, htl);
-                if(pn != null) try {
-                    pn.sendAsync(reject, null, 0, null);
-                } catch (NotConnectedException e) {
-                    Logger.error(this, "Cannot send reject message back to source "+pn);
-                    return true;
-                }
-            }
-            return true;
-        }
-    }
+	static class RoutedContext {
+		long createdTime;
+		long accessTime;
+		PeerNode source;
+		final HashSet routedTo;
+		final HashSet notIgnored;
+		Message msg;
+		short lastHtl;
 
-    /**
-     * Prepare a routed-to-node message for forwarding.
-     */
-    private Message preForward(Message m, short newHTL) {
-        m.set(DMT.HTL, newHTL); // update htl
-        if(m.getSpec() == DMT.FNPRoutedPing) {
-            int x = m.getInt(DMT.COUNTER);
-            x++;
-            m.set(DMT.COUNTER, x);
-        }
-        return m;
-    }
+		RoutedContext(Message msg) {
+			createdTime = accessTime = System.currentTimeMillis();
+			source = (PeerNode)msg.getSource();
+			routedTo = new HashSet();
+			notIgnored = new HashSet();
+			this.msg = msg;
+			lastHtl = msg.getShort(DMT.HTL);
+		}
 
-    /**
-     * Deal with a routed-to-node message that landed on this node.
-     * This is where message-type-specific code executes. 
-     * @param m
-     * @return
-     */
-    private boolean dispatchRoutedMessage(Message m, PeerNode src, long id) {
-        if(m.getSpec() == DMT.FNPRoutedPing) {
-        	if(logMINOR) Logger.minor(this, "RoutedPing reached other side!");
-            int x = m.getInt(DMT.COUNTER);
-            Message reply = DMT.createFNPRoutedPong(id, x);
-            try {
-                src.sendAsync(reply, null, 0, null);
-            } catch (NotConnectedException e) {
-            	if(logMINOR) Logger.minor(this, "Lost connection replying to "+m+" in dispatchRoutedMessage");
-            }
-            return true;
-        }
-        return false;
-    }
+		void addSent(PeerNode n) {
+			routedTo.add(n);
+		}
+	}
 
-    // Probe request handling
+	/**
+	 * Handle an FNPRoutedRejected message.
+	 */
+	private boolean handleRoutedRejected(Message m) {
+		long id = m.getLong(DMT.UID);
+		Long lid = new Long(id);
+		RoutedContext rc = (RoutedContext) routedContexts.get(lid);
+		if(rc == null) {
+			// Gah
+			Logger.error(this, "Unrecognized FNPRoutedRejected");
+			return false; // locally originated??
+		}
+		short htl = rc.lastHtl;
+		if(rc.source != null)
+			htl = rc.source.decrementHTL(htl);
+		short ohtl = m.getShort(DMT.HTL);
+		if(ohtl < htl) htl = ohtl;
+		// Try routing to the next node
+		forward(rc.msg, id, rc.source, htl, rc.msg.getDouble(DMT.TARGET_LOCATION), rc);
+		return true;
+	}
 
-    long tLastReceivedProbeRequest;
-    
-    static final int MAX_PROBE_CONTEXTS = 1000;
-    static final int MAX_PROBE_IDS = 10000;
-    
-    class ProbeContext {
+	/**
+	 * Handle a routed-to-a-specific-node message.
+	 * @param m
+	 * @return False if we want the message put back on the queue.
+	 */
+	boolean handleRouted(Message m) {
+		if(logMINOR) Logger.minor(this, "handleRouted("+m+ ')');
+		if((m.getSource() != null) && (!(m.getSource() instanceof PeerNode))) {
+			Logger.error(this, "Routed message but source "+m.getSource()+" not a PeerNode!");
+			return true;
+		}
+		long id = m.getLong(DMT.UID);
+		Long lid = new Long(id);
+		PeerNode pn = (PeerNode) (m.getSource());
+		short htl = m.getShort(DMT.HTL);
+		if(pn != null) htl = pn.decrementHTL(htl);
+		RoutedContext ctx;
+		ctx = (RoutedContext)routedContexts.get(lid);
+		if(ctx != null) {
+			try {
+				((PeerNode)m.getSource()).sendAsync(DMT.createFNPRoutedRejected(id, (short)(htl-1)), null, 0, null);
+			} catch (NotConnectedException e) {
+				if(logMINOR) Logger.minor(this, "Lost connection rejecting "+m);
+			}
+			return true;
+		}
+		ctx = new RoutedContext(m);
+		routedContexts.put(lid, ctx);
+		// pn == null => originated locally, keep full htl
+		double target = m.getDouble(DMT.TARGET_LOCATION);
+		if(logMINOR) Logger.minor(this, "id "+id+" from "+pn+" htl "+htl+" target "+target);
+		if(Math.abs(node.lm.getLocation().getValue() - target) <= Double.MIN_VALUE) {
+			if(logMINOR) Logger.minor(this, "Dispatching "+m.getSpec()+" on "+node.portNumber);
+			// Handle locally
+			// Message type specific processing
+			dispatchRoutedMessage(m, pn, id);
+			return true;
+		} else if(htl == 0) {
+			Message reject = DMT.createFNPRoutedRejected(id, (short)0);
+			if(pn != null) try {
+				pn.sendAsync(reject, null, 0, null);
+			} catch (NotConnectedException e) {
+				if(logMINOR) Logger.minor(this, "Lost connection rejecting "+m);
+			}
+			return true;
+		} else {
+			return forward(m, id, pn, htl, target, ctx);
+		}
+	}
 
-    	final PeerNode src; // FIXME make this a weak reference or something ? - Memory leak with high connection churn
-    	final HashSet visitedPeers;
-    	final ProbeCallback cb;
-    	short counter;
-    	short htl;
-    	double nearest;
-    	double best;
-    	
+	boolean handleRoutedReply(Message m) {
+		long id = m.getLong(DMT.UID);
+		if(logMINOR) Logger.minor(this, "Got reply: "+m);
+		Long lid = new Long(id);
+		RoutedContext ctx = (RoutedContext) routedContexts.get(lid);
+		if(ctx == null) {
+			Logger.error(this, "Unrecognized routed reply: "+m);
+			return false;
+		}
+		PeerNode pn = ctx.source;
+		if(pn == null) return false;
+		try {
+			pn.sendAsync(m, null, 0, null);
+		} catch (NotConnectedException e) {
+			if(logMINOR) Logger.minor(this, "Lost connection forwarding "+m+" to "+pn);
+		}
+		return true;
+	}
+
+	private boolean forward(Message m, long id, PeerNode pn, short htl, double target, RoutedContext ctx) {
+		if(logMINOR) Logger.minor(this, "Should forward");
+		// Forward
+		m = preForward(m, htl);
+		while(true) {
+			PeerNode next = node.peers.closerPeer(pn, ctx.routedTo, ctx.notIgnored, target, true, node.isAdvancedDarknetEnabled(), -1);
+			if(logMINOR) Logger.minor(this, "Next: "+next+" message: "+m);
+			if(next != null) {
+				// next is connected, or at least has been => next.getPeer() CANNOT be null.
+				if(logMINOR) Logger.minor(this, "Forwarding "+m.getSpec()+" to "+next.getPeer().getPort());
+				ctx.addSent(next);
+				try {
+					next.sendAsync(m, null, 0, null);
+				} catch (NotConnectedException e) {
+					continue;
+				}
+			} else {
+				if(logMINOR) Logger.minor(this, "Reached dead end for "+m.getSpec()+" on "+node.portNumber);
+				// Reached a dead end...
+				Message reject = DMT.createFNPRoutedRejected(id, htl);
+				if(pn != null) try {
+					pn.sendAsync(reject, null, 0, null);
+				} catch (NotConnectedException e) {
+					Logger.error(this, "Cannot send reject message back to source "+pn);
+					return true;
+				}
+			}
+			return true;
+		}
+	}
+
+	/**
+	 * Prepare a routed-to-node message for forwarding.
+	 */
+	private Message preForward(Message m, short newHTL) {
+		m.set(DMT.HTL, newHTL); // update htl
+		if(m.getSpec() == DMT.FNPRoutedPing) {
+			int x = m.getInt(DMT.COUNTER);
+			x++;
+			m.set(DMT.COUNTER, x);
+		}
+		return m;
+	}
+
+	/**
+	 * Deal with a routed-to-node message that landed on this node.
+	 * This is where message-type-specific code executes. 
+	 * @param m
+	 * @return
+	 */
+	private boolean dispatchRoutedMessage(Message m, PeerNode src, long id) {
+		if(m.getSpec() == DMT.FNPRoutedPing) {
+			if(logMINOR) Logger.minor(this, "RoutedPing reached other side!");
+			int x = m.getInt(DMT.COUNTER);
+			Message reply = DMT.createFNPRoutedPong(id, x);
+			try {
+				src.sendAsync(reply, null, 0, null);
+			} catch (NotConnectedException e) {
+				if(logMINOR) Logger.minor(this, "Lost connection replying to "+m+" in dispatchRoutedMessage");
+			}
+			return true;
+		}
+		return false;
+	}
+
+	// Probe request handling
+
+	long tLastReceivedProbeRequest;
+
+	static final int MAX_PROBE_CONTEXTS = 1000;
+	static final int MAX_PROBE_IDS = 10000;
+
+	class ProbeContext {
+
+		final PeerNode src; // FIXME make this a weak reference or something ? - Memory leak with high connection churn
+		final HashSet visitedPeers;
+		final ProbeCallback cb;
+		short counter;
+		short htl;
+		double nearest;
+		double best;
+
 		public ProbeContext(long id, double target, double best, double nearest, short htl, short counter, PeerNode src, ProbeCallback cb) {
 			visitedPeers = new HashSet();
 			this.counter = counter;
@@ -430,20 +430,20 @@
 			this.src = src;
 			this.cb = cb;
 		}
-    	
-    }
-    
-    final LRUQueue recentProbeRequestIDs = new LRUQueue();
-    final LRUHashtable recentProbeContexts = new LRUHashtable();
-    
-    /** 
-     * Handle a probe request.
-     * Reject it if it's looped.
-     * Look up (and promote) its context object.
-     * Update its HTL, nearest-seen and best-seen.
-     * Complete it if it has run out of HTL.
-     * Otherwise forward it.
-     **/
+
+	}
+
+	final LRUQueue recentProbeRequestIDs = new LRUQueue();
+	final LRUHashtable recentProbeContexts = new LRUHashtable();
+
+	/** 
+	 * Handle a probe request.
+	 * Reject it if it's looped.
+	 * Look up (and promote) its context object.
+	 * Update its HTL, nearest-seen and best-seen.
+	 * Complete it if it has run out of HTL.
+	 * Otherwise forward it.
+	 **/
 	private boolean handleProbeRequest(Message m, PeerNode src) {
 		long id = m.getLong(DMT.UID);
 		Long lid = new Long(id);
@@ -475,10 +475,10 @@
 		return innerHandleProbeRequest(src, id, lid, target, best, nearest, htl, counter, true, true, null);
 	}
 
-    private boolean innerHandleProbeRequest(PeerNode src, long id, Long lid, double target, double best, 
-    		double nearest, short htl, short counter, boolean checkRecent, boolean canReject, ProbeCallback cb) {
-    	if(htl > Node.MAX_HTL) htl = Node.MAX_HTL;
-    	if(htl <= 1) htl = 1;
+	private boolean innerHandleProbeRequest(PeerNode src, long id, Long lid, double target, double best, 
+			double nearest, short htl, short counter, boolean checkRecent, boolean canReject, ProbeCallback cb) {
+		if(htl > Node.MAX_HTL) htl = Node.MAX_HTL;
+		if(htl <= 1) htl = 1;
 		ProbeContext ctx = null;
 		boolean rejected = false;
 		boolean isNew = false;
@@ -545,18 +545,18 @@
 			nearest = ctx.nearest;
 		}
 		Logger.minor(this, "htl="+htl+", nearest="+nearest+", ctx.htl="+ctx.htl+", ctx.nearest="+ctx.nearest);
-		
+
 		PeerNode[] peers = node.peers.myPeers;
-		
+
 		double myLoc = node.getLocation();
 		// Update best
-		
+
 		if(myLoc > target && myLoc < best)
 			best = myLoc;
-		
+
 		if(ctx.best > target && ctx.best < best)
 			best = ctx.best;
-		
+
 		for(int i=0;i<peers.length;i++) {
 			if(!peers[i].isConnected()) {
 				if(logMINOR)
@@ -575,9 +575,9 @@
 				best = loc;
 			}
 		}
-		
+
 		// Update nearest, htl
-		
+
 		if(PeerManager.distance(myLoc, target) < PeerManager.distance(nearest, target)) {
 			if(logMINOR)
 				Logger.minor(this, "Updating nearest to "+myLoc+" from "+nearest+" for "+target+" and resetting htl from "+htl+" to "+Node.MAX_HTL);
@@ -593,7 +593,7 @@
 			if(logMINOR)
 				Logger.minor(this, "Updated htl to "+htl+" - myLoc="+myLoc+", target="+target+", nearest="+nearest);
 		}
-		
+
 		// Complete ?
 		if(htl == 0) {
 			if(src != null) {
@@ -609,15 +609,15 @@
 				complete("success", target, best, nearest, id, ctx, counter);
 			}
 		}
-		
+
 		// Otherwise route it
-		
+
 		HashSet visited = ctx.visitedPeers;
-		
+
 		while(true) {
-			
+
 			PeerNode pn = node.peers.closerPeer(src, visited, null, target, true, false, 965);
-			
+
 			if(pn == null) {
 				// Can't complete, because some HTL left
 				// Reject: RNF
@@ -633,9 +633,9 @@
 				}
 				return true;
 			}
-			
+
 			visited.add(pn);
-			
+
 			Message forwarded =
 				DMT.createFNPProbeRequest(id, target, nearest, best, htl, counter++);
 			try {
@@ -646,7 +646,7 @@
 				// Try another one
 			}
 		}
-		
+
 	}
 
 	private void complete(String msg, double target, double best, double nearest, long id, ProbeContext ctx, short counter) {
@@ -663,7 +663,7 @@
 		short counter = m.getShort(DMT.COUNTER);
 		if(logMINOR)
 			Logger.minor(this, "Probe reply: "+id+ ' ' +target+ ' ' +best+ ' ' +nearest);
-    	// Just propagate back to source
+		// Just propagate back to source
 		ProbeContext ctx;
 		synchronized(recentProbeContexts) {
 			ctx = (ProbeContext) recentProbeContexts.get(lid);
@@ -675,7 +675,7 @@
 			while(recentProbeContexts.size() > MAX_PROBE_CONTEXTS)
 				recentProbeContexts.popValue();
 		}
-		
+
 		if(ctx.src != null) {
 			Message complete = DMT.createFNPProbeReply(id, target, nearest, best, counter++);
 			try {
@@ -690,7 +690,7 @@
 		return true;
 	}
 
-    private boolean handleProbeRejected(Message m, PeerNode src) {
+	private boolean handleProbeRejected(Message m, PeerNode src) {
 		long id = m.getLong(DMT.UID);
 		Long lid = new Long(id);
 		double target = m.getDouble(DMT.TARGET_LOCATION);
@@ -701,7 +701,7 @@
 		short reason = m.getShort(DMT.REASON);
 		if(logMINOR)
 			Logger.minor(this, "Probe rejected: "+id+ ' ' +target+ ' ' +best+ ' ' +nearest+ ' ' +htl+ ' ' +counter+ ' ' +reason);
-		
+
 		ProbeContext ctx;
 		synchronized(recentProbeContexts) {
 			ctx = (ProbeContext) recentProbeContexts.get(lid);
@@ -713,9 +713,9 @@
 			while(recentProbeContexts.size() > MAX_PROBE_CONTEXTS)
 				recentProbeContexts.popValue();
 		}
-		
+
 		return innerHandleProbeRequest(src, id, lid, target, best, nearest, htl, counter, false, false, null);
-    }
+	}
 
 	public void startProbe(double d, ProbeCallback cb) {
 		long l = node.random.nextLong();
@@ -726,5 +726,4 @@
 		double nodeLoc = node.getLocation();
 		innerHandleProbeRequest(null, l, ll, d, (nodeLoc > d) ? nodeLoc : 1.0, nodeLoc, Node.MAX_HTL, (short)0, false, false, cb);
 	}
-
 }




More information about the cvs mailing list