ThreadEngine: Allow interthread signaling without needing as many hacks

git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@11251 e03df62e-2008-0410-955e-edbf42e46eb7
This commit is contained in:
danieldg 2009-03-23 18:48:51 +00:00
parent f9e6de5284
commit 59dbcc1245
7 changed files with 453 additions and 359 deletions

View File

@ -30,6 +30,15 @@ class CoreExport Thread : public Extensible
/** Set to true when the thread is to exit /** Set to true when the thread is to exit
*/ */
bool ExitFlag; bool ExitFlag;
// TODO protected:
public:
/** Get thread's current exit status.
* (are we being asked to exit?)
*/
bool GetExitFlag()
{
return ExitFlag;
}
public: public:
/** Opaque thread state managed by threading engine /** Opaque thread state managed by threading engine
*/ */
@ -60,18 +69,108 @@ class CoreExport Thread : public Extensible
/** Signal the thread to exit gracefully. /** Signal the thread to exit gracefully.
*/ */
void SetExitFlag(bool value) virtual void SetExitFlag()
{ {
ExitFlag = value; ExitFlag = true;
}
};
class CoreExport QueuedThread : public Thread
{
ThreadQueueData queue;
protected:
/** Waits for an enqueue operation to complete
* You MUST hold the queue lock when you call this.
* It will be unlocked while you wait, and will be relocked
* before the function returns
*/
void WaitForQueue()
{
queue.Wait();
}
public:
/** Lock queue.
*/
void LockQueue()
{
queue.Lock();
}
/** Unlock queue.
*/
void UnlockQueue()
{
queue.Unlock();
}
/** Unlock queue and wake up worker
*/
void UnlockQueueWakeup()
{
queue.Wakeup();
queue.Unlock();
}
virtual void SetExitFlag()
{
queue.Lock();
Thread::SetExitFlag();
queue.Wakeup();
queue.Unlock();
}
};
class CoreExport SocketThread : public Thread
{
ThreadQueueData queue;
ThreadSignalData signal;
protected:
/** Waits for an enqueue operation to complete
* You MUST hold the queue lock when you call this.
* It will be unlocked while you wait, and will be relocked
* before the function returns
*/
void WaitForQueue()
{
queue.Wait();
}
/** Notifies parent by making the SignalFD ready to read
* No requirements on locking
*/
void NotifyParent();
public:
SocketThread(InspIRCd* SI);
virtual ~SocketThread();
/** Lock queue.
*/
void LockQueue()
{
queue.Lock();
}
/** Unlock queue.
*/
void UnlockQueue()
{
queue.Unlock();
}
/** Unlock queue and send wakeup to worker
*/
void UnlockQueueWakeup()
{
queue.Wakeup();
queue.Unlock();
}
virtual void SetExitFlag()
{
queue.Lock();
Thread::SetExitFlag();
queue.Wakeup();
queue.Unlock();
} }
/** Get thread's current exit status. /**
* (are we being asked to exit?) * Called in the context of the parent thread after a notification
* has passed through the socket
*/ */
bool GetExitFlag() virtual void OnNotify() = 0;
{
return ExitFlag;
}
}; };
#endif #endif

View File

@ -107,4 +107,50 @@ class CoreExport Mutex
} }
}; };
class ThreadQueueData
{
pthread_mutex_t mutex;
pthread_cond_t cond;
public:
ThreadQueueData()
{
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
}
~ThreadQueueData()
{
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}
void Lock()
{
pthread_mutex_lock(&mutex);
}
void Unlock()
{
pthread_mutex_unlock(&mutex);
}
void Wakeup()
{
pthread_cond_signal(&cond);
}
void Wait()
{
pthread_cond_wait(&cond, &mutex);
}
};
class ThreadSignalSocket;
class ThreadSignalData
{
public:
ThreadSignalSocket* sock;
};
#endif #endif

View File

