[Cppfcplib] r14369 - trunk/apps/CppFCPLib
mkolar at freenetproject.org
mkolar at freenetproject.org
Fri Jul 27 01:31:47 UTC 2007
Author: mkolar
Date: 2007-07-27 01:31:46 +0000 (Fri, 27 Jul 2007)
New Revision: 14369
Modified:
trunk/apps/CppFCPLib/JobTicket.h
trunk/apps/CppFCPLib/Node.cpp
trunk/apps/CppFCPLib/Node.h
trunk/apps/CppFCPLib/Server.cpp
trunk/apps/CppFCPLib/Server.h
trunk/apps/CppFCPLib/ServerMessage.cpp
trunk/apps/CppFCPLib/ServerMessage.h
Log:
* get direct, persistence=connection
Modified: trunk/apps/CppFCPLib/JobTicket.h
===================================================================
--- trunk/apps/CppFCPLib/JobTicket.h 2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/JobTicket.h 2007-07-27 01:31:46 UTC (rev 14369)
@@ -116,19 +116,24 @@
class GetJob : public JobTicket {
public:
typedef boost::shared_ptr<GetJob> Ptr;
+ enum ReturnType { Direct, Disk, None };
private:
+ ReturnType retType;
std::ostream *stream;
GetJob()
: JobTicket(),
+ retType(Direct),
stream(NULL)
{}
static Ptr factory(std::string id, Message::Ptr cmd);
GetJob& setStream( std::ostream *s ) { stream = s; return *this; }
+ GetJob& setReturnType( ReturnType r ) { retType = r; return *this; }
public:
~GetJob() { if (stream != NULL) delete stream; }
std::ostream& getStream() { return *stream; }
+ ReturnType getReturnType() const { return retType; }
friend class Node;
friend class NodeThread;
Modified: trunk/apps/CppFCPLib/Node.cpp
===================================================================
--- trunk/apps/CppFCPLib/Node.cpp 2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/Node.cpp 2007-07-27 01:31:46 UTC (rev 14369)
@@ -630,7 +630,7 @@
}
}
-JobTicket::Ptr
+GetJob::Ptr
Node::getDisk(const std::string URI, const std::string filename, const std::string id, const AdditionalFields& fields )
{
std::string identifier = id == "" ? _getUniqueId() : id;
@@ -669,8 +669,9 @@
m->setField("Filename", filename);
if (fields.hasField("TempFilename")) m->setField("TempFilename", fields.getField("TempFilename"));
- JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
+ GetJob::Ptr job = GetJob::factory( m->getField("Identifier"), m );
job->setGlobal( global ).setPersistent( persistent );
+ job->setReturnType( GetJob::Disk );
clientReqQueue->put(job);
@@ -686,7 +687,7 @@
// TODO :: implement fallback
}
-JobTicket::Ptr
+GetJob::Ptr
Node::fetchData(const std::string URI, const std::string id, const AdditionalFields& fields)
{
std::string identifier = id == "" ? _getUniqueId() : id;
@@ -716,14 +717,58 @@
if (fields.hasField("BinaryBlob")) m->setField("BinaryBlob", fields.getField("BinaryBlob"));
if (fields.hasField("AllowedMIMETypes")) m->setField("AllowedMIMETypes", fields.getField("AllowedMIMETypes"));
- JobTicket::Ptr job = JobTicket::factory( m->getField("Identifier"), m );
+ GetJob::Ptr job = GetJob::factory( m->getField("Identifier"), m );
job->setGlobal( global ).setPersistent( persistent );
+ job->setReturnType( GetJob::None );
clientReqQueue->put(job);
return job;
}
+GetJob::Ptr
+Node::getDirect(const std::string URI, const std::string id, std::ostream* stream, const AdditionalFields& fields)
+{
+ // TODO:: implement for persistent
+ std::string identifier = id == "" ? _getUniqueId() : id;
+ bool persistent = fields.hasField("Persistence") && fields.getField("Persistence") != "connection";
+ bool global = fields.hasField("Global") && fields.getField("Global") == "true";
+ if (global && !persistent)
+ throw std::invalid_argument("Global requests must be persistent");
+ if (stream == NULL && !persistent)
+ throw std::invalid_argument("You must specify stream when strarting Get with Persistence=connection");
+
+ Message::Ptr m = Message::factory( std::string("ClientGet") );
+
+ m->setField("URI", URI);
+ m->setField("Identifier", identifier);
+ if (fields.hasField("IgnoreDS")) m->setField("IgnoreDS", fields.getField("IgnoreDS"));
+ if (fields.hasField("DSonly")) m->setField("DSonly", fields.getField("DSonly"));
+ if (fields.hasField("Verbosity")) m->setField("Verbosity", fields.getField("Verbosity"));
+ if (fields.hasField("MaxSize")) m->setField("MaxSize", fields.getField("MaxSize"));
+ if (fields.hasField("MaxTempSize")) m->setField("MaxTempSize", fields.getField("MaxTempSize"));
+ if (fields.hasField("MaxRetries")) m->setField("MaxRetries", fields.getField("MaxRetries"));
+ if (fields.hasField("PriorityClass")) m->setField("PriorityClass", fields.getField("PriorityClass"));
+ if (fields.hasField("Persistence"))
+ m->setField("Persistence", fields.getField("Persistence"));
+ else
+ m->setField("Persistence", "connection");
+ if (fields.hasField("ClientToken")) m->setField("ClientToken", fields.getField("ClientToken"));
+ m->setField("Global", Converter::toString(global));
+ m->setField("ReturnType", "direct");
+ if (fields.hasField("BinaryBlob")) m->setField("BinaryBlob", fields.getField("BinaryBlob"));
+ if (fields.hasField("AllowedMIMETypes")) m->setField("AllowedMIMETypes", fields.getField("AllowedMIMETypes"));
+
+ GetJob::Ptr job = GetJob::factory( m->getField("Identifier"), m );
+ job->setGlobal( global ).setPersistent( persistent );
+ job->setReturnType( GetJob::Direct ).setStream( stream );
+
+ clientReqQueue->put(job);
+
+ return job;
+}
+
+
JobTicket::Ptr
Node::subscribeUSK(const std::string URI, const std::string id, bool dontPoll)
{
Modified: trunk/apps/CppFCPLib/Node.h
===================================================================
--- trunk/apps/CppFCPLib/Node.h 2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/Node.h 2007-07-27 01:31:46 UTC (rev 14369)
@@ -88,17 +88,22 @@
const AdditionalFields& = AdditionalFields()
);
- JobTicket::Ptr getDisk(const std::string , // URI
- const std::string , // Filename
- const std::string = "", // Identifier
- const AdditionalFields& = AdditionalFields()
- );
+ GetJob::Ptr getDisk(const std::string , // URI
+ const std::string , // Filename
+ const std::string = "", // Identifier
+ const AdditionalFields& = AdditionalFields()
+ );
- JobTicket::Ptr fetchData(const std::string , // URI
- const std::string = "", // Identifier
- const AdditionalFields& = AdditionalFields()
- );
+ GetJob::Ptr fetchData(const std::string , // URI
+ const std::string = "", // Identifier
+ const AdditionalFields& = AdditionalFields()
+ );
+ GetJob::Ptr getDirect(const std::string , // URI
+ const std::string = "", // Identifier
+ std::ostream* = NULL, // Stream
+ const AdditionalFields& = AdditionalFields()
+ );
JobTicket::Ptr subscribeUSK(const std::string, const std::string, bool);
Modified: trunk/apps/CppFCPLib/Server.cpp
===================================================================
--- trunk/apps/CppFCPLib/Server.cpp 2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/Server.cpp 2007-07-27 01:31:46 UTC (rev 14369)
@@ -63,3 +63,9 @@
bool Server::dataAvailable(){
return socket_->available() != 0 || response.size() != 0;
}
+
+void
+Server::read(boost::asio::mutable_buffers_1 buf)
+{
+ boost::asio::read(*socket_, buf);
+}
Modified: trunk/apps/CppFCPLib/Server.h
===================================================================
--- trunk/apps/CppFCPLib/Server.h 2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/Server.h 2007-07-27 01:31:46 UTC (rev 14369)
@@ -21,6 +21,7 @@
public:
~Server();
std::string readln();
+ void read(boost::asio::mutable_buffers_1);
void send(const std::string &s);
void send(Message::Ptr m);
bool dataAvailable();
Modified: trunk/apps/CppFCPLib/ServerMessage.cpp
===================================================================
--- trunk/apps/CppFCPLib/ServerMessage.cpp 2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/ServerMessage.cpp 2007-07-27 01:31:46 UTC (rev 14369)
@@ -192,6 +192,18 @@
return true;
}
+bool
+IsLastDataFound::operator()(const JobTicketPtr job) const
+{
+ GetJob::Ptr job_ = boost::dynamic_pointer_cast<GetJob, JobTicket>( job );
+
+ if ( job_->getReturnType() != GetJob::Direct )
+ return true;
+
+ // non persistent job needs to ask for AllData
+ return job_->isPersistent();
+}
+
void
AllDataMessage::read(boost::shared_ptr<Server> s)
{
@@ -214,3 +226,23 @@
bytesToRead = boost::lexical_cast<int>( message->getField("DataLength") );
log().log(DETAIL, " ... " + message->getField("DataLength") + " bytes of data ...");
}
+
+bool
+AllDataMessage::isLast(const JobTicketPtr job) const
+{
+ GetJob::Ptr job_ = boost::dynamic_pointer_cast<GetJob, JobTicket>( job );
+
+ std::ostream& stream = job_->getStream();
+ char buf[1024];
+
+ int tmp = bytesToRead;
+ while (tmp > 0) {
+ int m = std::min<int>(tmp, 1024);
+ server->read(boost::asio::buffer(buf, m));
+ log().log(DEBUG, "NODE: read "+ boost::lexical_cast<string>( m ) + " bytes of data\n");
+ stream.write(buf, m);
+ tmp -= m;
+ }
+
+ return true;
+}
Modified: trunk/apps/CppFCPLib/ServerMessage.h
===================================================================
--- trunk/apps/CppFCPLib/ServerMessage.h 2007-07-27 00:00:57 UTC (rev 14368)
+++ trunk/apps/CppFCPLib/ServerMessage.h 2007-07-27 01:31:46 UTC (rev 14369)
@@ -73,6 +73,11 @@
bool operator()(const JobTicketPtr job) const;
};
+struct IsLastDataFound {
+ Message::Ptr message;
+ IsLastDataFound( Message::Ptr m ) : message(m) {}
+ bool operator()(const JobTicketPtr job) const;
+};
template<typename isLastT = IsLastTrue, bool isErrorT = false>
class ServerMessageT : public ServerMessage {
@@ -96,7 +101,7 @@
AllDataMessage() {}
public:
- bool isLast(const JobTicketPtr job) const { return true; }
+ bool isLast(const JobTicketPtr job) const;
bool isError() const { return false; }
friend class ServerMessage;
};
More information about the Cppfcplib
mailing list