[Cppfcplib] r14369 - trunk/apps/CppFCPLib

mkolar at freenetproject.org mkolar at freenetproject.org
Fri Jul 27 01:31:47 UTC 2007


Author: mkolar
Date: 2007-07-27 01:31:46 +0000 (Fri, 27 Jul 2007)
New Revision: 14369

Modified:
   trunk/apps/CppFCPLib/JobTicket.h
   trunk/apps/CppFCPLib/Node.cpp
   trunk/apps/CppFCPLib/Node.h
   trunk/apps/CppFCPLib/Server.cpp
   trunk/apps/CppFCPLib/Server.h
   trunk/apps/CppFCPLib/ServerMessage.cpp
   trunk/apps/CppFCPLib/ServerMessage.h
Log:
* get direct, persistence=connection



Modified: trunk/apps/CppFCPLib/JobTicket.h
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.h	2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/JobTicket.h	2007-07-27 01:31:46 UTC (rev 14369)
@@ -116,19 +116,24 @@
 class GetJob : public JobTicket {
 public:
   typedef boost::shared_ptr<GetJob> Ptr;
+  enum ReturnType { Direct, Disk, None };
 
 private:
+  ReturnType retType;
   std::ostream *stream;
   GetJob()
     : JobTicket(),
+      retType(Direct),
       stream(NULL)
   {}
 
   static Ptr factory(std::string id, Message::Ptr cmd);
   GetJob& setStream( std::ostream *s ) { stream = s; return *this; }
+  GetJob& setReturnType( ReturnType r ) { retType = r; return *this; }
 public:
   ~GetJob() { if (stream != NULL) delete stream; }
   std::ostream& getStream() { return *stream; }
+  ReturnType getReturnType() const { return retType; }
 
   friend class Node;
   friend class NodeThread;

Modified: trunk/apps/CppFCPLib/Node.cpp
===================================================================
--- trunk/apps/CppFCPLib/Node.cpp	2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/Node.cpp	2007-07-27 01:31:46 UTC (rev 14369)
@@ -630,7 +630,7 @@
   }
 }
 
-JobTicket::Ptr
+GetJob::Ptr
 Node::getDisk(const std::string URI, const std::string filename, const std::string id, const AdditionalFields& fields )
 {
   std::string identifier = id == "" ? _getUniqueId() : id;
@@ -669,8 +669,9 @@
     m->setField("Filename", filename);
     if (fields.hasField("TempFilename")) m->setField("TempFilename", fields.getField("TempFilename"));
 
-    JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
+    GetJob::Ptr job = GetJob::factory( m->getField("Identifier"), m );
     job->setGlobal( global ).setPersistent( persistent );
+    job->setReturnType( GetJob::Disk );
 
     clientReqQueue->put(job);
 
@@ -686,7 +687,7 @@
   // TODO :: implement fallback
 }
 
-JobTicket::Ptr
+GetJob::Ptr
 Node::fetchData(const std::string URI, const std::string id, const AdditionalFields& fields)
 {
   std::string identifier = id == "" ? _getUniqueId() : id;
@@ -716,14 +717,58 @@
   if (fields.hasField("BinaryBlob")) m->setField("BinaryBlob", fields.getField("BinaryBlob"));
   if (fields.hasField("AllowedMIMETypes")) m->setField("AllowedMIMETypes", fields.getField("AllowedMIMETypes"));
 
-  JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
+  GetJob::Ptr job = GetJob::factory( m->getField("Identifier"), m );
   job->setGlobal( global ).setPersistent( persistent );
+  job->setReturnType( GetJob::None );
 
   clientReqQueue->put(job);
 
   return job;
 }