@ -95,5 +95,54 @@ class CoreExport Mutex
} }
}; };
class ThreadQueueData
{
CRITICAL_SECTION mutex;
HANDLE event;
public:
ThreadQueueData()
{
InitializeCriticalSection(&mutex);
event = CreateEvent(NULL, false, false, NULL);
}
~ThreadQueueData()
{
DeleteCriticalSection(&mutex);
}
void Lock()
{
EnterCriticalSection(&mutex);
}
void Unlock()
{
LeaveCriticalSection(&mutex);
}
void Wakeup()
{
PulseEvent(event);
}
void Wait()
{
LeaveCriticalSection(&mutex);
WaitForSingleObject(event, INFINITE);
EnterCriticalSection(&mutex);
}
};
class ThreadSignalData
{
public:
int connFD;
ThreadSignalData()
{
connFD = -1;
}
};
#endif #endif

View File

@ -27,8 +27,6 @@
class SQLConn; class SQLConn;
class MsSQLResult; class MsSQLResult;
class ResultNotifier;
class MsSQLListener;
class ModuleMsSQL; class ModuleMsSQL;
typedef std::map<std::string, SQLConn*> ConnMap; typedef std::map<std::string, SQLConn*> ConnMap;
@ -45,86 +43,25 @@ unsigned long count(const char * const str, char a)
return n; return n;
} }
ResultNotifier* notifier = NULL;
MsSQLListener* listener = NULL;
int QueueFD = -1;
ConnMap connections; ConnMap connections;
Mutex* QueueMutex;
Mutex* ResultsMutex; Mutex* ResultsMutex;
Mutex* LoggingMutex; Mutex* LoggingMutex;
class QueryThread : public Thread class QueryThread : public SocketThread
{ {
private: private:
ModuleMsSQL* Parent; ModuleMsSQL* Parent;
InspIRCd* ServerInstance; InspIRCd* ServerInstance;
public: public:
QueryThread(InspIRCd* si, ModuleMsSQL* mod) QueryThread(InspIRCd* si, ModuleMsSQL* mod)
: Thread(), Parent(mod), ServerInstance(si) : SocketThread(si), Parent(mod), ServerInstance(si)
{ {
} }
~QueryThread() { } ~QueryThread() { }
virtual void Run(); virtual void Run();
virtual void OnNotify();
}; };
class ResultNotifier : public BufferedSocket
{
ModuleMsSQL* mod;
public:
ResultNotifier(ModuleMsSQL* m, InspIRCd* SI, int newfd, char* ip) : BufferedSocket(SI, newfd, ip), mod(m)
{
}
virtual bool OnDataReady()
{
char data = 0;
if (ServerInstance->SE->Recv(this, &data, 1, 0) > 0)
{
Dispatch();
return true;
}
return false;
}
void Dispatch();
};
class MsSQLListener : public ListenSocketBase
{
ModuleMsSQL* Parent;
irc::sockets::insp_sockaddr sock_us;
socklen_t uslen;
FileReader* index;
public:
MsSQLListener(ModuleMsSQL* P, InspIRCd* Instance, int port, const std::string &addr) : ListenSocketBase(Instance, port, addr), Parent(P)
{
uslen = sizeof(sock_us);
if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen))
{
throw ModuleException("Could not getsockname() to find out port number for ITC port");
}
}
virtual void OnAcceptReady(const std::string &ipconnectedto, int nfd, const std::string &incomingip)
{
new ResultNotifier(this->Parent, this->ServerInstance, nfd, (char *)ipconnectedto.c_str()); // XXX unsafe casts suck
}
/* Using getsockname and ntohs, we can determine which port number we were allocated */
int GetPort()
{
#ifdef IPV6
return ntohs(sock_us.sin6_port);
#else
return ntohs(sock_us.sin_port);
#endif
}
};
class MsSQLResult : public SQLresult class MsSQLResult : public SQLresult
{ {
private: private:
@ -289,8 +226,6 @@ class MsSQLResult : public SQLresult
{ {
delete fl; delete fl;
} }
}; };
class SQLConn : public classbase class SQLConn : public classbase
@ -569,7 +504,6 @@ class SQLConn : public classbase
ResultsMutex->Lock(); ResultsMutex->Lock();
results.push_back(res); results.push_back(res);
ResultsMutex->Unlock(); ResultsMutex->Unlock();
SendNotify();
return SQLerror(); return SQLerror();
} }
@ -693,40 +627,6 @@ class SQLConn : public classbase
} }
} }
void SendNotify()
{
if (QueueFD < 0)
{
if ((QueueFD = socket(AF_FAMILY, SOCK_STREAM, 0)) == -1)
{
/* crap, we're out of sockets... */
return;
}
irc::sockets::insp_sockaddr addr;
#ifdef IPV6
irc::sockets::insp_aton("::1", &addr.sin6_addr);
addr.sin6_family = AF_FAMILY;
addr.sin6_port = htons(listener->GetPort());
#else
irc::sockets::insp_inaddr ia;
irc::sockets::insp_aton("127.0.0.1", &ia);
addr.sin_family = AF_FAMILY;
addr.sin_addr = ia;
addr.sin_port = htons(listener->GetPort());
#endif
if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1)
{
/* wtf, we cant connect to it, but we just created it! */
return;
}
}
char id = 0;
send(QueueFD, &id, 1, 0);
}
void DoLeadingQuery() void DoLeadingQuery()
{ {
SQLrequest& req = queue.front(); SQLrequest& req = queue.front();
@ -748,7 +648,6 @@ class ModuleMsSQL : public Module
{ {
LoggingMutex = new Mutex(); LoggingMutex = new Mutex();
ResultsMutex = new Mutex(); ResultsMutex = new Mutex();
QueueMutex = new Mutex();
ServerInstance->Modules->UseInterface("SQLutils"); ServerInstance->Modules->UseInterface("SQLutils");
@ -757,25 +656,6 @@ class ModuleMsSQL : public Module
throw ModuleException("m_mssql: Unable to publish feature 'SQL'"); throw ModuleException("m_mssql: Unable to publish feature 'SQL'");
} }
/* Create a socket on a random port. Let the tcp stack allocate us an available port */
#ifdef IPV6
listener = new MsSQLListener(this, ServerInstance, 0, "::1");
#else
listener = new MsSQLListener(this, ServerInstance, 0, "127.0.0.1");
#endif
if (listener->GetFd() == -1)
{
ServerInstance->Modules->DoneWithInterface("SQLutils");
throw ModuleException("m_mssql: unable to create ITC pipe");
}
else
{
LoggingMutex->Lock();
ServerInstance->Logs->Log("m_mssql", DEBUG, "MsSQL: Interthread comms port is %d", listener->GetPort());
LoggingMutex->Unlock();
}
ReadConf(); ReadConf();
queryDispatcher = new QueryThread(ServerInstance, this); queryDispatcher = new QueryThread(ServerInstance, this);
@ -792,32 +672,14 @@ class ModuleMsSQL : public Module
ClearQueue(); ClearQueue();
ClearAllConnections(); ClearAllConnections();
ServerInstance->SE->DelFd(listener);
ServerInstance->BufferedSocketCull();
if (QueueFD >= 0)
{
shutdown(QueueFD, 2);
close(QueueFD);
}
if (notifier)
{
ServerInstance->SE->DelFd(notifier);
notifier->Close();
ServerInstance->BufferedSocketCull();
}
ServerInstance->Modules->UnpublishInterface("SQL", this); ServerInstance->Modules->UnpublishInterface("SQL", this);
ServerInstance->Modules->UnpublishFeature("SQL"); ServerInstance->Modules->UnpublishFeature("SQL");
ServerInstance->Modules->DoneWithInterface("SQLutils"); ServerInstance->Modules->DoneWithInterface("SQLutils");
delete LoggingMutex; delete LoggingMutex;
delete ResultsMutex; delete ResultsMutex;
delete QueueMutex;
} }
void SendQueue() void SendQueue()
{ {
for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++) for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++)
@ -929,9 +791,9 @@ class ModuleMsSQL : public Module
virtual void OnRehash(User* user, const std::string &parameter) virtual void OnRehash(User* user, const std::string &parameter)
{ {
QueueMutex->Lock(); queryDispatcher->LockQueue();
ReadConf(); ReadConf();
QueueMutex->Unlock(); queryDispatcher->UnlockQueueWakeup();
} }
virtual const char* OnRequest(Request* request) virtual const char* OnRequest(Request* request)
@ -940,7 +802,7 @@ class ModuleMsSQL : public Module
{ {
SQLrequest* req = (SQLrequest*)request; SQLrequest* req = (SQLrequest*)request;
QueueMutex->Lock(); queryDispatcher->LockQueue();
ConnMap::iterator iter; ConnMap::iterator iter;
@ -957,7 +819,7 @@ class ModuleMsSQL : public Module
req->error.Id(SQL_BAD_DBID); req->error.Id(SQL_BAD_DBID);
} }
QueueMutex->Unlock(); queryDispatcher->UnlockQueueWakeup();
return returnval; return returnval;
} }
@ -979,17 +841,17 @@ class ModuleMsSQL : public Module
}; };
void ResultNotifier::Dispatch() void QueryThread::OnNotify()
{ {
mod->SendQueue(); mod->SendQueue();
} }
void QueryThread::Run() void QueryThread::Run()
{ {
this->LockQueue();
while (this->GetExitFlag() == false) while (this->GetExitFlag() == false)
{ {
SQLConn* conn = NULL; SQLConn* conn = NULL;
QueueMutex->Lock();
for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++) for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
{ {
if (i->second->queue.totalsize()) if (i->second->queue.totalsize())
@ -998,16 +860,20 @@ void QueryThread::Run()
break; break;
} }
} }
QueueMutex->Unlock();
if (conn) if (conn)
{ {
this->UnlockQueue();
conn->DoLeadingQuery(); conn->DoLeadingQuery();
QueueMutex->Lock(); this->NotifyParent();
this->LockQueue();
conn->queue.pop(); conn->queue.pop();
QueueMutex->Unlock();
} }
usleep(1000); else
{
this->WaitForQueue();
}
} }
this->UnlockQueue();
} }
MODULE_INIT(ModuleMsSQL) MODULE_INIT(ModuleMsSQL)

