[Cppfcplib] r14333 - trunk/apps/CppFCPLib

mkolar at freenetproject.org mkolar at freenetproject.org
Wed Jul 25 15:14:22 UTC 2007


Author: mkolar
Date: 2007-07-25 15:14:22 +0000 (Wed, 25 Jul 2007)
New Revision: 14333

Modified:
   trunk/apps/CppFCPLib/JobTicket.cpp
   trunk/apps/CppFCPLib/JobTicket.h
   trunk/apps/CppFCPLib/Node.cpp
   trunk/apps/CppFCPLib/Node.h
   trunk/apps/CppFCPLib/NodeThread.cpp
   trunk/apps/CppFCPLib/NodeThread.h
Log:
*  collection for local and global jobs
*  modifyPeerNote
*  listPersistentRequest



Modified: trunk/apps/CppFCPLib/JobTicket.cpp
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.cpp	2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/JobTicket.cpp	2007-07-25 15:14:22 UTC (rev 14333)
@@ -9,7 +9,7 @@
 using namespace FCPLib;
 
 JobTicket::Ptr
-JobTicket::factory(std::string id, Message::Ptr cmd, bool keep)
+JobTicket::factory(std::string id, Message::Ptr cmd)
 {
    log().log(NOISY, "Creating " + cmd->getHeader());
    Ptr ret( new JobTicket() );
@@ -17,8 +17,6 @@
    ret->id = id;
    ret->cmd = cmd;
 
-   ret->keep = keep;
-
    ret->lock.acquire();
    ret->reqSentLock.acquire();
 

Modified: trunk/apps/CppFCPLib/JobTicket.h
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.h	2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/JobTicket.h	2007-07-25 15:14:22 UTC (rev 14333)
@@ -19,14 +19,19 @@
 namespace FCPLib {
 
 class NodeThread;
+class Node;
 
 class JobTicket {
+public:
+  typedef boost::shared_ptr<JobTicket > Ptr;
+private:
   std::string id;
   Message::Ptr cmd;
 
   Response nodeResponse;
 
   bool keep;
+  bool global;
 
   std::string repr;
   bool isReprValid;
@@ -52,22 +57,20 @@
     nodeResponse.push_back(m);
   }
 
-  JobTicket() : isReprValid(false), _isFinished(false) {}
-public:
-  typedef boost::shared_ptr<JobTicket > Ptr;
+  JobTicket() : keep(false), global(false), isReprValid(false), _isFinished(false) {}
+  static Ptr factory(std::string id, Message::Ptr cmd);
 
-  static Ptr factory(std::string id, Message::Ptr cmd, bool keep);
-
+  JobTicket& setKeep( bool x ) { keep = x; return *this; };
+  JobTicket& setGlobal( bool x ) { global = x; return *this; };
   void setCallback( boost::function<void (int, const ServerMessage::Ptr)> f )
   {
     this->f = f;
   }
-
   void setCallback( void (*f)(int, const ServerMessage::Ptr) )
   {
     this->f = f;
   }
-
+public:
   const std::string& getCommandName() const;
   const std::string& getId() const;
   const Message::Ptr getCommand() const;
@@ -82,12 +85,15 @@
   }
   const std::string& toString();
 
+  bool isGlobal() const { return global; }
+
   bool isFinished()
   {
     ZThread::Guard<ZThread::Mutex> g(access);
     return _isFinished;
   }
 
+  friend class Node;
   friend class NodeThread;
 };
 

Modified: trunk/apps/CppFCPLib/Node.cpp
===================================================================
--- trunk/apps/CppFCPLib/Node.cpp	2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/Node.cpp	2007-07-25 15:14:22 UTC (rev 14333)
@@ -59,7 +59,7 @@
   m->setField("Name", name);
   m->setField("ExpectedVersion", "2.0");
 