+// Queues a ClientGet with ReturnType=direct and returns its GetJob ticket.
+// Throws std::invalid_argument if Global is set without persistence, or if
+// stream is NULL for a non-persistent get. Once queued, the job owns stream:
+// ~GetJob() deletes it, so the caller must pass a heap-allocated stream.
+GetJob::Ptr
+Node::getDirect(const std::string URI, const std::string id, std::ostream* stream, const AdditionalFields& fields)
+{
+  // TODO:: implement for persistent
+  std::string identifier = id == "" ? _getUniqueId() : id;
+  bool persistent = fields.hasField("Persistence") && fields.getField("Persistence") != "connection";
+  bool global = fields.hasField("Global") && fields.getField("Global") == "true";
+  if (global && !persistent)
+    throw std::invalid_argument("Global requests must be persistent");
 
+  if (stream == NULL && !persistent)
+    throw std::invalid_argument("You must specify stream when starting Get with Persistence=connection");
+
+  Message::Ptr m = Message::factory( std::string("ClientGet") );
+
+  m->setField("URI", URI);
+  m->setField("Identifier", identifier);
+  if (fields.hasField("IgnoreDS")) m->setField("IgnoreDS", fields.getField("IgnoreDS"));
+  if (fields.hasField("DSonly")) m->setField("DSonly", fields.getField("DSonly"));
+  if (fields.hasField("Verbosity")) m->setField("Verbosity", fields.getField("Verbosity"));
+  if (fields.hasField("MaxSize")) m->setField("MaxSize", fields.getField("MaxSize"));
+  if (fields.hasField("MaxTempSize")) m->setField("MaxTempSize", fields.getField("MaxTempSize"));
+  if (fields.hasField("MaxRetries")) m->setField("MaxRetries", fields.getField("MaxRetries"));
+  if (fields.hasField("PriorityClass")) m->setField("PriorityClass", fields.getField("PriorityClass"));
+  if (fields.hasField("Persistence")) m->setField("Persistence", fields.getField("Persistence"));
+  else m->setField("Persistence", "connection"); // default when the caller gives none
+  if (fields.hasField("ClientToken")) m->setField("ClientToken", fields.getField("ClientToken"));
+  m->setField("Global", Converter::toString(global));
+  m->setField("ReturnType", "direct");
+  if (fields.hasField("BinaryBlob")) m->setField("BinaryBlob", fields.getField("BinaryBlob"));
+  if (fields.hasField("AllowedMIMETypes")) m->setField("AllowedMIMETypes", fields.getField("AllowedMIMETypes"));
+
+  GetJob::Ptr job = GetJob::factory( m->getField("Identifier"), m );
+  job->setGlobal( global ).setPersistent( persistent );
+  job->setReturnType( GetJob::Direct ).setStream( stream ); // job takes ownership of stream here
+  clientReqQueue->put(job);
+  return job;
+}
+
+
 JobTicket::Ptr
 Node::subscribeUSK(const std::string URI, const std::string id, bool dontPoll)
 {

Modified: trunk/apps/CppFCPLib/Node.h
===================================================================
--- trunk/apps/CppFCPLib/Node.h	2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/Node.h	2007-07-27 01:31:46 UTC (rev 14369)
@@ -88,17 +88,22 @@
                          const AdditionalFields& = AdditionalFields()
                          );
 
-  JobTicket::Ptr getDisk(const std::string , // URI
-                         const std::string , // Filename
-                         const std::string = "", // Identifier
-                         const AdditionalFields& = AdditionalFields()
-                         );
+  GetJob::Ptr getDisk(const std::string , // URI
+                      const std::string , // Filename
+                      const std::string = "", // Identifier
+                      const AdditionalFields& = AdditionalFields()
+                      );
 
-  JobTicket::Ptr fetchData(const std::string , // URI
-                           const std::string = "", // Identifier
-                           const AdditionalFields& = AdditionalFields()
-                           );
+  GetJob::Ptr fetchData(const std::string , // URI
+                        const std::string = "", // Identifier
+                        const AdditionalFields& = AdditionalFields()
+                        );
 
+  GetJob::Ptr getDirect(const std::string , // URI
+                        const std::string = "", // Identifier
+                        std::ostream* = NULL, // Stream
+                        const AdditionalFields& = AdditionalFields()
+                        );
 
 
   JobTicket::Ptr subscribeUSK(const std::string, const std::string, bool);