View File

@ -65,14 +65,10 @@
class SQLConnection; class SQLConnection;
class MySQLListener; class DispatcherThread;
typedef std::map<std::string, SQLConnection*> ConnMap; typedef std::map<std::string, SQLConnection*> ConnMap;
static MySQLListener *MessagePipe = NULL; typedef std::deque<SQLresult*> ResultQueue;
int QueueFD = -1;
class DispatcherThread;
unsigned long count(const char * const str, char a) unsigned long count(const char * const str, char a)
{ {
@ -97,7 +93,6 @@ class ModuleSQL : public Module
int currid; int currid;
bool rehashing; bool rehashing;
DispatcherThread* Dispatcher; DispatcherThread* Dispatcher;
Mutex QueueMutex;
Mutex ResultsMutex; Mutex ResultsMutex;
Mutex LoggingMutex; Mutex LoggingMutex;
Mutex ConnMutex; Mutex ConnMutex;
@ -111,13 +106,10 @@ class ModuleSQL : public Module
}; };
#if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224 #if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224
#define mysql_field_count mysql_num_fields #define mysql_field_count mysql_num_fields
#endif #endif
typedef std::deque<SQLresult*> ResultQueue;
/** Represents a mysql result set /** Represents a mysql result set
*/ */
class MySQLresult : public SQLresult class MySQLresult : public SQLresult
@ -301,10 +293,6 @@ class MySQLresult : public SQLresult
} }
}; };
class SQLConnection;
void NotifyMainThread(SQLConnection* connection_with_new_result);
/** Represents a connection to a mysql database /** Represents a connection to a mysql database
*/ */
class SQLConnection : public classbase class SQLConnection : public classbase
@ -452,9 +440,7 @@ class SQLConnection : public classbase
*queryend = 0; *queryend = 0;
Parent->QueueMutex.Lock();
req.query.q = query; req.query.q = query;
Parent->QueueMutex.Unlock();
if (!mysql_real_query(connection, req.query.q.data(), req.query.q.length())) if (!mysql_real_query(connection, req.query.q.data(), req.query.q.length()))
{ {
@ -485,13 +471,7 @@ class SQLConnection : public classbase
Parent->ResultsMutex.Unlock(); Parent->ResultsMutex.Unlock();
} }
/* Now signal the main thread that we've got a result to process.
* Pass them this connection id as what to examine
*/
delete[] query; delete[] query;
NotifyMainThread(this);
} }
bool ConnectionLost() bool ConnectionLost()
@ -675,123 +655,18 @@ ConnMap::iterator GetCharId(char id)
return Connections.end(); return Connections.end();
} }
void NotifyMainThread(SQLConnection* connection_with_new_result)
{
/* Here we write() to the socket the main thread has open
* and we connect()ed back to before our thread became active.
* The main thread is using a nonblocking socket tied into
* the socket engine, so they wont block and they'll receive
* nearly instant notification. Because we're in a seperate
* thread, we can just use standard connect(), and we can
* block if we like. We just send the connection id of the
* connection back.
*
* NOTE: We only send a single char down the connection, this
* way we know it wont get a partial read at the other end if
* the system is especially congested (see bug #263).
* The function FindCharId translates a connection name into a
* one character id, and GetCharId translates a character id
* back into an iterator.
*/
char id = FindCharId(connection_with_new_result->GetID());
send(QueueFD, &id, 1, 0);
}
class ModuleSQL; class ModuleSQL;
class DispatcherThread : public Thread class DispatcherThread : public SocketThread
{ {
private: private:
ModuleSQL* Parent; ModuleSQL* Parent;
InspIRCd* ServerInstance; InspIRCd* ServerInstance;
public: public:
DispatcherThread(InspIRCd* Instance, ModuleSQL* CreatorModule) : Thread(), Parent(CreatorModule), ServerInstance(Instance) { } DispatcherThread(InspIRCd* Instance, ModuleSQL* CreatorModule) : SocketThread(Instance), Parent(CreatorModule), ServerInstance(Instance) { }
~DispatcherThread() { } ~DispatcherThread() { }
virtual void Run(); virtual void Run();
}; virtual void OnNotify();
/** Used by m_mysql to notify one thread when the other has a result
*/
class Notifier : public BufferedSocket
{
ModuleSQL* Parent;
public:
Notifier(ModuleSQL* P, InspIRCd* SI, int newfd, char* ip) : BufferedSocket(SI, newfd, ip), Parent(P) { }
virtual bool OnDataReady()
{
char data = 0;
/* NOTE: Only a single character is read so we know we
* cant get a partial read. (We've been told that theres
* data waiting, so we wont ever get EAGAIN)
* The function GetCharId translates a single character
* back into an iterator.
*/
if (ServerInstance->SE->Recv(this, &data, 1, 0) > 0)
{
Parent->ConnMutex.Lock();
ConnMap::iterator iter = GetCharId(data);
Parent->ConnMutex.Unlock();
if (iter != Connections.end())
{
Parent->ResultsMutex.Lock();
ResultQueue::iterator n = iter->second->rq.begin();
Parent->ResultsMutex.Unlock();
(*n)->Send();
delete (*n);
Parent->ResultsMutex.Lock();
iter->second->rq.pop_front();
Parent->ResultsMutex.Unlock();
return true;
}
/* No error, but unknown id */
return true;
}
/* Erk, error on descriptor! */
return false;
}
};
/** Spawn sockets from a listener
*/
class MySQLListener : public ListenSocketBase
{
ModuleSQL* Parent;
irc::sockets::insp_sockaddr sock_us;
socklen_t uslen;
FileReader* index;
public:
MySQLListener(ModuleSQL* P, InspIRCd* Instance, int port, const std::string &addr) : ListenSocketBase(Instance, port, addr), Parent(P)
{
uslen = sizeof(sock_us);
if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen))
{
throw ModuleException("Could not getsockname() to find out port number for ITC port");
}
}
virtual void OnAcceptReady(const std::string &ipconnectedto, int nfd, const std::string &incomingip)
{
// XXX unsafe casts suck
new Notifier(this->Parent, this->ServerInstance, nfd, (char *)ipconnectedto.c_str());
}
/* Using getsockname and ntohs, we can determine which port number we were allocated */
int GetPort()
{
#ifdef IPV6
return ntohs(sock_us.sin6_port);
#else
return ntohs(sock_us.sin_port);
#endif
}
}; };
ModuleSQL::ModuleSQL(InspIRCd* Me) : Module(Me), rehashing(false) ModuleSQL::ModuleSQL(InspIRCd* Me) : Module(Me), rehashing(false)
@ -802,25 +677,6 @@ ModuleSQL::ModuleSQL(InspIRCd* Me) : Module(Me), rehashing(false)
PublicServerInstance = ServerInstance; PublicServerInstance = ServerInstance;
currid = 0; currid = 0;
/* Create a socket on a random port. Let the tcp stack allocate us an available port */
#ifdef IPV6
MessagePipe = new MySQLListener(this, ServerInstance, 0, "::1");
#else
MessagePipe = new MySQLListener(this, ServerInstance, 0, "127.0.0.1");
#endif
if (MessagePipe->GetFd() == -1)
{
ServerInstance->Modules->DoneWithInterface("SQLutils");
throw ModuleException("m_mysql: unable to create ITC pipe");
}
else
{
LoggingMutex.Lock();
ServerInstance->Logs->Log("m_mysql", DEBUG, "MySQL: Interthread comms port is %d", MessagePipe->GetPort());
LoggingMutex.Unlock();
}
Dispatcher = new DispatcherThread(ServerInstance, this); Dispatcher = new DispatcherThread(ServerInstance, this);
ServerInstance->Threads->Start(Dispatcher); ServerInstance->Threads->Start(Dispatcher);
@ -861,13 +717,11 @@ const char* ModuleSQL::OnRequest(Request* request)
{ {
SQLrequest* req = (SQLrequest*)request; SQLrequest* req = (SQLrequest*)request;
/* XXX: Lock */
QueueMutex.Lock();
ConnMap::iterator iter; ConnMap::iterator iter;
const char* returnval = NULL; const char* returnval = NULL;
Dispatcher->LockQueue();
ConnMutex.Lock(); ConnMutex.Lock();
if((iter = Connections.find(req->dbid)) != Connections.end()) if((iter = Connections.find(req->dbid)) != Connections.end())
{ {
@ -881,7 +735,10 @@ const char* ModuleSQL::OnRequest(Request* request)
} }
ConnMutex.Unlock(); ConnMutex.Unlock();
QueueMutex.Unlock(); Dispatcher->UnlockQueueWakeup();
/* Yes, it's possible this will generate a spurious wakeup.
* That's fine, it'll just get ignored.
*/
return returnval; return returnval;
} }
@ -891,7 +748,9 @@ const char* ModuleSQL::OnRequest(Request* request)
void ModuleSQL::OnRehash(User* user, const std::string &parameter) void ModuleSQL::OnRehash(User* user, const std::string &parameter)
{ {
Dispatcher->LockQueue();
rehashing = true; rehashing = true;
Dispatcher->UnlockQueueWakeup();
} }
Version ModuleSQL::GetVersion() Version ModuleSQL::GetVersion()
@ -903,49 +762,15 @@ void DispatcherThread::Run()
{ {
LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent); LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent);
/* Connect back to the Notifier */ this->LockQueue();
while (!this->GetExitFlag())
if ((QueueFD = socket(AF_FAMILY, SOCK_STREAM, 0)) == -1)
{
/* crap, we're out of sockets... */
return;
}
irc::sockets::insp_sockaddr addr;
#ifdef IPV6
irc::sockets::insp_aton("::1", &addr.sin6_addr);
addr.sin6_family = AF_FAMILY;
addr.sin6_port = htons(MessagePipe->GetPort());
#else
irc::sockets::insp_inaddr ia;
irc::sockets::insp_aton("127.0.0.1", &ia);
addr.sin_family = AF_FAMILY;
addr.sin_addr = ia;
addr.sin_port = htons(MessagePipe->GetPort());
#endif
if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1)
{
/* wtf, we cant connect to it, but we just created it! */
return;
}
while (this->GetExitFlag() == false)
{ {
if (Parent->rehashing) if (Parent->rehashing)
{ {
/* XXX: Lock */
Parent->QueueMutex.Lock();
Parent->rehashing = false; Parent->rehashing = false;
LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent); LoadDatabases(Parent->Conf, Parent->PublicServerInstance, Parent);
Parent->QueueMutex.Unlock();
/* XXX: Unlock */
} }
SQLConnection* conn = NULL;
/* XXX: Lock here for safety */
Parent->QueueMutex.Lock();
Parent->ConnMutex.Lock(); Parent->ConnMutex.Lock();
for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++) for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++)
{ {
@ -956,24 +781,57 @@ void DispatcherThread::Run()
} }
} }
Parent->ConnMutex.Unlock(); Parent->ConnMutex.Unlock();
Parent->QueueMutex.Unlock();
/* XXX: Unlock */
/* Theres an item! */
if (conn) if (conn)
{ {
/* There's an item! */
this->UnlockQueue();
conn->DoLeadingQuery(); conn->DoLeadingQuery();
this->NotifyParent();
/* XXX: Lock */ this->LockQueue();
Parent->QueueMutex.Lock();
conn->queue.pop(); conn->queue.pop();
Parent->QueueMutex.Unlock();
/* XXX: Unlock */
} }
else
{
/* We know the queue is empty, we can safely hang this thread until
* something happens
*/
this->WaitForQueue();
}
}
this->UnlockQueue();
}
usleep(1000); void DispatcherThread::OnNotify()
{
while (1)
{
SQLConnection* conn = NULL;
Parent->ConnMutex.Lock();
for (ConnMap::iterator iter = Connections.begin(); iter != Connections.end(); iter++)
{
if (!iter->second->rq.empty())
{
conn = iter->second;
break;
}
}
Parent->ConnMutex.Unlock();
if (!conn)
break;
Parent->ResultsMutex.Lock();
ResultQueue::iterator n = conn->rq.begin();
Parent->ResultsMutex.Unlock();
(*n)->Send();
delete (*n);
Parent->ResultsMutex.Lock();
conn->rq.pop_front();
Parent->ResultsMutex.Unlock();
} }
} }
MODULE_INIT(ModuleSQL) MODULE_INIT(ModuleSQL)

