[freenet-cvs] r11679 - in trunk/apps/load-balancing-sims/phase7/sim: . clients messages

mrogers at freenetproject.org mrogers at freenetproject.org
Mon Feb 5 11:52:16 UTC 2007


Author: mrogers
Date: 2007-02-05 11:52:15 +0000 (Mon, 05 Feb 2007)
New Revision: 11679

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Packet.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/Sim.java
   trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java
   trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java
   trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java
   trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java
   trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java
Log:
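Disabled ack coalescing (each packet now carries at most one ack, sent immediately rather than queued) in preparation for improved RTO/FRTO calculation.
Also moved the client classes from package sim.generators to sim.clients.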

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -1,5 +1,5 @@
 package sim;
-import sim.generators.Client;
+import sim.clients.Client;
 import sim.handlers.*;
 import sim.messages.*;
 import java.util.HashMap;

Modified: trunk/apps/load-balancing-sims/phase7/sim/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Packet.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/Packet.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -1,40 +1,32 @@
 // A low-level packet (as opposed to a high-level message)
 
 package sim;
-import sim.messages.Ack;
 import sim.messages.Message;
 import java.util.ArrayList;
 
 class Packet
 {
-	public final static int HEADER_SIZE = 70; // Including IP & UDP headers
-	public final static int ACK_SIZE = 4; // Size of an ack in bytes
+	public final static int HEADER_SIZE = 60; // Including IP & UDP headers
 	public final static int MAX_SIZE = 1450; // MTU including headers
 	public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
 	
 	public final int src, dest; // Network addresses
 	public int size = HEADER_SIZE; // Size in bytes, including headers
 	public int seq = -1; // Data sequence number (-1 if no data)
-	public ArrayList<Ack> acks = null;
+	public int ack = -1; // Ack sequence number (-1 if no ack)
 	public ArrayList<Message> messages = null;
 	
 	public double sent; // Time at which the packet was (re) transmitted
 	public double latency; // Link latency, stored here for convenience
 	
-	public Packet (int src, int dest, double latency)
+	public Packet (int src, int dest, double latency, int ack)
 	{
 		this.src = src;
 		this.dest = dest;
 		this.latency = latency;
+		this.ack = ack;
 	}
 	
-	public void addAck (Ack a)
-	{
-		if (acks == null) acks = new ArrayList<Ack>();
-		acks.add (a);
-		size += a.size();
-	}
-	
 	public void addMessage (Message m)
 	{
 		if (messages == null) messages = new ArrayList<Message>();

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -36,7 +36,6 @@
 	private int txSeq = 0; // Sequence number of next outgoing data packet
 	private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
 	private LinkedList<Packet> txBuffer; // Retransmission buffer
-	private DeadlineQueue<Ack> ackQueue; // Outgoing acks
 	private DeadlineQueue<Message> searchQueue; // Outgoing search messages
 	private DeadlineQueue<Message> transferQueue; // Outgoing transfers
 	private CongestionWindow window; // AIMD congestion window
@@ -61,7 +60,6 @@
 		this.location = location;
 		this.latency = latency;
 		txBuffer = new LinkedList<Packet>();
-		ackQueue = new DeadlineQueue<Ack>();
 		searchQueue = new DeadlineQueue<Message>();
 		transferQueue = new DeadlineQueue<Message>();
 		window = new CongestionWindow (this);
@@ -83,20 +81,9 @@
 		// Start the coalescing timer
 		startTimer();
 		// Send as many packets as possible
-		while (send());
+		while (send (-1));
 	}
 	
-	// Queue an ack for transmission
-	private void sendAck (int seq)
-	{
-		if (LOG) log ("ack " + seq + " added to ack queue");
-		ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
-		// Start the coalescing timer
-		startTimer();
-		// Send as many packets as possible
-		while (send());
-	}
-	
 	// Start the coalescing timer
 	private void startTimer()
 	{
@@ -107,11 +94,11 @@
 	}
 	
 	// Try to send a packet, return true if a packet was sent
-	private boolean send()
+	private boolean send (int ack)
 	{
-		int waiting = ackQueue.size+searchQueue.size+transferQueue.size;
+		int waiting = searchQueue.size + transferQueue.size;
 		if (LOG) log (waiting + " bytes waiting");
-		if (waiting == 0) return false;
+		if (ack == -1 && waiting == 0) return false;
 		
 		// Return to slow start when the link is idle
 		double now = Event.time();
@@ -123,29 +110,30 @@
 		size = Math.min (size, node.bandwidth.available());
 		if (LOG) log (size + " bytes available for packet");
 		
-		// Urgent acks to send?
-		if (ackQueue.deadline() <= now) return sendPacket (size);
+		// Ack to send?
+		if (ack != -1) return sendPacket (ack, size);
 		// Urgent searches and room to send them?
 		if (searchQueue.deadline() <= now
-		&& searchQueue.headSize() <= size) return sendPacket (size);
+		&& searchQueue.headSize() <= size)
+			return sendPacket (ack, size);
 		// Urgent transfers and room to send them?
 		if (transferQueue.deadline() <= now
-		&& transferQueue.headSize() <= size) return sendPacket (size);
+		&& transferQueue.headSize() <= size)
+			return sendPacket (ack, size);
 		// Enough non-urgent messages for a large packet, and room?
 		if (waiting >= Packet.SENSIBLE_PAYLOAD
-		&& size >= Packet.SENSIBLE_PAYLOAD) return sendPacket (size);
+		&& size >= Packet.SENSIBLE_PAYLOAD)
+			return sendPacket (ack, size);
 		
 		if (LOG) log ("not sending a packet");
 		return false;
 	}
 	
 	// Try to send a packet up to the specified size, return true if sent
-	private boolean sendPacket (int maxSize)
+	private boolean sendPacket (int ack, int maxSize)
 	{
 		// Construct a packet
-		Packet p = new Packet (node.net.address, address, latency);
-		// Add all waiting acks to the packet
-		while (ackQueue.size > 0) p.addAck (ackQueue.pop());
+		Packet p = new Packet (node.net.address, address, latency, ack);
 		if (LOG) log ((maxSize - p.size) + " bytes for messages");
 		// Don't allow more than SEQ_RANGE payloads to be in flight
 		if (txSeq <= txMaxSeq) {
@@ -169,7 +157,7 @@
 			log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
 		}
 		// Don't send empty packets
-		if (p.acks == null && p.messages == null) return false;
+		if (p.ack == -1 && p.messages == null) return false;
 		// Transmit the packet
 		if (LOG) log ("sending packet " +p.seq+ ", " +p.size+ " bytes");
 		node.sendPacket (p);
@@ -186,7 +174,7 @@
 	// Called by Node when a packet arrives
 	public void handlePacket (Packet p)
 	{
-		if (p.acks != null) for (Ack a : p.acks) handleAck (a);
+		if (p.ack != -1) handleAck (p.ack);
 		if (p.messages != null) handleData (p);
 	}
 	
@@ -195,7 +183,7 @@
 		if (LOG) log ("received packet " +p.seq+ ", expected " +rxSeq);
 		if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
 			if (LOG) log ("duplicate packet");
-			sendAck (p.seq); // Original ack may have been lost
+			send (p.seq); // Original ack may have been lost
 		}
 		else if (p.seq == rxSeq) {
 			// Find the sequence number of the next missing packet
@@ -204,7 +192,7 @@
 			// Deliver the messages to the node
 			for (Message m : p.messages)
 				node.handleMessage (m, this);
-			sendAck (p.seq);
+			send (p.seq);
 		}
 		else if (p.seq < rxSeq + SEQ_RANGE) {
 			if (LOG) log ("packet out of order");
@@ -212,37 +200,36 @@
 			// Deliver the messages to the node
 			for (Message m : p.messages)
 				node.handleMessage (m, this);
-			sendAck (p.seq);
+			send (p.seq);
 		}
 		// This indicates a misbehaving sender - discard the packet
 		else if (LOG) log ("WARNING: sequence number out of range");
 	}
 	
-	private void handleAck (Ack a)
+	private void handleAck (int ack)
 	{
-		int seq = a.id;
-		if (LOG) log ("received ack " + seq);
+		if (LOG) log ("received ack " + ack);
 		double now = Event.time();
 		Iterator<Packet> i = txBuffer.iterator();
 		while (i.hasNext()) {
 			Packet p = i.next();
 			double age = now - p.sent;
 			// Explicit ack
-			if (p.seq == seq) {
+			if (p.seq == ack) {
 				i.remove();
 				// Update the congestion window
 				window.bytesAcked (p.size);
 				// Update the average round-trip time
 				rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
 				if (LOG) {
-					log ("packet " +p.seq+ " acknowledged");
+					log ("packet " + ack + " acknowledged");
 					log ("round-trip time " + age);
 					log ("average round-trip time " + rtt);
 				}
 				break;
 			}
 			// Fast retransmission
-			if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
+			if (p.seq < ack && age > FRTO * rtt) {
 				p.sent = now;
 				if (LOG) log ("fast retransmitting " + p.seq);
 				node.resendPacket (p);
@@ -253,8 +240,8 @@
 		if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1;
 		else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
 		if (LOG) log ("maximum sequence number " + txMaxSeq);
-		// Send as many packets a possible
-		if (timerRunning) while (send());
+		// Send as many packets as possible
+		if (timerRunning) while (send (-1));
 		else checkDeadlines();
 	}
 	
@@ -326,7 +313,7 @@
 		
 		double now = Event.time();
 		for (Packet p : txBuffer) {
-			if (now - p.sent > RTO * rtt + MAX_DELAY) {
+			if (now - p.sent > RTO * rtt) {
 				// Retransmission timeout
 				if (LOG) log ("retransmitting " + p.seq);
 				p.sent = now;
@@ -341,11 +328,11 @@
 	private void checkDeadlines()
 	{
 		// Send as many packets as possible
-		while (send());
+		while (send (-1));
 		// Find the next coalescing deadline - ignore message deadlines
 		// if there isn't room in the congestion window to send them
-		double dl = ackQueue.deadline();
-		int win = window.available() -Packet.HEADER_SIZE -ackQueue.size;
+		double dl = Double.POSITIVE_INFINITY;
+		int win = window.available() - Packet.HEADER_SIZE;
 		if (searchQueue.headSize() <= win)
 			dl = Math.min (dl, searchQueue.deadline());
 		if (transferQueue.headSize() <= win)
@@ -370,12 +357,9 @@
 	// Are we waiting for the bandwidth limiter?
 	private boolean shouldPoll()
 	{
-		double now = Event.time();
-		// Will we need to send an ack before the next polling interval?
-		if (ackQueue.deadline() < now + node.bandwidth.poll)
-			return false;
 		double bw = node.bandwidth.available();
 		double win = window.available();
+		double now = Event.time();
 		// Is there an overdue search that's waiting for bandwidth?
 		if (searchQueue.headSize() > bw
 		&& searchQueue.headSize() <= win

Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Sim.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -1,5 +1,5 @@
 package sim;
-import sim.generators.SimplePublisher;
+import sim.clients.SimplePublisher;
 
 class Sim implements EventTarget
 {

Modified: trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -1,4 +1,4 @@
-package sim.generators;
+package sim.clients;
 import sim.messages.Search;
 
 public interface Client

Modified: trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -1,7 +1,7 @@
 // A simple publisher that inserts keys using a Poisson process and informs
 // each reader after an average of ten minutes
 
-package sim.generators;
+package sim.clients;
 import sim.Event;
 import sim.EventTarget;
 import sim.Node;

Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -1,5 +1,5 @@
 package sim.messages;
-import sim.generators.Client;
+import sim.clients.Client;
 
 public class ChkInsert extends Search
 {

Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -1,5 +1,5 @@
 package sim.messages;
-import sim.generators.Client;
+import sim.clients.Client;
 
 public class Search extends Message
 {

Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java	2007-02-05 00:36:43 UTC (rev 11678)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java	2007-02-05 11:52:15 UTC (rev 11679)
@@ -1,5 +1,5 @@
 package sim.messages;
-import sim.generators.Client;
+import sim.clients.Client;
 
 public class SskInsert extends Search
 {




More information about the cvs mailing list