Modified: trunk/apps/CppFCPLib/Server.cpp
===================================================================
--- trunk/apps/CppFCPLib/Server.cpp	2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/Server.cpp	2007-07-27 01:31:46 UTC (rev 14369)
@@ -63,3 +63,9 @@
 bool Server::dataAvailable(){
   return socket_->available() != 0 || response.size() != 0;
 }
+
+// Reads from the socket until buf is full; boost::asio::read throws on error.
+void Server::read(boost::asio::mutable_buffers_1 buf)
+{
+  boost::asio::read(*socket_, buf);
+}

Modified: trunk/apps/CppFCPLib/Server.h
===================================================================
--- trunk/apps/CppFCPLib/Server.h	2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/Server.h	2007-07-27 01:31:46 UTC (rev 14369)
@@ -21,6 +21,7 @@
 public:
   ~Server();
   std::string readln();
+  void read(boost::asio::mutable_buffers_1);
   void send(const std::string &s);
   void send(Message::Ptr m);
   bool dataAvailable();

Modified: trunk/apps/CppFCPLib/ServerMessage.cpp
===================================================================
--- trunk/apps/CppFCPLib/ServerMessage.cpp	2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/ServerMessage.cpp	2007-07-27 01:31:46 UTC (rev 14369)
@@ -192,6 +192,18 @@
   return true;
 }
 
+// Returns true unless job is a Direct GetJob that is not persistent.
+bool
+IsLastDataFound::operator()(const JobTicketPtr job) const
+{
+  GetJob::Ptr job_ = boost::dynamic_pointer_cast<GetJob, JobTicket>( job );
+  // a failed cast yields an empty pointer; treat such a ticket like a non-Direct job
+  if ( !job_ || job_->getReturnType() != GetJob::Direct )
+    return true;
+  // non persistent job needs to ask for AllData
+  return job_->isPersistent();
+}
+
 void
 AllDataMessage::read(boost::shared_ptr<Server> s)
 {
@@ -214,3 +226,23 @@
   bytesToRead = boost::lexical_cast<int>( message->getField("DataLength") );
   log().log(DETAIL, " ... " + message->getField("DataLength") + " bytes of data ...");
 }
+
+// Reads the bytesToRead payload bytes from the server into the job's stream; always returns true.
+bool
+AllDataMessage::isLast(const JobTicketPtr job) const
+{
+  GetJob::Ptr job_ = boost::dynamic_pointer_cast<GetJob, JobTicket>( job );
+  // failed cast gives an empty pointer: still drain the payload below, but discard it
+  std::ostream* out = job_ ? &job_->getStream() : NULL;
+  char buf[1024];
+
+  for (int left = bytesToRead; left > 0; ) {
+    const int m = std::min<int>(left, static_cast<int>(sizeof(buf)));
+    server->read(boost::asio::buffer(buf, m));
+    log().log(DEBUG, "NODE: read "+ boost::lexical_cast<string>( m ) + " bytes of data\n");
+    if (out) out->write(buf, m);
+    left -= m;
+  }
+
+  return true;
+}

Modified: trunk/apps/CppFCPLib/ServerMessage.h
===================================================================
--- trunk/apps/CppFCPLib/ServerMessage.h	2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/ServerMessage.h	2007-07-27 01:31:46 UTC (rev 14369)
@@ -73,6 +73,11 @@
   bool operator()(const JobTicketPtr job) const;
 };
 
+struct IsLastDataFound { // functor over a job ticket; NOTE(review): presumably an isLastT policy for ServerMessageT - confirm
+  Message::Ptr message; // message passed to the constructor; operator() in ServerMessage.cpp does not read it
+  IsLastDataFound( Message::Ptr m ) : message(m) {}
+  bool operator()(const JobTicketPtr job) const; // defined in ServerMessage.cpp
+};
 
 template<typename isLastT = IsLastTrue, bool isErrorT = false>
 class ServerMessageT : public ServerMessage {
@@ -96,7 +101,7 @@
 
   AllDataMessage() {}
 public:
-  bool isLast(const JobTicketPtr job) const { return true; }
+  bool isLast(const JobTicketPtr job) const;
   bool isError() const { return false; }
   friend class ServerMessage;
 };




More information about the Cppfcplib mailing list