More stuff for m_pgsql in, provider-side API stuff semi-done

Add m_sqlv2 header for the new API

git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@4122 e03df62e-2008-0410-955e-edbf42e46eb7
This commit is contained in:
om 2006-07-07 13:36:11 +00:00
parent 56ded38a45
commit 598aedf098
2 changed files with 225 additions and 45 deletions

View File

@ -17,6 +17,7 @@
#include <sstream>
#include <string>
#include <deque>
#include <map>
#include <libpq-fe.h>
@ -56,14 +57,68 @@ typedef std::map<std::string, SQLConn*> ConnMap;
*/
enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE };
class SQLerror
inline std::string pop_front_r(std::deque<std::string> &d)
{
public:
std::string err;
std::string r = d.front();
d.pop_front();
return r;
SQLerror(const std::string &s)
: err(s)
}
/** QueryQueue, a queue of queries waiting to be executed.
* This maintains two queues internally, one for 'priority'
* queries and one for less important ones. Each queue has
* new queries appended to it and ones to execute are popped
* off the front. This keeps them flowing round nicely and no
* query should ever get 'stuck' for too long. If there are
* queries in the priority queue they will be executed first,
* 'unimportant' queries will only be executed when the
* priority queue is empty.
*/
class QueryQueue
{
private:
std::deque<std::string> priority; /* The priority queue */
std::deque<std::string> normal; /* The 'normal' queue */
public:
QueryQueue()
{
}
void push_back(const std::string &q, bool pri = false)
{
log(DEBUG, "QueryQueue::push_back(): Adding %s query to queue: %s", ((pri) ? "priority" : "non-priority"), q.c_str());
if(pri)
priority.push_back(q);
else
normal.push_back(q);
}
inline std::string pop_front()
{
std::string res;
if(priority.size())
{
return pop_front_r(priority);
}
else if(normal.size())
{
return pop_front_r(normal);
}
else
{
return "";
}
}
std::pair<int, int> size()
{
return std::make_pair(priority.size(), normal.size());
}
};
@ -88,6 +143,7 @@ private:
SQLstatus status; /* PgSQL database connection status */
public:
QueryQueue queue; /* Queue of queries waiting to be executed on this connection */
/* This class should only ever be created inside this module, using this constructor, so we don't have to worry about the default ones */
@ -131,8 +187,6 @@ public:
throw ModuleException("Connect failed");
}
}
exit(-1);
}
~SQLConn()
@ -171,8 +225,6 @@ public:
log(DEBUG, "No result for lookup yet!");
return true;
}
exit(-1);
}
bool DoConnect()
@ -249,6 +301,7 @@ public:
{
case PGRES_POLLING_WRITING:
log(DEBUG, "PGconnectPoll: PGRES_POLLING_WRITING");
WantWrite();
status = CWRITE;
DoPoll();
break;
@ -263,7 +316,6 @@ public:
case PGRES_POLLING_OK:
log(DEBUG, "PGconnectPoll: PGRES_POLLING_OK");
status = WWRITE;
Query("SELECT * FROM rawr");
break;
default:
log(DEBUG, "PGconnectPoll: wtf?");
@ -273,6 +325,43 @@ public:
return true;
}
bool ProcessData()
{
if(PQconsumeInput(sql))
{
log(DEBUG, "PQconsumeInput succeeded");
if(PQisBusy(sql))
{
log(DEBUG, "Still busy processing command though");
}
else
{
log(DEBUG, "Looks like we have a result to process!");
while(PGresult* result = PQgetResult(sql))
{
int cols = PQnfields(result);
log(DEBUG, "Got result! :D");
log(DEBUG, "%d rows, %d columns checking now what the column names are", PQntuples(result), cols);
for(int i = 0; i < cols; i++)
{
log(DEBUG, "Column name: %s (%d)", PQfname(result, i));
}
PQclear(result);
}
}
return true;
}
log(DEBUG, "PQconsumeInput failed: %s", PQerrorMessage(sql));
return false;
}
void ShowStatus()
{
switch(PQstatus(sql))
@ -313,6 +402,14 @@ public:
return DoEvent();
}
virtual bool OnWriteReady()
{
/* Always return true here, false would close the socket - we need to do that ourselves with the pgsql API */
log(DEBUG, "OnWriteReady(): status = %s", StatusStr());
return DoEvent();
}
virtual bool OnConnected()
{
@ -329,38 +426,21 @@ public:
}
else
{
if(PQconsumeInput(sql))
{
log(DEBUG, "PQconsumeInput succeeded");
if(PQisBusy(sql))
{
log(DEBUG, "Still busy processing command though");
}
else
{
log(DEBUG, "Looks like we have a result to process!");
while(PGresult* result = PQgetResult(sql))
{
int cols = PQnfields(result);
log(DEBUG, "Got result! :D");
log(DEBUG, "%d rows, %d columns checking now what the column names are", PQntuples(result), cols);
for(int i = 0; i < cols; i++)
{
log(DEBUG, "Column name: %s (%d)", PQfname(result, i));
}
PQclear(result);
}
}
}
else
{
log(DEBUG, "PQconsumeInput failed: %s", PQerrorMessage(sql));
}
ProcessData();
}
switch(PQflush(sql))
{
case -1:
log(DEBUG, "Error flushing write queue: %s", PQerrorMessage(sql));
break;
case 0:
log(DEBUG, "Successfully flushed write queue (or there was nothing to write)");
break;
case 1:
log(DEBUG, "Not all of the write queue written, triggering write event so we can have another go");
WantWrite();
break;
}
return true;
@ -436,7 +516,6 @@ public:
{
/* Unused, I think */
}
};
class ModulePgSQL : public Module
@ -451,13 +530,13 @@ public:
{
log(DEBUG, "%s 'SQL' feature", Srv->PublishFeature("SQL", this) ? "Published" : "Couldn't publish");
log(DEBUG, "%s 'PgSQL' feature", Srv->PublishFeature("PgSQL", this) ? "Published" : "Couldn't publish");
OnRehash("");
}
void Implements(char* List)
{
List[I_OnRehash] = List[I_OnUserRegister] = List[I_OnCheckReady] = List[I_OnUserDisconnect] = 1;
List[I_OnRequest] = List[I_OnRehash] = List[I_OnUserRegister] = List[I_OnCheckReady] = List[I_OnUserDisconnect] = 1;
}
virtual void OnRehash(const std::string &parameter)
@ -492,6 +571,34 @@ public:
connections.insert(std::make_pair(id, newconn));
}
}
virtual char* OnRequest(Request* request)
{
if(strcmp(SQLREQID, request->GetData()) == 0)
{
SQLrequest* req = (SQLrequest*)request;
ConnMap::iterator iter;
log(DEBUG, "Got query: '%s' on id '%s'", req->query.c_str(), req->dbid.c_str());
if((iter = connections.find(req->dbid)) != connections.end())
{
/* Execute query */
iter->second->queue.push_back(req->query, req->pri);
return SQLSUCCESS;
}
else
{
req->error.Id(BAD_DBID);
return NULL;
}
}
log(DEBUG, "Got unsupported API version string: %s", request->GetData());
return NULL;
}
virtual Version GetVersion()
{

View File

@ -0,0 +1,73 @@
#ifndef INSPIRCD_SQLAPI_2
#define INSPIRCD_SQLAPI_2
#define SQLREQID "SQLv2 Request"
#define SQLRESID "SQLv2 Result"
#define SQLSUCCESS "You shouldn't be reading this (success)"
#include <string>
#include "modules.h"
enum SQLerrorNum { BAD_DBID };
class SQLerror
{
SQLerrorNum id;
public:
SQLerror()
{
}
SQLerror(SQLerrorNum i)
: id(i)
{
}
void Id(SQLerrorNum i)
{
id = i;
}
const char* Str()
{
switch(id)
{
case BAD_DBID:
return "Invalid database ID";
default:
return "Unknown error";
}
}
};
class SQLrequest : public Request
{
public:
std::string query;
std::string dbid;
bool pri;
SQLerror error;
SQLrequest(Module* s, Module* d, const std::string &q, const std::string &id, bool p = false)
: Request(SQLREQID, s, d), query(q), dbid(id), pri(p)
{
}
};
class SQLresult : public Request
{
public:
SQLresult(Module* s, Module* d)
: Request(SQLRESID, s, d)
{
}
virtual int Rows()
{
return 0;
}
};
#endif