View File

@ -53,6 +53,96 @@ ThreadEngine::~ThreadEngine()
void ThreadData::FreeThread(Thread* thread) void ThreadData::FreeThread(Thread* thread)
{ {
thread->SetExitFlag(true); thread->SetExitFlag();
pthread_join(pthread_id, NULL); pthread_join(pthread_id, NULL);
} }
#if 0
/* TODO this is a linux-specific syscall that allows signals to be
* sent using a single file descriptor, rather than 2 for a pipe.
* Requires glibc 2.8, kernel 2.6.22+
*/
#include <sys/eventfd.h>
class ThreadSignalSocket : public BufferedSocket
{
SocketThread* parent;
public:
ThreadSignalSocket(SocketThread* p, InspIRCd* SI, int newfd) :
BufferedSocket(SI, newfd, const_cast<char*>("0.0.0.0")), parent(p) {}
~ThreadSignalSocket()
{
}
void Notify()
{
eventfd_write(fd, 1);
}
virtual bool OnDataReady()
{
eventfd_t data;
if (eventfd_read(fd, &data))
return false;
parent->OnNotify();
return true;
}
};
SocketThread::SocketThread(InspIRCd* SI)
{
int fd = eventfd(0, 0); // TODO nonblock
if (fd < 0)
throw new CoreException("Could not create pipe " + std::string(strerror(errno)));
signal.sock = new ThreadSignalSocket(this, SI, fd);
}
#else
class ThreadSignalSocket : public BufferedSocket
{
SocketThread* parent;
int send_fd;
public:
ThreadSignalSocket(SocketThread* p, InspIRCd* SI, int recvfd, int sendfd) :
BufferedSocket(SI, recvfd, const_cast<char*>("0.0.0.0")), parent(p), send_fd(sendfd) {}
~ThreadSignalSocket()
{
close(send_fd);
}
void Notify()
{
char dummy = '*';
send(send_fd, &dummy, 1, 0);
}
virtual bool OnDataReady()
{
char data;
if (ServerInstance->SE->Recv(this, &data, 1, 0) <= 0)
return false;
parent->OnNotify();
return true;
}
};
SocketThread::SocketThread(InspIRCd* SI)
{
int fds[2];
if (pipe(fds))
throw new CoreException("Could not create pipe " + std::string(strerror(errno)));
signal.sock = new ThreadSignalSocket(this, SI, fds[0], fds[1]);
}
#endif
void SocketThread::NotifyParent()
{
signal.sock->Notify();
}
SocketThread::~SocketThread()
{
delete signal.sock;
}

