Add OnUnloadModule hook to MySQL

git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@12632 e03df62e-2008-0410-955e-edbf42e46eb7
This commit is contained in:
danieldg 2010-03-13 16:35:07 +00:00
parent b822830c5b
commit a5dc76ec96

View File

@ -67,16 +67,23 @@ class SQLConnection;
class MySQLresult; class MySQLresult;
class DispatcherThread; class DispatcherThread;
struct QueueItem struct QQueueItem
{ {
SQLQuery* q; SQLQuery* q;
SQLConnection* c; SQLConnection* c;
QueueItem(SQLQuery* Q, SQLConnection* C) : q(Q), c(C) {} QQueueItem(SQLQuery* Q, SQLConnection* C) : q(Q), c(C) {}
};
struct RQueueItem
{
SQLQuery* q;
MySQLresult* r;
RQueueItem(SQLQuery* Q, MySQLresult* R) : q(Q), r(R) {}
}; };
typedef std::map<std::string, SQLConnection*> ConnMap; typedef std::map<std::string, SQLConnection*> ConnMap;
typedef std::deque<QueueItem> QueryQueue; typedef std::deque<QQueueItem> QueryQueue;
typedef std::deque<MySQLresult*> ResultQueue; typedef std::deque<RQueueItem> ResultQueue;
/** MySQL module /** MySQL module
* */ * */
@ -84,14 +91,15 @@ class ModuleSQL : public Module
{ {
public: public:
DispatcherThread* Dispatcher; DispatcherThread* Dispatcher;
QueryQueue qq; QueryQueue qq; // MUST HOLD MUTEX
ResultQueue rq; ResultQueue rq; // MUST HOLD MUTEX
ConnMap connections; ConnMap connections; // main thread only
ModuleSQL(); ModuleSQL();
void init(); void init();
~ModuleSQL(); ~ModuleSQL();
void OnRehash(User* user); void OnRehash(User* user);
void OnUnloadModule(Module* mod);
Version GetVersion(); Version GetVersion();
}; };
@ -115,14 +123,13 @@ class DispatcherThread : public SocketThread
class MySQLresult : public SQLResult class MySQLresult : public SQLResult
{ {
public: public:
SQLQuery* query;
SQLerror err; SQLerror err;
int currentrow; int currentrow;
int rows; int rows;
std::vector<std::string> colnames; std::vector<std::string> colnames;
std::vector<SQLEntries> fieldlists; std::vector<SQLEntries> fieldlists;
MySQLresult(SQLQuery* q, MYSQL_RES* res, int affected_rows) : query(q), err(SQL_NO_ERROR), currentrow(0), rows(0) MySQLresult(MYSQL_RES* res, int affected_rows) : err(SQL_NO_ERROR), currentrow(0), rows(0)
{ {
if (affected_rows >= 1) if (affected_rows >= 1)
{ {
@ -166,7 +173,7 @@ class MySQLresult : public SQLResult
} }
} }
MySQLresult(SQLQuery* q, SQLerror& e) : query(q), err(e) MySQLresult(SQLerror& e) : err(e)
{ {
} }
@ -217,11 +224,11 @@ class SQLConnection : public SQLProvider
public: public:
reference<ConfigTag> config; reference<ConfigTag> config;
MYSQL *connection; MYSQL *connection;
bool active; Mutex lock;
// This constructor creates an SQLConnection object with the given credentials, but does not connect yet. // This constructor creates an SQLConnection object with the given credentials, but does not connect yet.
SQLConnection(Module* p, ConfigTag* tag) : SQLProvider(p, "SQL/" + tag->getString("id")), SQLConnection(Module* p, ConfigTag* tag) : SQLProvider(p, "SQL/" + tag->getString("id")),
config(tag), active(false) config(tag)
{ {
} }
@ -310,30 +317,23 @@ class SQLConnection : public SQLProvider
return (ModuleSQL*)(Module*)creator; return (ModuleSQL*)(Module*)creator;
} }
void DoBlockingQuery(SQLQuery* req) MySQLresult* DoBlockingQuery(SQLQuery* req)
{ {
/* Parse the command string and dispatch it to mysql */ /* Parse the command string and dispatch it to mysql */
if (CheckConnection() && !mysql_real_query(connection, req->query.data(), req->query.length())) if (CheckConnection() && !mysql_real_query(connection, req->query.data(), req->query.length()))
{ {
/* Successfull query */ /* Successfull query */
MYSQL_RES* res = mysql_use_result(connection); MYSQL_RES* res = mysql_use_result(connection);
unsigned long rows = mysql_affected_rows(connection); unsigned long rows = mysql_affected_rows(connection);
MySQLresult* r = new MySQLresult(req, res, rows); return new MySQLresult(res, rows);
Parent()->Dispatcher->LockQueue();
Parent()->rq.push_back(r);
Parent()->Dispatcher->NotifyParent();
Parent()->Dispatcher->UnlockQueue();
} }
else else
{ {
/* XXX: See /usr/include/mysql/mysqld_error.h for a list of /* XXX: See /usr/include/mysql/mysqld_error.h for a list of
* possible error numbers and error messages */ * possible error numbers and error messages */
SQLerror e(SQL_QREPLY_FAIL, ConvToStr(mysql_errno(connection)) + std::string(": ") + mysql_error(connection)); SQLerror e(SQL_QREPLY_FAIL, ConvToStr(mysql_errno(connection)) + std::string(": ") + mysql_error(connection));
MySQLresult* r = new MySQLresult(req, e); return new MySQLresult(e);
Parent()->Dispatcher->LockQueue();
Parent()->rq.push_back(r);
Parent()->Dispatcher->NotifyParent();
Parent()->Dispatcher->UnlockQueue();
} }
} }
@ -359,7 +359,7 @@ class SQLConnection : public SQLProvider
void submit(SQLQuery* q) void submit(SQLQuery* q)
{ {
Parent()->Dispatcher->LockQueue(); Parent()->Dispatcher->LockQueue();
Parent()->qq.push_back(QueueItem(q, this)); Parent()->qq.push_back(QQueueItem(q, this));
Parent()->Dispatcher->UnlockQueueWakeup(); Parent()->Dispatcher->UnlockQueueWakeup();
} }
}; };
@ -374,8 +374,8 @@ void ModuleSQL::init()
Dispatcher = new DispatcherThread(this); Dispatcher = new DispatcherThread(this);
ServerInstance->Threads->Start(Dispatcher); ServerInstance->Threads->Start(Dispatcher);
Implementation eventlist[] = { I_OnRehash }; Implementation eventlist[] = { I_OnRehash, I_OnUnloadModule };
ServerInstance->Modules->Attach(eventlist, this, 1); ServerInstance->Modules->Attach(eventlist, this, 2);
} }
ModuleSQL::~ModuleSQL() ModuleSQL::~ModuleSQL()
@ -394,7 +394,6 @@ ModuleSQL::~ModuleSQL()
void ModuleSQL::OnRehash(User* user) void ModuleSQL::OnRehash(User* user)
{ {
Dispatcher->LockQueue();
ConnMap conns; ConnMap conns;
ConfigTagList tags = ServerInstance->Config->ConfTags("database"); ConfigTagList tags = ServerInstance->Config->ConfTags("database");
for(ConfigIter i = tags.first; i != tags.second; i++) for(ConfigIter i = tags.first; i != tags.second; i++)
@ -415,21 +414,56 @@ void ModuleSQL::OnRehash(User* user)
connections.erase(curr); connections.erase(curr);
} }
} }
// now clean up the deleted databases
Dispatcher->LockQueue();
SQLerror err(SQL_BAD_DBID);
for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++) for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
{ {
if (i->second->active) ServerInstance->Modules->DelService(*i->second);
// it might be running a query on this database. Wait for that to complete
i->second->lock.Lock();
i->second->lock.Unlock();
// now remove all active queries to this DB
for(unsigned int j = qq.size() - 1; j >= 0; j--)
{ {
// can't delete it now. Next rehash will try to kill it again if (qq[j].c == i->second)
conns.insert(*i); {
qq[j].q->OnError(err);
delete qq[j].q;
qq.erase(qq.begin() + j);
}
} }
else // finally, nuke the connection
delete i->second;
}
Dispatcher->UnlockQueue();
connections.swap(conns);
}
void ModuleSQL::OnUnloadModule(Module* mod)
{
SQLerror err(SQL_BAD_DBID);
Dispatcher->LockQueue();
for(unsigned int i = qq.size() - 1; i >= 0; i--)
{
if (qq[i].q->creator == mod)
{ {
ServerInstance->Modules->DelService(*i->second); if (i == 0)
delete i->second; {
// need to wait until the query is done
// (the result will be discarded)
qq[i].c->lock.Lock();
qq[i].c->lock.Unlock();
}
qq[i].q->OnError(err);
delete qq[i].q;
qq.erase(qq.begin() + i);
} }
} }
connections.swap(conns);
Dispatcher->UnlockQueue(); Dispatcher->UnlockQueue();
// clean up any result queue entries
Dispatcher->OnNotify();
} }
Version ModuleSQL::GetVersion() Version ModuleSQL::GetVersion()
@ -444,13 +478,30 @@ void DispatcherThread::Run()
{ {
if (!Parent->qq.empty()) if (!Parent->qq.empty())
{ {
QueueItem i = Parent->qq.front(); QQueueItem i = Parent->qq.front();
Parent->qq.pop_front(); i.c->lock.Lock();
i.c->active = true;
this->UnlockQueue(); this->UnlockQueue();
i.c->DoBlockingQuery(i.q); MySQLresult* res = i.c->DoBlockingQuery(i.q);
i.c->lock.Unlock();
/*
* At this point, the main thread could be working on:
* Rehash - delete i.c out from under us. We don't care about that.
* UnloadModule - delete i.q and the qq item. Need to avoid reporting results.
*/
this->LockQueue(); this->LockQueue();
i.c->active = false; if (Parent->qq.front().q == i.q)
{
Parent->qq.pop_front();
Parent->rq.push_back(RQueueItem(i.q, res));
NotifyParent();
}
else
{
// UnloadModule ate the query
delete res;
}
} }
else else
{ {
@ -469,13 +520,13 @@ void DispatcherThread::OnNotify()
this->LockQueue(); this->LockQueue();
for(ResultQueue::iterator i = Parent->rq.begin(); i != Parent->rq.end(); i++) for(ResultQueue::iterator i = Parent->rq.begin(); i != Parent->rq.end(); i++)
{ {
MySQLresult* res = *i; MySQLresult* res = i->r;
if (res->err.id == SQL_NO_ERROR) if (res->err.id == SQL_NO_ERROR)
res->query->OnResult(*res); i->q->OnResult(*res);
else else
res->query->OnError(res->err); i->q->OnError(res->err);
delete res->query; delete i->q;
delete res; delete i->r;
} }
Parent->rq.clear(); Parent->rq.clear();
this->UnlockQueue(); this->UnlockQueue();