[freenet-cvs] r13435 - trunk/freenet/src/freenet/io/xfer

toad at freenetproject.org toad at freenetproject.org
Fri Jun 1 11:34:51 UTC 2007


Author: toad
Date: 2007-06-01 11:34:51 +0000 (Fri, 01 Jun 2007)
New Revision: 13435

Modified:
   trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
   trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
Log:
Completed receiving code.

Modified: trunk/freenet/src/freenet/io/xfer/BulkReceiver.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/BulkReceiver.java	2007-06-01 11:15:19 UTC (rev 13434)
+++ trunk/freenet/src/freenet/io/xfer/BulkReceiver.java	2007-06-01 11:34:51 UTC (rev 13435)
@@ -4,8 +4,13 @@
 package freenet.io.xfer;
 
 import freenet.io.comm.DMT;
+import freenet.io.comm.DisconnectedException;
+import freenet.io.comm.Message;
+import freenet.io.comm.MessageFilter;
 import freenet.io.comm.NotConnectedException;
 import freenet.io.comm.PeerContext;
+import freenet.io.comm.RetrievalException;
+import freenet.support.ShortBuffer;
 
 /**
  * Bulk (not block) data transfer - receiver class. Bulk transfer is designed for largish files, much
@@ -13,7 +18,8 @@
  * @author toad
  */
 public class BulkReceiver {
-	
+
+	static final int TIMEOUT = 5*60*1000;
 	/** Tracks the data we have received */
 	final PartiallyReceivedBulk prb;
 	/** Peer we are receiving from */
@@ -21,11 +27,14 @@
 	/** Transfer UID for messages */
 	final long uid;
 	private boolean sentCancel;
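+	/** Boot ID of the peer as read in the constructor; it is not persistent over reboots, so a different value later means the peer restarted */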
+	final long peerBootID;
 
 	public BulkReceiver(PartiallyReceivedBulk prb, PeerContext peer, long uid) {
 		this.prb = prb;
 		this.peer = peer;
 		this.uid = uid;
+		this.peerBootID = peer.getBootID();
 	}
 
 	public void onAborted() {
@@ -40,4 +49,39 @@
 		}
 	}
 
+	/**
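+	 * Receive the file: waits for bulk packets from the peer and stores each one in prb until the whole file has arrived; aborts prb and stops if the sender disconnects, restarts, times out or cancels the send.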
+	 * @return True if the whole file was received, false otherwise.
+	 */
+	public boolean receive() {
+		MessageFilter mfSendKilled = MessageFilter.create().setSource(peer).setType(DMT.FNPBulkSendAborted) .setField(DMT.UID, uid).setTimeout(TIMEOUT);
+		MessageFilter mfPacket = MessageFilter.create().setSource(peer).setType(DMT.FNPBulkPacketSend) .setField(DMT.UID, uid).setTimeout(TIMEOUT);
+		while(true) {
+			if(prb.hasWholeFile()) return true;
+			Message m;
+			try {
+				m = prb.usm.waitFor(mfSendKilled.or(mfPacket), null);
+			} catch (DisconnectedException e) {
+				prb.abort(RetrievalException.SENDER_DISCONNECTED, "Sender disconnected");
+				return false;
+			}
+			if(peer.getBootID() != peerBootID) {
+				prb.abort(RetrievalException.SENDER_DIED, "Sender restarted");
+				return false;
+			}
+			if(m == null) {
+				prb.abort(RetrievalException.TIMED_OUT, "Sender timeout");
+				return false;
+			}
+			if(m.getSpec() == DMT.FNPBulkSendAborted) {
+				prb.abort(RetrievalException.SENDER_DIED, "Sender cancelled send");
+				return false;
+			}
+			if(m.getSpec() == DMT.FNPBulkPacketSend) {
+				int packetNo = m.getInt(DMT.PACKET_NO);
+				byte[] data = ((ShortBuffer) m.getObject(DMT.DATA)).getData();
+				prb.received(packetNo, data, 0, data.length);
+			}
+		}
+	}
 }

Modified: trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java
===================================================================
--- trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java	2007-06-01 11:15:19 UTC (rev 13434)
+++ trunk/freenet/src/freenet/io/xfer/PartiallyReceivedBulk.java	2007-06-01 11:34:51 UTC (rev 13435)
@@ -96,10 +96,18 @@
 	 * Called when a block has been received. Will copy the data from the provided buffer and store it.
 	 * @param blockNum The block number.
 	 * @param data The byte array from which to read the data.
-	 * @param offset The start of the 
+	 * @param offset The start of the data in the buffer.
 	 */
-	void received(int blockNum, byte[] data, int offset) {
+	void received(int blockNum, byte[] data, int offset, int length) {
 		BulkTransmitter[] notifyBTs;
+		long fileOffset = (long)blockNum * (long)blockSize;
+		int bs = (int) Math.max(blockSize, size - fileOffset);
+		if(length < bs) {
+			String err = "Data too short! Should be "+bs+" actually "+length;
+			Logger.error(this, err+" for "+this);
+			abort(RetrievalException.PREMATURE_EOF, err);
+			return;
+		}
 		synchronized(this) {
 			if(blocksReceived.bitAt(blockNum)) return; // ignore
 			blocksReceived.setBit(blockNum, true); // assume the rest of the function succeeds
@@ -107,8 +115,6 @@
 			notifyBTs = transmitters;
 		}
 		try {
-			long fileOffset = (long)blockNum * (long)blockSize;
-			int bs = (int) Math.max(blockSize, size - fileOffset);
 			raf.pwrite(fileOffset, data, offset, bs);
 		} catch (Throwable t) {
 			Logger.error(this, "Failed to store received block "+blockNum+" on "+this+" : "+t, t);




More information about the cvs mailing list