View File

@ -51,3 +51,89 @@ void ThreadData::FreeThread(Thread* thread)
WaitForSingleObject(handle,INFINITE); WaitForSingleObject(handle,INFINITE);
} }
class ThreadSignalSocket : public BufferedSocket
{
SignalThread* parent;
public:
ThreadSignalSocket(SignalThread* t, InspIRCd* SI, int newfd, char* ip)
: BufferedSocket(SI, newfd, ip), parent(t)
{
parent->results = this;
}
virtual bool OnDataReady()
{
char data = 0;
if (ServerInstance->SE->Recv(this, &data, 1, 0) > 0)
{
parent->OnNotify();
return true;
}
return false;
}
};
class ThreadSignalListener : public ListenSocketBase
{
SocketThread* parent;
irc::sockets::insp_sockaddr sock_us;
public:
ThreadSignalListener(SocketThread* t, InspIRCd* Instance, int port, const std::string &addr) : ListenSocketBase(Instance, port, addr), parent(t)
{
socklen_t uslen = sizeof(sock_us);
if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen))
{
throw ModuleException("Could not getsockname() to find out port number for ITC port");
}
}
virtual void OnAcceptReady(const std::string &ipconnectedto, int nfd, const std::string &incomingip)
{
new ThreadSignalSocket(parent, ServerInstance, nfd, const_cast<char*>(ipconnectedto.c_str()));
ServerInstance->SE->DelFd(this);
// XXX unsafe casts suck
}
/* Using getsockname and ntohs, we can determine which port number we were allocated */
int GetPort()
{
#ifdef IPV6
return ntohs(sock_us.sin6_port);
#else
return ntohs(sock_us.sin_port);
#endif
}
};
SocketThread::SocketThread(InspIRCd* SI)
{
ThreadSignalListener* listener = new ThreadSignalListener(this, ServerInstance, 0, "127.0.0.1");
if (listener->GetFd() == -1)
throw CoreException("Could not create ITC pipe");
int connFD = socket(AF_INET, SOCK_STREAM, 0);
if (connFD == -1)
throw CoreException("Could not create ITC pipe");
irc::sockets::sockaddrs addr;
irc::sockets::insp_aton("127.0.0.1", &addr.in4.sin_addr);
addr.in4.sin_family = AF_INET;
addr.in4.sin_port = htons(listener->GetPort());
if (connect(connFD, &addr.sa, sizeof(addr.in4)) == -1)
{
ServerInstance->SE->DelFd(listener);
close(connFD);
throw CoreException("Could not connet to ITC pipe");
}
this->signal.connFD = connFD;
}
void SocketThread::NotifyParent()
{
char dummy = '*';
send(signal.connFD, &dummy, 1, 0);
}
SocketThread::~SocketThread()
{
close(signal.connFD);
}