-  JobTicket::Ptr job = JobTicket::factory("", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "Node constructor: waiting for response to ClientHello");
@@ -84,7 +84,7 @@
 
   m->setField("NodeIdentifier", identifier);
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for Peer message");
@@ -104,7 +104,7 @@
   if (fields.hasField("WithMetadata")) m->setField("WithMetadata", fields.getField("WithMetadata"));
   if (fields.hasField("WithVolatile")) m->setField("WithVolatile", fields.getField("WithVolatile"));
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for EndListPeers message");
@@ -123,7 +123,7 @@
   Message::Ptr m = Message::factory( std::string("ListPeerNotes") );
   m->setField("NodeIdentifier", identifier);
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for EndListPeerNotes message");
@@ -144,7 +144,7 @@
   else
     m->setField("URL", value);
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m);
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for Peer message");
@@ -164,7 +164,7 @@
 
   m->setFields(message);
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for Peer message");
@@ -188,7 +188,7 @@
   if (fields.hasField("IsDisabled")) m->setField("IsDisabled", fields.getField("IsDisabled"));
   if (fields.hasField("IsListenOnly")) m->setField("IsListenOnly", fields.getField("IsListenOnly"));
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for Peer message");
@@ -203,15 +203,15 @@
 PeerNote
 Node::modifyPeerNote(const std::string & nodeIdentifier,
                      const std::string & noteText,
-                     int peerNoteType = 1)
+                     int peerNoteType)
 {
   Message::Ptr m = Message::factory( std::string("ModifyPeerNote") );
 
   m->setField("NodeIdentifier", nodeIdentifier);
   m->setField("NoteText", Base64::base64Encode((const unsigned char*)noteText.c_str(), noteText.size()));
-  m->setField("PeerNoteType", "1");  // TODO: change to peerNoteType once it is used
+  m->setField("PeerNoteType", peerNoteType);
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for PeerNote message");
@@ -231,7 +231,7 @@
 
   m->setField("NodeIdentifier", identifier);
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for PeerRemoved message");
@@ -253,7 +253,7 @@
   if (fields.hasField("WithPrivate")) m->setField("WithPrivate", fields.getField("WithPrivate"));
   if (fields.hasField("WithVolatile")) m->setField("WithVolatile", fields.getField("WithVolatile"));
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for NodeData message");
@@ -279,7 +279,7 @@
   if (fields.hasField("WithShortDescription")) m->setField("WithShortDescription", fields.getField("WithShortDescription"));
   if (fields.hasField("WithLongDescription")) m->setField("WithLongDescription", fields.getField("WithLongDescription"));
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for ConfigData message");
@@ -298,7 +298,7 @@
   if (m->getHeader() != "ModifyConfig")
     throw std::logic_error("ModifyConfig message expected, " + m->getHeader() + " received");
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for ConfigData message");
@@ -322,7 +322,7 @@
   if (write)
     m->setField("WantWriteDirectory", "true");
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for TestDDAReply");
@@ -344,7 +344,7 @@
   if (readContent != "")
     m->setField("ReadContent", readContent);
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for TestDDAComplete");
@@ -426,7 +426,7 @@
   Message::Ptr m = Message::factory( std::string("GenerateSSK") );
   m->setField("Identifier", identifier);
 
-  JobTicket::Ptr job = JobTicket::factory( identifier, m, false);
+  JobTicket::Ptr job = JobTicket::factory( identifier, m );
   clientReqQueue->put(job);
 
   log().log(DEBUG, "waiting for SSKKeypair message");
@@ -461,7 +461,7 @@
 
   m->setStream(s, dataLength);
 
-  JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m, false);
+  JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
   clientReqQueue->put(job);
 
   job->waitTillReqSent(globalCommandsTimeout); // assure that there is a response
@@ -491,7 +491,7 @@
   m->setField("UploadFrom", "redirect");
   m->setField("TargetURI", target);
 
-  JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m, false);
+  JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
   log().log(DEBUG, job->toString());
   clientReqQueue->put(job);
 
@@ -567,7 +567,7 @@
   if (!r.readDirectory)
     m->setField("FileHash", fields.getField("FileHash"));
 
-  JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m, false);
+  JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
   log().log(DEBUG, job->toString());
   clientReqQueue->put(job);
 
@@ -604,7 +604,7 @@
   m->setField("Identifier", id);
   m->setField("DontPoll", Converter::toString( dontPoll ));
 
-  JobTicket::Ptr job = JobTicket::factory( id, m, false );
+  JobTicket::Ptr job = JobTicket::factory( id, m );
   clientReqQueue->put(job);
 
   return job;
@@ -617,23 +617,19 @@
   m->setField( "Enabled", Converter::toString( enabled ) );
   m->setField( "VerbosityMask", boost::lexical_cast<std::string>(verbosity) );
 
-  JobTicket::Ptr job = JobTicket::factory( "", m, false );
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 }
 
-MessagePtrContainer
-Node::listPersistentRequest()
+void
+Node::refreshPersistentRequest()
 {
   Message::Ptr m = Message::factory( std::string("ListPersistentRequest") );
-  JobTicket::Ptr job = JobTicket::factory( "", m, false);
+  JobTicket::Ptr job = JobTicket::factory( "", m );
   clientReqQueue->put(job);
 
-  log().log(DEBUG, "waiting for SSKKeypair message");
-  job->wait(globalCommandsTimeout);
+  // persistent jobs will be updated
 
-  Response resp = job->getResponse();
-  checkProtocolError(resp); // throws
-
-  // hmmm... this does not work probably as messages will contain Identifiers and will be assigned to other jobs...
-  return createResult<MessagePtrContainer, VectorWithoutLastConverter>( resp );
+  log().log(DEBUG, "waiting for EndListPersistentRequests message");
+  job->wait(globalCommandsTimeout);
 }

