Call OnStreamSocketWrite() once per write event

Do sendq flattening in SSL modules, move code for it into class SSLIOHook from core
This commit is contained in:
Attila Molnar 2015-06-06 14:42:59 +02:00
parent e05c258650
commit d0556a2a59
4 changed files with 40 additions and 26 deletions

View File

@ -138,6 +138,31 @@ class SSLIOHook : public IOHook
*/
reference<ssl_cert> certificate;
/** Reduce elements in a send queue by appending later elements to the first element until there are no more
* elements to append or a desired length is reached
* @param sendq SendQ to work on
* @param targetsize Target size of the front element
*/
static void FlattenSendQueue(StreamSocket::SendQueue& sendq, size_t targetsize)
{
if ((sendq.size() <= 1) || (sendq.front().length() >= targetsize))
return;
// Avoid multiple repeated SSL encryption invocations
// This adds a single copy of the queue, but avoids
// much more overhead in terms of system calls invoked
// by an IOHook.
std::string tmp;
tmp.reserve(std::min(targetsize, sendq.bytes())+1);
do
{
tmp.append(sendq.front());
sendq.pop_front();
}
while (!sendq.empty() && tmp.length() < targetsize);
sendq.push_front(tmp);
}
public:
SSLIOHook(IOHookProvider* hookprov)
: IOHook(hookprov)

View File

@ -202,33 +202,12 @@ void StreamSocket::DoWrite()
if (GetIOHook())
{
{
while (error.empty() && !sendq.empty())
{
if (sendq.size() > 1 && sendq.front().length() < 1024)
{
// Avoid multiple repeated SSL encryption invocations
// This adds a single copy of the queue, but avoids
// much more overhead in terms of system calls invoked
// by the IOHook.
//
// The length limit of 1024 is to prevent merging strings
// more than once when writes begin to block.
std::string tmp;
tmp.reserve(1280);
while (!sendq.empty() && tmp.length() < 1024)
{
tmp.append(sendq.front());
sendq.pop_front();
}
sendq.push_front(tmp);
}
{
int rv = GetIOHook()->OnStreamSocketWrite(this);
if (rv > 0)
{
// consumed the entire string, and is ready for more
sendq.pop_front();
}
else if (rv == 0)
{

View File

@ -987,14 +987,16 @@ info_done_dealloc:
StreamSocket::SendQueue& sendq = user->GetSendQ();
int ret = 0;
while (!sendq.empty())
{
FlattenSendQueue(sendq, profile->GetOutgoingRecordSize());
const StreamSocket::SendQueue::Element& buffer = sendq.front();
ret = gnutls_record_send(this->sess, buffer.data(), buffer.length());
if (ret == (int)buffer.length())
{
SocketEngine::ChangeEventMask(user, FD_WANT_NO_WRITE);
return 1;
// Wrote entire record, continue sending
sendq.pop_front();
}
else if (ret > 0)
{
@ -1014,6 +1016,9 @@ info_done_dealloc:
return -1;
}
}
SocketEngine::ChangeEventMask(user, FD_WANT_NO_WRITE);
return 1;
}
void TellCiphersAndFingerprint(LocalUser* user)

View File

@ -618,8 +618,10 @@ class OpenSSLIOHook : public SSLIOHook
// Session is ready for transferring application data
StreamSocket::SendQueue& sendq = user->GetSendQ();
while (!sendq.empty())
{
ERR_clear_error();
FlattenSendQueue(sendq, profile->GetOutgoingRecordSize());
const StreamSocket::SendQueue::Element& buffer = sendq.front();
int ret = SSL_write(sess, buffer.data(), buffer.size());
@ -630,9 +632,8 @@ class OpenSSLIOHook : public SSLIOHook
if (ret == (int)buffer.length())
{
data_to_write = false;
SocketEngine::ChangeEventMask(user, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
return 1;
// Wrote entire record, continue sending
sendq.pop_front();
}
else if (ret > 0)
{
@ -666,6 +667,10 @@ class OpenSSLIOHook : public SSLIOHook
}
}
}
data_to_write = false;
SocketEngine::ChangeEventMask(user, FD_WANT_POLL_READ | FD_WANT_NO_WRITE);
return 1;
}
void TellCiphersAndFingerprint(LocalUser* user)