[freenet-cvs] r11903 - in trunk/apps/load-balancing-sims/phase7/sim: . messages

mrogers at freenetproject.org mrogers at freenetproject.org
Fri Feb 23 20:30:07 UTC 2007


Author: mrogers
Date: 2007-02-23 20:30:06 +0000 (Fri, 23 Feb 2007)
New Revision: 11903

Removed:
   trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java
Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
Log:
Cleaner (saner?) coalescing and retransmission code

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java	2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java	2007-02-23 20:30:06 UTC (rev 11903)
@@ -12,9 +12,6 @@
 {
 	public final static boolean LOG = false;
 	
-	// Coarse-grained retransmission timer
-	public final static double RETX_TIMER = 0.1; // Seconds
-	
 	// Flow control
 	public static boolean useTokens = false;
 	public static boolean useBackoff = false;
@@ -43,7 +40,6 @@
 	private boolean decrementMaxHtl = false;
 	private boolean decrementMinHtl = false;
 	public TokenBucket bandwidth; // Bandwidth limiter
-	private boolean timerRunning = false;
 	private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
 	private double delay = 0.0; // Delay caused by congestion or b/w limiter
 	private LinkedList<Search> searchQueue;
@@ -70,7 +66,7 @@
 		pubKeyCache = new LruCache<Integer> (16000);
 		if (Math.random() < 0.5) decrementMaxHtl = true;
 		if (Math.random() < 0.25) decrementMinHtl = true;
-		bandwidth = new TokenBucket (40000, 60000);
+		bandwidth = new TokenBucket (40000, 400000);
 		searchQueue = new LinkedList<Search>();
 		if (useTokens) {
 			// Allocate flow control tokens after a short delay
@@ -230,15 +226,6 @@
 		else if (LOG) log ("public key " + key + " not added to store");
 	}
 	
-	// Called by Peer to start the retransmission timer
-	public void startTimer()
-	{
-		if (timerRunning) return;
-		timerRunning = true;
-		if (LOG) log ("starting retransmission timer");
-		Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
-	}
-	
 	// Called by Peer to transmit a packet for the first time
 	public void sendPacket (Packet p)
 	{
@@ -611,17 +598,6 @@
 		addToSearchQueue (si);
 	}
 	
-	private void checkTimeouts()
-	{
-		boolean stopTimer = true;
-		for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false;
-		if (stopTimer) {
-			if (LOG) log ("stopping retransmission timer");
-			timerRunning = false;
-		}
-		else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
-	}
-	
 	// Allocate all flow control tokens at startup
 	private void allocateTokens()
 	{
@@ -657,10 +633,6 @@
 			generateSskInsert ((Integer) data, 1, null);
 			break;
 			
-			case CHECK_TIMEOUTS:
-			checkTimeouts();
-			break;
-			
 			case ALLOCATE_TOKENS:
 			allocateTokens();
 			break;
@@ -676,7 +648,6 @@
 	public final static int REQUEST_SSK = 3;
 	public final static int INSERT_SSK = 4;
 	public final static int SSK_COLLISION = 5;
-	private final static int CHECK_TIMEOUTS = 6;
-	private final static int ALLOCATE_TOKENS = 7;
-	private final static int SEND_SEARCH = 8;
+	private final static int ALLOCATE_TOKENS = 6;
+	private final static int SEND_SEARCH = 7;
 }

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java	2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java	2007-02-23 20:30:06 UTC (rev 11903)
@@ -19,9 +19,8 @@
 	public final static double RTT_DECAY = 0.9; // Exp moving average
 	public final static double LINK_IDLE = 8.0; // RTTs without transmitting
 	
-	// Coalescing
-	public final static double MAX_DELAY = 0.1; // Max coalescing delay
-	public final static double MIN_SLEEP = 0.01; // Forty winks
+	// Retransmission/coalescing timer
+	public final static double TICK = 0.1; // Timer granularity, seconds
 	
 	// Backoff
 	public final static double INITIAL_BACKOFF = 1.0; // Seconds
@@ -39,9 +38,9 @@
 	private DeadlineQueue<Message> searchQueue; // Outgoing search messages
 	private DeadlineQueue<Message> transferQueue; // Outgoing transfers
 	private CongestionWindow window; // AIMD congestion window
-	private double lastTransmission = Double.POSITIVE_INFINITY; // Time
+	private double lastTransmission = Double.POSITIVE_INFINITY; // Abs. time
 	private boolean tgif = false; // "Transfers go in first" toggle
-	private boolean timerRunning = false; // Coalescing timer
+	private boolean timerRunning = false; // Retransmission/coalescing timer
 	
 	// Receiver state
 	private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -50,8 +49,8 @@
 	// Flow control
 	private int tokensOut = 0; // How many searches can we send?
 	private int tokensIn = 0; // How many searches should we accept?
-	public double backoffUntil = 0.0; // Time
-	public double backoffLength = INITIAL_BACKOFF; // Seconds
+	public double backoffUntil = 0.0; // Absolute time, seconds
+	public double backoffLength = INITIAL_BACKOFF; // Relative time, seconds
 	
 	public Peer (Node node, int address, double location, double latency)
 	{
@@ -69,7 +68,7 @@
 	// Queue a message for transmission
 	public void sendMessage (Message m)
 	{
-		m.deadline = Event.time() + MAX_DELAY;
+		m.deadline = Event.time() + TICK;
 		if (m instanceof Block) {
 			if (LOG) log (m + " added to transfer queue");
 			transferQueue.add (m);
@@ -84,13 +83,13 @@
 		while (send (-1));
 	}
 	
-	// Start the coalescing timer
+	// Start the retransmission/coalescing timer
 	private void startTimer()
 	{
 		if (timerRunning) return;
 		timerRunning = true;
-		if (LOG) log ("starting coalescing timer");
-		Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
+		if (LOG) log ("starting timer");
+		Event.schedule (this, TICK, TIMER, null);
 	}
 	
 	// Try to send a packet, return true if a packet was sent
@@ -165,7 +164,7 @@
 		if (p.messages != null) {
 			p.sent = Event.time();
 			txBuffer.add (p);
-			node.startTimer(); // Start the retransmission timer
+			startTimer(); // Start the retransmission timer
 			window.bytesSent (p.size);
 		}
 		return true;
@@ -241,8 +240,7 @@
 		else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
 		if (LOG) log ("maximum sequence number " + txMaxSeq);
 		// Send as many packets as possible
-		if (timerRunning) while (send (-1));
-		else checkDeadlines();
+		while (send (-1));
 	}
 	
 	// When a local RejectedOverload is received, back off unless backed off
@@ -305,12 +303,19 @@
 		return tokensIn;
 	}
 	
-	// Check retx timeouts, return true if there are packets in flight
-	public boolean checkTimeouts()
+	// Event callback: wake up, send packets, go back to sleep
+	private void timer()
 	{
-		if (LOG) log (txBuffer.size() + " packets in flight");
-		if (txBuffer.isEmpty()) return false;
-		
+		// Send as many packets as possible
+		while (send (-1));
+		// Stop the timer if there's nothing to wait for
+		if (searchQueue.size + transferQueue.size == 0
+		&& txBuffer.isEmpty()) {
+			if (LOG) log ("stopping timer");
+			timerRunning = false;
+			return;
+		}
+		// Check the retransmission timeouts
 		double now = Event.time();
 		for (Packet p : txBuffer) {
 			if (now - p.sent > RTO * rtt) {
@@ -321,57 +326,10 @@
 				window.timeout (now);
 			}
 		}
-		return true;
-	}
-	
-	// Event callback: wake up, send packets, go back to sleep
-	private void checkDeadlines()
-	{
-		// Send as many packets as possible
-		while (send (-1));
-		// Find the next coalescing deadline - ignore message deadlines
-		// if there isn't room in the congestion window to send them
-		double dl = Double.POSITIVE_INFINITY;
-		int win = window.available() - Packet.HEADER_SIZE;
-		if (searchQueue.headSize() <= win)
-			dl = Math.min (dl, searchQueue.deadline());
-		if (transferQueue.headSize() <= win)
-			dl = Math.min (dl, transferQueue.deadline());
-		// If there's no deadline, stop the timer
-		if (dl == Double.POSITIVE_INFINITY) {
-			if (timerRunning) {
-				if (LOG) log ("stopping coalescing timer");
-				timerRunning = false;
-			}
-			return;
-		}
 		// Schedule the next check
-		double sleep = dl - Event.time();
-		if (shouldPoll()) sleep = Math.max (sleep, node.bandwidth.poll);
-		else sleep = Math.max (sleep, MIN_SLEEP);
-		timerRunning = true;
-		if (LOG) log ("sleeping for " + sleep + " seconds");
-		Event.schedule (this, sleep, CHECK_DEADLINES, null);
+		Event.schedule (this, TICK, TIMER, null);
 	}
 	
-	// Are we waiting for the bandwidth limiter?
-	private boolean shouldPoll()
-	{
-		double bw = node.bandwidth.available();
-		double win = window.available();
-		double now = Event.time();
-		// Is there an overdue search that's waiting for bandwidth?
-		if (searchQueue.headSize() > bw
-		&& searchQueue.headSize() <= win
-		&& searchQueue.deadline() <= now) return true;
-		// Is there an overdue transfer that's waiting for bandwidth?
-		if (transferQueue.headSize() > bw
-		&& transferQueue.headSize() <= win
-		&& transferQueue.deadline() <= now) return true;
-		// We're waiting for something other than bandwidth
-		return false;
-	}
-	
 	public void log (String message)
 	{
 		Event.log (node.net.address + ":" + address + " " + message);
@@ -385,8 +343,8 @@
 	// EventTarget interface
 	public void handleEvent (int type, Object data)
 	{
-		if (type == CHECK_DEADLINES) checkDeadlines();
+		if (type == TIMER) timer();
 	}
 	
-	private final static int CHECK_DEADLINES = 1;
+	private final static int TIMER = 1;
 }

Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java	2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java	2007-02-23 20:30:06 UTC (rev 11903)
@@ -2,17 +2,13 @@
 
 class TokenBucket
 {
-	public final double rate, size, poll;
+	public final double rate, size;
 	private double tokens, lastUpdated;
 	
 	public TokenBucket (double rate, double size)
 	{
 		this.rate = rate; // Bandwidth limit in bytes per second
 		this.size = size; // Size of maximum burst in bytes
-		double poll = Packet.MAX_SIZE / rate;
-		if (poll < Peer.MIN_SLEEP) poll = Peer.MIN_SLEEP;
-		if (poll > Peer.MAX_DELAY) poll = Peer.MAX_DELAY;
-		this.poll = poll; // Polling interval in seconds
 		tokens = size;
 		lastUpdated = 0.0; // Time
 	}

Deleted: trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java	2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java	2007-02-23 20:30:06 UTC (rev 11903)
@@ -1,15 +0,0 @@
-package sim.messages;
-
-public class Ack extends Message
-{
-	public Ack (int seq, double deadline)
-	{
-		id = seq; // Space-saving hack
-		this.deadline = deadline;
-	}
-	
-	public int size()
-	{
-		return ACK_SIZE;
-	}
-}




More information about the cvs mailing list