Modified: trunk/apps/CppFCPLib/Node.h
===================================================================
--- trunk/apps/CppFCPLib/Node.h	2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/Node.h	2007-07-25 15:14:22 UTC (rev 14333)
@@ -63,7 +63,7 @@
   Message::Ptr addPeer(const std::string &, bool isURL);
   Message::Ptr addPeer(const std::map<std::string, std::string> &message);
   Message::Ptr modifyPeer(const std::string &, const AdditionalFields& = AdditionalFields());
-  PeerNote modifyPeerNote(const std::string &, const std::string &, int);
+  PeerNote modifyPeerNote(const std::string &, const std::string &, int = 1);
   Message::Ptr removePeer(const std::string &);
 
   Message::Ptr getNode(const AdditionalFields& = AdditionalFields());
@@ -95,7 +95,7 @@
   JobTicket::Ptr subscribeUSK(const std::string, const std::string, bool);
 
   void watchGlobal( bool enabled, int verbosity );
-  MessagePtrContainer listPersistentRequest();
+  void refreshPersistentRequest();
 };
 }
 

Modified: trunk/apps/CppFCPLib/NodeThread.cpp
===================================================================
--- trunk/apps/CppFCPLib/NodeThread.cpp	2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/NodeThread.cpp	2007-07-25 15:14:22 UTC (rev 14333)
@@ -1,9 +1,12 @@
 
 
+#include <ctime>
+#include <boost/lexical_cast.hpp>
+
 #include "NodeThread.h"
 #include "Log.h"
-#include <ctime>
 
+
 using namespace FCPLib;
 using namespace ZThread;
 
@@ -85,7 +88,7 @@
   log().log(NOISY, "sendClientReq : top");
   if (job->getCommandName() != "WatchGlobal") {
     log().log(NOISY, "sendClientReq : about to add the job to the map");
-    jobs[job->getId()] = job;
+    jobs[job->isGlobal() ? 1 : 0][job->getId()] = job;
     log().log(NOISY, "sendClientReq : added the job to the map");
   }
 
@@ -99,19 +102,28 @@
   JobTicket::Ptr job;
   std::map<std::string, JobTicket::Ptr>::iterator it;
 
-  it = jobs.find(message->getIdOfJob());
-  if (it == jobs.end()) {
+  std::string tmp = message->getMessage()->getField("Global");
+  tmp = tmp == "" ? "false" : tmp;
+  int isGlobal = boost::lexical_cast<int>(tmp);
+
+  it = jobs[isGlobal].find(message->getIdOfJob());
+  if (it == jobs[isGlobal].end()) {
     log().log(DETAIL, "doMessage : received " + message->getMessage()->getHeader() + ", cannot find " + message->getIdOfJob() + " in started jobs");
     /// message from global queue or error
     Message::Ptr m = message->getMessage();
-    if ( m->getField("Identifier") == "" ) { // error
+    if (!isGlobal) { // error
       log().log(DEBUG, "doMessage : received error message");
       // TODO: create a mean of passing error messages to client programme
       return;
     } else { // global queue, create a job
       log().log(DEBUG, "doMessage : received message from a global queue");
-      JobTicket::Ptr job = JobTicket::factory(m->getField("Identifier"), m, false);
-      jobs[m->getField("Identifier")] = job;
+      if ( m->getField("Identifier") == "" ) {
+        // should never happen
+        log().log(ERROR, "doMessage : global message does not contain identifier !???");
+        return;
+      }
+      JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
+      jobs[1][m->getField("Identifier")] = job;
       return;
     }
   }
@@ -125,7 +137,7 @@
 
     if (!job->keep) {
       log().log(NOISY, "doMessage : job should not be kept, erasing");
-      jobs.erase( it );
+      jobs[isGlobal].erase( it );
     }
   }
   else {

Modified: trunk/apps/CppFCPLib/NodeThread.h
===================================================================
--- trunk/apps/CppFCPLib/NodeThread.h	2007-07-25 09:06:20 UTC (rev 14332)
+++ trunk/apps/CppFCPLib/NodeThread.h	2007-07-25 15:14:22 UTC (rev 14333)
@@ -30,7 +30,7 @@
   bool isAlive_;
   ZThread::CountedPtr<std::exception> exception;
 
-  std::map<std::string, JobTicket::Ptr > jobs;
+  std::map<std::string, JobTicket::Ptr > jobs[2]; // 0 -- local jobs, 1 -- global jobs
 
   friend class Node;
   NodeThread(std::string &host, int port, JobTicketQueuePtr clientReqQueue_) throw();




More information about the Cppfcplib mailing list