/*****************************************************************************

$Id$

File:     ed.cpp
Date:     06Apr06

Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
Gmail: blackhedd

This program is free software; you can redistribute it and/or modify
it under the terms of either: 1) the GNU General Public License
as published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version; or 2) Ruby's License.

See the file COPYING for complete licensing information.

*****************************************************************************/

#include "project.h"



/********************
SetSocketNonblocking
********************/

// Puts the descriptor into non-blocking mode.
// Returns true on success, false if the underlying fcntl/ioctlsocket call fails.
bool SetSocketNonblocking (SOCKET sd)
{
	#ifdef OS_UNIX
	int val = fcntl (sd, F_GETFL, 0);
	// Don't OR O_NONBLOCK into a failed (-1) result and write that back.
	if (val < 0)
		return false;
	return (fcntl (sd, F_SETFL, val | O_NONBLOCK) != SOCKET_ERROR) ? true : false;
	#endif

	#ifdef OS_WIN32
	#ifdef BUILD_FOR_RUBY
	// 14Jun09 Ruby provides its own wrappers for ioctlsocket. On 1.8 this is a simple wrapper,
	// however, 1.9 keeps its own state about the socket.
	// NOTE: F_GETFL is not supported
	return (fcntl (sd, F_SETFL, O_NONBLOCK) == 0) ? true : false;
	#else
	unsigned long one = 1;
	return (ioctlsocket (sd, FIONBIO, &one) == 0) ? true : false;
	#endif
	#endif
}

/************
SetFdCloexec
************/

// Marks the descriptor close-on-exec while preserving any other descriptor
// flags already set. Returns true on success. On non-Unix builds this is a no-op
// that always reports success.
#ifdef OS_UNIX
bool SetFdCloexec (int fd)
{
	int flags = fcntl(fd, F_GETFD, 0);
	assert (flags >= 0);
	// With NDEBUG the assert vanishes, so fail explicitly rather than
	// writing back a garbage flag word.
	if (flags < 0)
		return false;
	flags |= FD_CLOEXEC;
	// Write back the merged flags (previously the bare FD_CLOEXEC constant was
	// passed, which discarded whatever F_GETFD had returned).
	return (fcntl(fd, F_SETFD, flags) == 0) ? true : false;
}
#else
bool SetFdCloexec (int fd UNUSED)
{
	return true;
}
#endif

/****************************************
EventableDescriptor::EventableDescriptor
****************************************/

// Wraps an already-open socket/descriptor for use by the given event machine.
// Throws std::runtime_error if the socket is INVALID_SOCKET or the event
// machine pointer is NULL.
EventableDescriptor::EventableDescriptor (SOCKET sd, EventMachine_t *em):
	bCloseNow (false),
	bCloseAfterWriting (false),
	MySocket (sd),
	bAttached (false),
	bWatchOnly (false),
	EventCallback (NULL),
	bCallbackUnbind (true),
	UnbindReasonCode (0),
	ProxyTarget(NULL),
	ProxiedFrom(NULL),
	ProxiedBytes(0),
	MaxOutboundBufSize(0),
	MyEventMachine (em),
	PendingConnectTimeout(20000000),
	InactivityTimeout (0),
	NextHeartbeat (0),
	bPaused (false)
{
	/* There are three ways to close a socket, all of which should
	 * automatically signal to the event machine that this object
	 * should be removed from the polling scheduler.
	 * First is a hard close, intended for bad errors or possible
	 * security violations. It immediately closes the connection
	 * and puts this object into an error state.
	 * Second is to set bCloseNow, which will cause the event machine
	 * to delete this object (and thus close the connection in our
	 * destructor) the next chance it gets. bCloseNow also inhibits
	 * the writing of new data on the socket (but not necessarily
	 * the reading of new data).
	 * The third way is to set bCloseAfterWriting, which inhibits
	 * the writing of new data and converts to bCloseNow as soon
	 * as everything in the outbound queue has been written.
	 * bCloseAfterWriting is really for use only by protocol handlers
	 * (for example, HTTP writes an HTML page and then closes the
	 * connection). All of the error states we generate internally
	 * cause an immediate close to be scheduled, which may have the
	 * effect of discarding outbound data.
	 */

	if (sd == INVALID_SOCKET)
		throw std::runtime_error ("bad eventable descriptor");
	if (MyEventMachine == NULL)
		throw std::runtime_error ("bad em in eventable descriptor");
	CreatedAt = MyEventMachine->GetCurrentLoopTime();
	LastActivity = MyEventMachine->GetCurrentLoopTime();

	#ifdef HAVE_EPOLL
	EpollEvent.events = 0;
	EpollEvent.data.ptr = this;
	#endif
}


/*****************************************
EventableDescriptor::~EventableDescriptor
*****************************************/

// Clears any pending heartbeat, fires the unbind callbacks, tears down proxy
// links in both directions and closes the socket.
EventableDescriptor::~EventableDescriptor() NO_EXCEPT_FALSE
{
	if (NextHeartbeat)
		MyEventMachine->ClearHeartbeat(NextHeartbeat, this);
	if (EventCallback && bCallbackUnbind)
		(*EventCallback)(GetBinding(), EM_CONNECTION_UNBOUND, NULL, UnbindReasonCode);
	if (ProxiedFrom) {
		// EventCallback may never have been set; guard it here exactly as the
		// unbind notification above does, instead of calling through NULL.
		if (EventCallback)
			(*EventCallback)(ProxiedFrom->GetBinding(), EM_PROXY_TARGET_UNBOUND, NULL, 0);
		ProxiedFrom->StopProxy();
	}
	// NOTE(review): this decrement is unconditional, while the matching increment
	// only happens in ScheduleClose(). A descriptor torn down via Close() alone
	// looks like it would unbalance the counter -- confirm against EventMachine_t.
	MyEventMachine->NumCloseScheduled--;
	StopProxy();
	Close();
}


/*************************************
EventableDescriptor::SetEventCallback
*************************************/

// Installs the callback through which all events on this descriptor are reported.
void EventableDescriptor::SetEventCallback (EMCallback cb)
{
	EventCallback = cb;
}


/**************************
EventableDescriptor::Close
**************************/

void EventableDescriptor::Close()
{
	/* EventMachine relies on the fact that when close(fd)
	 * is called that the fd is removed from any
	 * epoll event queues.
	 *
	 * However, this is not *always* the behavior of close(fd)
	 *
	 * See man 4 epoll Q6/A6 and then consider what happens
	 * when using pipes with eventmachine.
	 * (As is often done when communicating with a subprocess)
	 *
	 * The pipes end up looking like:
	 *
	 * ls -l /proc/<pid>/fd
	 * ...
	 * lr-x------ 1 root root 64 2011-08-19 21:31 3 -> pipe:[940970]
	 * l-wx------ 1 root root 64 2011-08-19 21:31 4 -> pipe:[940970]
	 *
	 * This meets the criteria from man 4 epoll Q6/A4 for not
	 * removing fds from epoll event queues until all fds
	 * that reference the underlying file have been removed.
	 *
	 * If the EventableDescriptor associated with fd 3 is deleted,
	 * its dtor will call EventableDescriptor::Close(),
	 * which will call ::close(int fd).
	 *
	 * However, unless the EventableDescriptor associated with fd 4 is
	 * also deleted before the next call to epoll_wait, events may fire
	 * for fd 3 that were registered with an already deleted
	 * EventableDescriptor.
	 *
	 * Therefore, it is necessary to notify EventMachine that
	 * the fd associated with this EventableDescriptor is
	 * closing.
	 *
	 * EventMachine also never closes fds for STDIN, STDOUT and
	 * STDERR (0, 1 & 2)
	 */

	// Close the socket right now. Intended for emergencies.
	if (MySocket != INVALID_SOCKET) {
		MyEventMachine->Deregister (this);

		// Do not close STDIN, STDOUT, STDERR
		// Attached descriptors are likewise left open.
		if (MySocket > 2 && !bAttached) {
			shutdown (MySocket, 1);
			close (MySocket);
		}

		MySocket = INVALID_SOCKET;
	}
}


/*********************************
EventableDescriptor::ShouldDelete
*********************************/

bool EventableDescriptor::ShouldDelete()
{
	/* For use by a socket manager, which needs to know if this object
	 * should be removed from scheduling events and deleted.
	 * Has an immediate close been scheduled, or are we already closed?
	 * If either of these are the case, return true. In theory, the manager will
	 * then delete us, which in turn will make sure the socket is closed.
	 * Note, if bCloseAfterWriting is true, we check a virtual method to see
	 * if there is outbound data to write, and only request a close if there is none.
	 */

	return ((MySocket == INVALID_SOCKET) || bCloseNow || (bCloseAfterWriting && (GetOutboundDataSize() <= 0)));
}


/**********************************
EventableDescriptor::ScheduleClose
**********************************/

// Requests that this descriptor be closed: immediately when after_writing is
// false, or once the outbound queue has drained when it is true.
void EventableDescriptor::ScheduleClose (bool after_writing)
{
	if (IsCloseScheduled()) {
		if (!after_writing) {
			// If closing has become more urgent, then upgrade the scheduled
			// after_writing close to one NOW.
			bCloseNow = true;
		}
		return;
	}
	MyEventMachine->NumCloseScheduled++;
	// KEEP THIS SYNCHRONIZED WITH ::IsCloseScheduled.
	if (after_writing)
		bCloseAfterWriting = true;
	else
		bCloseNow = true;
}


/*************************************
EventableDescriptor::IsCloseScheduled
*************************************/

bool EventableDescriptor::IsCloseScheduled()
{
	// KEEP THIS SYNCHRONIZED WITH ::ScheduleClose.
	return (bCloseNow || bCloseAfterWriting);
}


/*******************************
EventableDescriptor::StartProxy
*******************************/

// Starts forwarding inbound data from this descriptor to the descriptor bound
// to `to`. `bufsize` is handed to the target as its outbound buffer limit and
// `length` is stored as BytesToProxy (_GenericInboundDispatch treats 0 as
// "no byte limit"). Throws std::runtime_error if `to` does not resolve to an
// EventableDescriptor or if the target is already being proxied to.
void EventableDescriptor::StartProxy(const uintptr_t to, const unsigned long bufsize, const unsigned long length)
{
	EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (to));
	if (ed) {
		StopProxy();
		// Register with the target BEFORE recording it as ours: SetProxiedFrom()
		// throws if the target is busy, and in that case we must not be left
		// holding a ProxyTarget whose ProxiedFrom points at someone else (a later
		// StopProxy() would then wipe out that other descriptor's proxy link).
		ed->SetProxiedFrom(this, bufsize);
		ProxyTarget = ed;
		BytesToProxy = length;
		ProxiedBytes = 0;
		return;
	}
	throw std::runtime_error ("Tried to proxy to an invalid descriptor");
}


/******************************
EventableDescriptor::StopProxy
******************************/

// Detaches from the current proxy target, if any.
void EventableDescriptor::StopProxy()
{
	if (ProxyTarget) {
		ProxyTarget->SetProxiedFrom(NULL, 0);
		ProxyTarget = NULL;
	}
}


/***********************************
EventableDescriptor::SetProxiedFrom
***********************************/

// Records (or, with from == NULL, clears) the descriptor that is proxying into
// this one, together with the outbound buffer limit to apply.
// Throws std::runtime_error if a different source is already attached.
void EventableDescriptor::SetProxiedFrom(EventableDescriptor *from, const unsigned long bufsize)
{
	if (from != NULL && ProxiedFrom != NULL)
		throw std::runtime_error ("Tried to proxy to a busy target");

	ProxiedFrom = from;
	MaxOutboundBufSize = bufsize;
}


/********************************************
EventableDescriptor::_GenericInboundDispatch
********************************************/

// Routes freshly read data either to the proxy target (honouring the optional
// BytesToProxy limit) or to the event callback as EM_CONNECTION_READ.
void EventableDescriptor::_GenericInboundDispatch(const char *buf, unsigned long size)
{
	assert(EventCallback);

	if (ProxyTarget) {
		if (BytesToProxy > 0) {
			unsigned long proxied = std::min(BytesToProxy, size);
			ProxyTarget->SendOutboundData(buf, proxied);
			ProxiedBytes += (unsigned long) proxied;
			BytesToProxy -= proxied;
			if (BytesToProxy == 0) {
				StopProxy();
				(*EventCallback)(GetBinding(), EM_PROXY_COMPLETED, NULL, 0);
				// Anything beyond the proxied quota is delivered as ordinary data.
				if (proxied < size) {
					(*EventCallback)(GetBinding(), EM_CONNECTION_READ, buf + proxied, size - proxied);
				}
			}
		} else {
			ProxyTarget->SendOutboundData(buf, size);
			ProxiedBytes += size;
		}
	} else {
		(*EventCallback)(GetBinding(), EM_CONNECTION_READ, buf, size);
	}
}


/*********************************
EventableDescriptor::_GenericGetPeername
*********************************/

// Fills *s / *len via getpeername(). Returns false if s is NULL, true on
// success, and throws std::runtime_error if getpeername() fails.
bool EventableDescriptor::_GenericGetPeername (struct sockaddr *s, socklen_t *len)
{
	if (!s)
		return false;

	int gp = getpeername (GetSocket(), s, len);
	if (gp == -1) {
		char buf[200];
		snprintf (buf, sizeof(buf)-1, "unable to get peer name: %s", strerror(errno));
		throw std::runtime_error (buf);
	}

	return true;
}

/*********************************
EventableDescriptor::_GenericGetSockname
*********************************/

// Fills *s / *len via getsockname(). Returns false if s is NULL, true on
// success, and throws std::runtime_error if getsockname() fails.
bool EventableDescriptor::_GenericGetSockname (struct sockaddr *s, socklen_t *len)
{
	if (!s)
		return false;

	int gp = getsockname (GetSocket(), s, len);
	if (gp == -1) {
		char buf[200];
		snprintf (buf, sizeof(buf)-1, "unable to get sock name: %s", strerror(errno));
		throw std::runtime_error (buf);
	}

	return true;
}


/*********************************************
EventableDescriptor::GetPendingConnectTimeout
*********************************************/

// Returns the stored pending-connect timeout divided by 1000, i.e. in the same
// unit SetPendingConnectTimeout() accepts.
uint64_t EventableDescriptor::GetPendingConnectTimeout()
{
	return PendingConnectTimeout / 1000;
}


/*********************************************
EventableDescriptor::SetPendingConnectTimeout
*********************************************/

// Stores value * 1000 as the pending-connect timeout and re-queues the
// heartbeat. Returns 1 if the value was accepted, 0 if it was zero (ignored).
int EventableDescriptor::SetPendingConnectTimeout (uint64_t value)
{
	if (value > 0) {
		PendingConnectTimeout = value * 1000;
		MyEventMachine->QueueHeartbeat(this);
		return 1;
	}
	return 0;
}


/*************************************
EventableDescriptor::GetNextHeartbeat
*************************************/

// Drops any previously queued heartbeat and computes the next one from the
// inactivity timeout and, while a connect is pending, the pending-connect
// timeout (whichever is smaller and non-zero). Returns 0 when no heartbeat is needed.
uint64_t EventableDescriptor::GetNextHeartbeat()
{
	if (NextHeartbeat)
		MyEventMachine->ClearHeartbeat(NextHeartbeat, this);

	NextHeartbeat = 0;

	if (!ShouldDelete()) {
		uint64_t time_til_next = InactivityTimeout;
		if (IsConnectPending()) {
			if (time_til_next == 0 || PendingConnectTimeout < time_til_next)
				time_til_next = PendingConnectTimeout;
		}
		if (time_til_next == 0)
			return 0;
		NextHeartbeat = time_til_next + MyEventMachine->GetRealTime();
	}

	return NextHeartbeat;
}


/******************************************
ConnectionDescriptor::ConnectionDescriptor
******************************************/

ConnectionDescriptor::ConnectionDescriptor (SOCKET sd, EventMachine_t *em):
	EventableDescriptor (sd, em),
	bConnectPending (false),
	bNotifyReadable (false),
	bNotifyWritable (false),
	bReadAttemptedAfterClose (false),
	bWriteAttemptedAfterClose (false),
	OutboundDataSize (0),
	#ifdef WITH_SSL
	SslBox (NULL),
	bHandshakeSignaled (false),
	bSslVerifyPeer (false),
	bSslPeerAccepted(false),
	#endif
	#ifdef HAVE_KQUEUE
	bGotExtraKqueueEvent(false),
	#endif
	bIsServer (false)
{
	// 22Jan09: Moved ArmKqueueWriter into SetConnectPending() to fix assertion failure in _WriteOutboundData()
	//  5May09: Moved EPOLLOUT into SetConnectPending() so it doesn't happen for attached read pipes
}


/*******************************************
ConnectionDescriptor::~ConnectionDescriptor
*******************************************/

ConnectionDescriptor::~ConnectionDescriptor()
{
	// Run down any stranded outbound data.
	for (size_t i=0; i < OutboundPages.size(); i++)
		OutboundPages[i].Free();

	#ifdef WITH_SSL
	if (SslBox)
		delete SslBox;
	#endif
}


/***********************************
ConnectionDescriptor::_UpdateEvents
************************************/

// Re-evaluates both read and write interest.
void ConnectionDescriptor::_UpdateEvents()
{
	_UpdateEvents(true, true);
}

// Re-evaluates read and/or write interest (per the two flags) against
// SelectForRead()/SelectForWrite() and pushes any change to the poller.
void ConnectionDescriptor::_UpdateEvents(bool read, bool write)
{
	if (MySocket == INVALID_SOCKET)
		return;

	if (!read && !write)
		return;

	#ifdef HAVE_EPOLL
	unsigned int old = EpollEvent.events;

	if (read) {
		if (SelectForRead())
			EpollEvent.events |= EPOLLIN;
		else
			EpollEvent.events &= ~EPOLLIN;
	}

	if (write) {
		if (SelectForWrite())
			EpollEvent.events |= EPOLLOUT;
		else
			EpollEvent.events &= ~EPOLLOUT;
	}

	// Only bother the poller when the interest set actually changed.
	if (old != EpollEvent.events)
		MyEventMachine->Modify (this);
	#endif

	#ifdef HAVE_KQUEUE
	if (read && SelectForRead())
		MyEventMachine->ArmKqueueReader (this);
	bKqueueArmWrite = SelectForWrite();
	if (write && bKqueueArmWrite)
		MyEventMachine->Modify (this);
	#endif
}

/***************************************
ConnectionDescriptor::SetConnectPending
****************************************/

void ConnectionDescriptor::SetConnectPending(bool f)
{
	bConnectPending = f;
	MyEventMachine->QueueHeartbeat(this);
	_UpdateEvents();
}


/**********************************
ConnectionDescriptor::SetAttached
***********************************/

// Marks the descriptor as attached; Close() does not shut down or close
// attached descriptors.
void ConnectionDescriptor::SetAttached(bool state)
{
	bAttached = state;
}


/**********************************
ConnectionDescriptor::SetWatchOnly
***********************************/

void ConnectionDescriptor::SetWatchOnly(bool watching)
{
	bWatchOnly = watching;
	_UpdateEvents();
}


/*********************************
ConnectionDescriptor::HandleError
*********************************/

void ConnectionDescriptor::HandleError()
{
	if (bWatchOnly) {
		// An EPOLLHUP | EPOLLIN condition will call Read() before HandleError(), in which case the
		// socket is already detached and invalid, so we don't need to do anything.
		if (MySocket == INVALID_SOCKET) return;

		// HandleError() is called on WatchOnly descriptors by the epoll reactor
		// when it gets a EPOLLERR | EPOLLHUP. Usually this would show up as a readable and
		// writable event on other reactors, so we have to fire those events ourselves.
		if (bNotifyReadable) Read();
		if (bNotifyWritable) Write();
	} else {
		ScheduleClose (false);
	}
}


/***********************************
ConnectionDescriptor::ScheduleClose
***********************************/

// Same as EventableDescriptor::ScheduleClose, except that watch-only
// connections refuse to be closed and throw std::runtime_error instead.
void ConnectionDescriptor::ScheduleClose (bool after_writing)
{
	if (bWatchOnly)
		throw std::runtime_error ("cannot close 'watch only' connections");

	EventableDescriptor::ScheduleClose(after_writing);
}


/***************************************
ConnectionDescriptor::SetNotifyReadable
****************************************/

void ConnectionDescriptor::SetNotifyReadable(bool readable)
{
	if (!bWatchOnly)
		throw std::runtime_error ("notify_readable must be on 'watch only' connections");

	bNotifyReadable = readable;
	_UpdateEvents(true, false);
}


/***************************************
ConnectionDescriptor::SetNotifyWritable
****************************************/

void ConnectionDescriptor::SetNotifyWritable(bool writable)
{
	if (!bWatchOnly)
		throw std::runtime_error ("notify_writable must be on 'watch only' connections");

	bNotifyWritable = writable;
	_UpdateEvents(false, true);
}


/**************************************
ConnectionDescriptor::SendOutboundData
**************************************/

// Queues application data for sending, encrypting it first when TLS is active.
// Pauses the proxy source if this write pushes the outbound queue past
// MaxOutboundBufSize. Throws std::runtime_error on watch-only connections.
int ConnectionDescriptor::SendOutboundData (const char *data, unsigned long length)
{
	if (bWatchOnly)
		throw std::runtime_error ("cannot send data on a 'watch only' connection");

	if (ProxiedFrom && MaxOutboundBufSize && (unsigned int)(GetOutboundDataSize() + length) > MaxOutboundBufSize)
		ProxiedFrom->Pause();

	#ifdef WITH_SSL
	if (SslBox) {
		if (length > 0) {
			unsigned long writed = 0;
			char *p = (char*)data;

			// Feed the plaintext to the SSL box in chunks of at most
			// SSLBOX_INPUT_CHUNKSIZE bytes.
			while (writed < length) {
				int to_write = SSLBOX_INPUT_CHUNKSIZE;
				int remaining = length - writed;

				if (remaining < SSLBOX_INPUT_CHUNKSIZE)
					to_write = remaining;

				int w = SslBox->PutPlaintext (p, to_write);
				if (w < 0) {
					ScheduleClose (false);
				}else
					_DispatchCiphertext();

				p += to_write;
				writed += to_write;
			}
		}
		// TODO: What's the correct return value?
		return 1; // That's a wild guess, almost certainly wrong.
	}
	else
	#endif
		return _SendRawOutboundData (data, length);
}



/******************************************
ConnectionDescriptor::_SendRawOutboundData
******************************************/

int ConnectionDescriptor::_SendRawOutboundData (const char *data, unsigned long length)
{
	/* This internal method is called to schedule bytes that
	 * will be sent out to the remote peer.
	 * It's not directly accessed by the caller, who hits ::SendOutboundData,
	 * which may or may not filter or encrypt the caller's data before
	 * sending it here.
	 */

	// Highly naive and incomplete implementation.
	// There's no throttle for runaways (which should abort only this connection
	// and not the whole process), and no coalescing of small pages.
	// (Well, not so bad, small pages are coalesced in ::Write)

	if (IsCloseScheduled())
		return 0;
	// 25Mar10: Ignore 0 length packets as they are not meaningful in TCP (as opposed to UDP)
	// and can cause the assert(nbytes>0) to fail when OutboundPages has a bunch of 0 length pages.
	if (length == 0)
		return 0;

	if (!data && (length > 0))
		throw std::runtime_error ("bad outbound data");
	// One extra byte so the queued copy is always NUL-terminated.
	char *buffer = (char *) malloc (length + 1);
	if (!buffer)
		throw std::runtime_error ("no allocation for outbound data");

	memcpy (buffer, data, length);
	buffer [length] = 0;
	OutboundPages.push_back (OutboundPage (buffer, length));
	OutboundDataSize += length;

	_UpdateEvents(false, true);

	return length;
}



/***********************************
ConnectionDescriptor::SelectForRead
***********************************/

bool ConnectionDescriptor::SelectForRead()
{
	/* A connection descriptor is always scheduled for read,
	 * UNLESS it's in a pending-connect state.
	 * On Linux, unlike Unix, a nonblocking socket on which
	 * connect has been called, does NOT necessarily select
	 * both readable and writable in case of error.
	 * The socket will select writable when the disposition
	 * of the connect is known. On the other hand, a socket
	 * which successfully connects and selects writable may
	 * indeed have some data available on it, so it will
	 * select readable in that case, violating expectations!
	 * So we will not poll for readability until the socket
	 * is known to be in a connected state.
	 */

	if (bPaused)
		return false;
	else if (bConnectPending)
		return false;
	else if (bWatchOnly)
		return bNotifyReadable ? true : false;
	else
		return true;
}


/************************************
ConnectionDescriptor::SelectForWrite
************************************/

bool ConnectionDescriptor::SelectForWrite()
{
	/* Cf the notes under SelectForRead.
	 * In a pending-connect state, we ALWAYS select for writable.
	 * In a normal state, we only select for writable when we
	 * have outgoing data to send.
	 */

	if (bPaused)
		return false;
	else if (bConnectPending)
		return true;
	else if (bWatchOnly)
		return bNotifyWritable ? true : false;
	else
		return (GetOutboundDataSize() > 0);
}

/***************************
ConnectionDescriptor::Pause
***************************/

// Stops polling this connection. Returns true if it was not already paused.
// Throws std::runtime_error on watch-only connections.
bool ConnectionDescriptor::Pause()
{
	if (bWatchOnly)
		throw std::runtime_error ("cannot pause/resume 'watch only' connections, set notify readable/writable instead");

	bool old = bPaused;
	bPaused = true;
	_UpdateEvents();
	return old == false;
}

/****************************
ConnectionDescriptor::Resume
****************************/

// Resumes polling this connection. Returns true if it had been paused.
// Throws std::runtime_error on watch-only connections.
bool ConnectionDescriptor::Resume()
{
	if (bWatchOnly)
		throw std::runtime_error ("cannot pause/resume 'watch only' connections, set notify readable/writable instead");

	bool old = bPaused;
	bPaused = false;
	_UpdateEvents();
	return old == true;
}

/**************************
ConnectionDescriptor::Read
**************************/

void ConnectionDescriptor::Read()
{
	/* Read and dispatch data on a socket that has selected readable.
	 * It's theoretically possible to get and dispatch incoming data on
	 * a socket that has already been scheduled for closing or close-after-writing.
	 * In those cases, we'll leave it up the to protocol handler to "do the
	 * right thing" (which probably means to ignore the incoming data).
	 *
	 * 22Aug06: Chris Ochs reports that on FreeBSD, it's possible to come
	 * here with the socket already closed, after the process receives
	 * a ctrl-C signal (not sure if that's TERM or INT on BSD). The application
	 * was one in which network connections were doing a lot of interleaved reads
	 * and writes.
	 * Since we always write before reading (in order to keep the outbound queues
	 * as light as possible), I think what happened is that an interrupt caused
	 * the socket to be closed in ConnectionDescriptor::Write. We'll then
	 * come here in the same pass through the main event loop, and won't get
	 * cleaned up until immediately after.
	 * We originally asserted that the socket was valid when we got here.
	 * To deal properly with the possibility that we are closed when we get here,
	 * I removed the assert. HOWEVER, the potential for an infinite loop scares me,
	 * so even though this is really clunky, I added a flag to assert that we never
	 * come here more than once after being closed. (FCianfrocca)
	 */

	SOCKET sd = GetSocket();
	//assert (sd != INVALID_SOCKET); (original, removed 22Aug06)
	if (sd == INVALID_SOCKET) {
		assert (!bReadAttemptedAfterClose);
		bReadAttemptedAfterClose = true;
		return;
	}

	// Watch-only connections never read; they just notify that data is available.
	if (bWatchOnly) {
		if (bNotifyReadable && EventCallback)
			(*EventCallback)(GetBinding(), EM_CONNECTION_NOTIFY_READABLE, NULL, 0);
		return;
	}

	LastActivity = MyEventMachine->GetCurrentLoopTime();

	int total_bytes_read = 0;
	char readbuffer [16 * 1024 + 1];

	for (int i=0; i < 10; i++) {
		// Don't read just one buffer and then move on. This is faster
		// if there is a lot of incoming.
		// But don't read indefinitely. Give other sockets a chance to run.
		// NOTICE, we're reading one less than the buffer size.
		// That's so we can put a guard byte at the end of what we send
		// to user code.

		int r = read (sd, readbuffer, sizeof(readbuffer) - 1);
#ifdef OS_WIN32
		int e = WSAGetLastError();
#else
		int e = errno;
#endif

		if (r > 0) {
			total_bytes_read += r;

			// Add a null-terminator at the the end of the buffer
			// that we will send to the callback.
			// DO NOT EVER CHANGE THIS. We want to explicitly allow users
			// to be able to depend on this behavior, so they will have
			// the option to do some things faster. Additionally it's
			// a security guard against buffer overflows.
			readbuffer [r] = 0;
			_DispatchInboundData (readbuffer, r);
			if (bPaused)
				break;
		}
		else if (r == 0) {
			break;
		}
		else {
			#ifdef OS_UNIX
			if ((e != EINPROGRESS) && (e != EWOULDBLOCK) && (e != EAGAIN) && (e != EINTR)) {
			#endif
			#ifdef OS_WIN32
			if ((e != WSAEINPROGRESS) && (e != WSAEWOULDBLOCK)) {
			#endif
				// 26Mar11: Previously, all read errors were assumed to be EWOULDBLOCK and ignored.
				// Now, instead, we call Close() on errors like ECONNRESET and ENOTCONN.
				UnbindReasonCode = e;
				Close();
				break;
			} else {
				// Basically a would-block, meaning we've read everything there is to read.
				break;
			}
		}

	}


	if (total_bytes_read == 0) {
		// If we read no data on a socket that selected readable,
		// it generally means the other end closed the connection gracefully.
		ScheduleClose (false);
		//bCloseNow = true;
	}

}



/******************************************
ConnectionDescriptor::_DispatchInboundData
******************************************/

// Hands raw bytes read from the socket to the application. When TLS is active
// the bytes are ciphertext: they are pushed through the SSL box and the
// resulting plaintext is dispatched instead.
#ifdef WITH_SSL
void ConnectionDescriptor::_DispatchInboundData (const char *buffer, unsigned long size)
{
	if (SslBox) {
		SslBox->PutCiphertext (buffer, size);

		int s;
		char B [2048];
		while ((s = SslBox->GetPlaintext (B, sizeof(B) - 1)) > 0) {
			_CheckHandshakeStatus();
			B [s] = 0;
			_GenericInboundDispatch(B, s);
		}

		// If our SSL handshake had a problem, shut down the connection.
		if (s == -2) {
			#ifndef EPROTO // OpenBSD does not have EPROTO
			#define EPROTO EINTR
			#endif
			#ifdef OS_UNIX
			UnbindReasonCode = EPROTO;
			#endif
			#ifdef OS_WIN32
			UnbindReasonCode = WSAECONNABORTED;
			#endif
			ScheduleClose(false);
			return;
		}

		_CheckHandshakeStatus();
		_DispatchCiphertext();
	}
	else {
		_GenericInboundDispatch(buffer, size);
	}
}
#else
void ConnectionDescriptor::_DispatchInboundData (const char *buffer, unsigned long size)
{
	_GenericInboundDispatch(buffer, size);
}
#endif



/*******************************************
ConnectionDescriptor::_CheckHandshakeStatus
*******************************************/

// Fires EM_SSL_HANDSHAKE_COMPLETED exactly once, the first time the SSL box
// reports a completed handshake. No-op when built without SSL.
void ConnectionDescriptor::_CheckHandshakeStatus()
{
	#ifdef WITH_SSL
	if (SslBox && (!bHandshakeSignaled) && SslBox->IsHandshakeCompleted()) {
		bHandshakeSignaled = true;
		if (EventCallback)
			(*EventCallback)(GetBinding(), EM_SSL_HANDSHAKE_COMPLETED, NULL, 0);
	}
	#endif
}



/***************************
ConnectionDescriptor::Write
***************************/

void ConnectionDescriptor::Write()
{
	/* A socket which is in a pending-connect state will select
	 * writable when the disposition of the connect is known.
	 * At that point, check to be sure there are no errors,
	 * and if none, then promote the socket out of the pending
	 * state.
	 * TODO: I haven't figured out how Windows signals errors on
	 * unconnected sockets. Maybe it does the untraditional but
	 * logical thing and makes the socket selectable for error.
	 * If so, it's unsupported here for the time being, and connect
	 * errors will have to be caught by the timeout mechanism.
	 */

	if (bConnectPending) {
		int error;
		socklen_t len;
		len = sizeof(error);
		#ifdef OS_UNIX
		int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, &error, &len);
		#endif
		#ifdef OS_WIN32
		int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, (char*)&error, &len);
		#endif
		if ((o == 0) && (error == 0)) {
			if (EventCallback)
				(*EventCallback)(GetBinding(), EM_CONNECTION_COMPLETED, "", 0);

			// 5May09: Moved epoll/kqueue read/write arming into SetConnectPending, so it can be called
			// from EventMachine_t::AttachFD as well.
			SetConnectPending (false);
		}
		else {
			if (o == 0)
				UnbindReasonCode = error;
			ScheduleClose (false);
			//bCloseNow = true;
		}
	}
	else {

		if (bNotifyWritable) {
			if (EventCallback)
				(*EventCallback)(GetBinding(), EM_CONNECTION_NOTIFY_WRITABLE, NULL, 0);

			_UpdateEvents(false, true);
			return;
		}

		assert(!bWatchOnly);

		/* 5May09: Kqueue bugs on OSX cause one extra writable event to fire even though we're using
		   EV_ONESHOT. We ignore this extra event once, but only the first time. If it happens again,
		   we should fall through to the assert(nbytes>0) failure to catch any EM bugs which might cause
		   ::Write to be called in a busy-loop.
		*/
		#ifdef HAVE_KQUEUE
		if (MyEventMachine->GetPoller() == Poller_Kqueue) {
			if (OutboundDataSize == 0 && !bGotExtraKqueueEvent) {
				bGotExtraKqueueEvent = true;
				return;
			} else if (OutboundDataSize > 0) {
				bGotExtraKqueueEvent = false;
			}
		}
		#endif

		_WriteOutboundData();
	}
}


/****************************************
ConnectionDescriptor::_WriteOutboundData
****************************************/

void ConnectionDescriptor::_WriteOutboundData()
{
	/* This is a helper function called by ::Write.
	 * It's possible for a socket to select writable and then no longer
	 * be writable by the time we get around to writing. The kernel might
	 * have used up its available output buffers between the select call
	 * and when we get here. So this condition is not an error.
	 *
	 * 20Jul07, added the same kind of protection against an invalid socket
	 * that is at the top of ::Read. Not entirely how this could happen in
	 * real life (connection-reset from the remote peer, perhaps?), but I'm
	 * doing it to address some reports of crashing under heavy loads.
	 */

	SOCKET sd = GetSocket();
	//assert (sd != INVALID_SOCKET);
	if (sd == INVALID_SOCKET) {
		assert (!bWriteAttemptedAfterClose);
		bWriteAttemptedAfterClose = true;
		return;
	}

	LastActivity = MyEventMachine->GetCurrentLoopTime();
	size_t nbytes = 0;

	#ifdef HAVE_WRITEV
	int iovcnt = OutboundPages.size();
	// Max of 16 outbound pages at a time
	if (iovcnt > 16) iovcnt = 16;

	iovec iov[16];

	for(int i = 0; i < iovcnt; i++){
		OutboundPage *op = &(OutboundPages[i]);
		#ifdef CC_SUNWspro
		// TODO: The void * cast works fine on Solaris 11, but
		// I don't know at what point that changed from older Solaris.
		iov[i].iov_base = (char *)(op->Buffer + op->Offset);
		#else
		iov[i].iov_base = (void *)(op->Buffer + op->Offset);
		#endif
		iov[i].iov_len	= op->Length - op->Offset;

		nbytes += iov[i].iov_len;
	}
	#else
	char output_buffer [16 * 1024];

	// Coalesce queued pages into one contiguous buffer for a single write().
	while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) {
		OutboundPage *op = &(OutboundPages[0]);
		if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) {
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset);
			nbytes += (op->Length - op->Offset);
			op->Free();
			OutboundPages.pop_front();
		}
		else {
			int len = sizeof(output_buffer) - nbytes;
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len);
			op->Offset += len;
			nbytes += len;
		}
	}
	#endif

	// We should never have gotten here if there were no data to write,
	// so assert that as a sanity check.
	// Don't bother to make sure nbytes is less than output_buffer because
	// if it were we probably would have crashed already.
	assert (nbytes > 0);

	assert (GetSocket() != INVALID_SOCKET);
	#ifdef HAVE_WRITEV
	int bytes_written = writev (GetSocket(), iov, iovcnt);
	#else
	int bytes_written = write (GetSocket(), output_buffer, nbytes);
	#endif

	bool err = false;
#ifdef OS_WIN32
	int e = WSAGetLastError();
#else
	int e = errno;
#endif
	if (bytes_written < 0) {
		err = true;
		bytes_written = 0;
	}

	assert (bytes_written >= 0);
	OutboundDataSize -= bytes_written;

	// The queue has drained below the limit again: let a paused proxy source continue.
	if (ProxiedFrom && MaxOutboundBufSize && (unsigned int)GetOutboundDataSize() < MaxOutboundBufSize && ProxiedFrom->IsPaused())
		ProxiedFrom->Resume();

	#ifdef HAVE_WRITEV
	if (!err) {
		unsigned int sent = bytes_written;

		// iov[i] always describes whatever page is currently at the front of the
		// queue, because every fully-sent page is popped before moving on. Look the
		// page up via front() each time: a deque iterator captured before
		// pop_front() is invalidated by it and must not be reused.
		for (int i = 0; i < iovcnt; i++) {
			// Shouldn't be possible run out of pages before the loop ends
			assert (!OutboundPages.empty());
			OutboundPage &page = OutboundPages.front();

			if (iov[i].iov_len <= sent) {
				// Sent this page in full, free it.
				page.Free();
				OutboundPages.pop_front();

				sent -= iov[i].iov_len;
			} else {
				// Sent part (or none) of this page, increment offset to send the remainder
				page.Offset += sent;
				break;
			}
		}
	}
	#else
	// Whatever write() did not accept goes back to the head of the queue.
	if ((size_t)bytes_written < nbytes) {
		int len = nbytes - bytes_written;
		char *buffer = (char*) malloc (len + 1);
		if (!buffer)
			throw std::runtime_error ("bad alloc throwing back data");
		memcpy (buffer, output_buffer + bytes_written, len);
		buffer [len] = 0;
		OutboundPages.push_front (OutboundPage (buffer, len));
	}
	#endif

	_UpdateEvents(false, true);

	if (err) {
		#ifdef OS_UNIX
		// EAGAIN is checked alongside EWOULDBLOCK, matching ::Read; on platforms
		// where the two differ a full kernel buffer must not close the connection.
		if ((e != EINPROGRESS) && (e != EWOULDBLOCK) && (e != EAGAIN) && (e != EINTR)) {
		#endif
		#ifdef OS_WIN32
		if ((e != WSAEINPROGRESS) && (e != WSAEWOULDBLOCK)) {
		#endif
			UnbindReasonCode = e;
			Close();
		}
	}
}


/***************************************
ConnectionDescriptor::ReportErrorStatus
***************************************/

// Returns 0 if the socket reports no pending error, the SO_ERROR value if it
// does, and -1 if the socket is already closed or getsockopt() itself fails.
int ConnectionDescriptor::ReportErrorStatus()
{
	if (MySocket == INVALID_SOCKET) {
		return -1;
	}

	int error;
	socklen_t len;
	len = sizeof(error);
	#ifdef OS_UNIX
	int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, &error, &len);
	#endif
	#ifdef OS_WIN32
	int o = getsockopt (GetSocket(), SOL_SOCKET, SO_ERROR, (char*)&error, &len);
	#endif
	if ((o == 0) && (error == 0))
		return 0;
	else if (o == 0)
		return error;
	else
		return -1;
}


/******************************
ConnectionDescriptor::StartTls
******************************/

// Creates the SSL box from the previously supplied TLS parameters and flushes
// any initial ciphertext. Throws std::runtime_error if TLS is already running,
// or (in non-SSL builds) unconditionally.
#ifdef WITH_SSL
void ConnectionDescriptor::StartTls()
{
	if (SslBox)
		throw std::runtime_error ("SSL/TLS already running on connection");

	SslBox = new SslBox_t (bIsServer, PrivateKeyFilename, CertChainFilename, bSslVerifyPeer, bSslFailIfNoPeerCert, SniHostName, CipherList, EcdhCurve, DhParam, Protocols, GetBinding());
	_DispatchCiphertext();

}
#else
void ConnectionDescriptor::StartTls()
{
	throw std::runtime_error ("Encryption not available on this event-machine");
}
#endif

/********************************* ConnectionDescriptor::SetTlsParms *********************************/ #ifdef WITH_SSL void ConnectionDescriptor::SetTlsParms (const char *privkey_filename, const char *certchain_filename, bool verify_peer, bool fail_if_no_peer_cert, const char *sni_hostname, const char *cipherlist, const char *ecdh_curve, const char *dhparam, int protocols) { if (SslBox) throw std::runtime_error ("call SetTlsParms before calling StartTls"); if (privkey_filename && *privkey_filename) PrivateKeyFilename = privkey_filename; if (certchain_filename && *certchain_filename) CertChainFilename = certchain_filename; bSslVerifyPeer = verify_peer; bSslFailIfNoPeerCert = fail_if_no_peer_cert; if (sni_hostname && *sni_hostname) SniHostName = sni_hostname; if (cipherlist && *cipherlist) CipherList = cipherlist; if (ecdh_curve && *ecdh_curve) EcdhCurve = ecdh_curve; if (dhparam && *dhparam) DhParam = dhparam; Protocols = protocols; } #else void ConnectionDescriptor::SetTlsParms (const char *privkey_filename UNUSED, const char *certchain_filename UNUSED, bool verify_peer UNUSED, bool fail_if_no_peer_cert UNUSED, const char *sni_hostname UNUSED, const char *cipherlist UNUSED, const char *ecdh_curve UNUSED, const char *dhparam UNUSED, int protocols UNUSED) { throw std::runtime_error ("Encryption not available on this event-machine"); } #endif /********************************* ConnectionDescriptor::GetPeerCert *********************************/ #ifdef WITH_SSL X509 *ConnectionDescriptor::GetPeerCert() { if (!SslBox) throw std::runtime_error ("SSL/TLS not running on this connection"); return SslBox->GetPeerCert(); } #endif /********************************* ConnectionDescriptor::GetCipherBits *********************************/ #ifdef WITH_SSL int ConnectionDescriptor::GetCipherBits() { if (!SslBox) throw std::runtime_error ("SSL/TLS not running on this connection"); return SslBox->GetCipherBits(); } #endif /********************************* 
ConnectionDescriptor::GetCipherName *********************************/ #ifdef WITH_SSL const char *ConnectionDescriptor::GetCipherName() { if (!SslBox) throw std::runtime_error ("SSL/TLS not running on this connection"); return SslBox->GetCipherName(); } #endif /********************************* ConnectionDescriptor::GetCipherProtocol *********************************/ #ifdef WITH_SSL const char *ConnectionDescriptor::GetCipherProtocol() { if (!SslBox) throw std::runtime_error ("SSL/TLS not running on this connection"); return SslBox->GetCipherProtocol(); } #endif /********************************* ConnectionDescriptor::GetSNIHostname *********************************/ #ifdef WITH_SSL const char *ConnectionDescriptor::GetSNIHostname() { if (!SslBox) throw std::runtime_error ("SSL/TLS not running on this connection"); return SslBox->GetSNIHostname(); } #endif /*********************************** ConnectionDescriptor::VerifySslPeer ***********************************/ #ifdef WITH_SSL bool ConnectionDescriptor::VerifySslPeer(const char *cert) { bSslPeerAccepted = false; if (EventCallback) (*EventCallback)(GetBinding(), EM_SSL_VERIFY, cert, strlen(cert)); return bSslPeerAccepted; } #endif /*********************************** ConnectionDescriptor::AcceptSslPeer ***********************************/ #ifdef WITH_SSL void ConnectionDescriptor::AcceptSslPeer() { bSslPeerAccepted = true; } #endif /***************************************** ConnectionDescriptor::_DispatchCiphertext *****************************************/ #ifdef WITH_SSL void ConnectionDescriptor::_DispatchCiphertext() { assert (SslBox); char BigBuf [SSLBOX_OUTPUT_CHUNKSIZE]; bool did_work; do { did_work = false; // try to drain ciphertext while (SslBox->CanGetCiphertext()) { int r = SslBox->GetCiphertext (BigBuf, sizeof(BigBuf)); assert (r > 0); _SendRawOutboundData (BigBuf, r); did_work = true; } // Pump the SslBox, in case it has queued outgoing plaintext // This will return >0 if data was written, // 0 if 
no data was written, and <0 if there was a fatal error. bool pump; do { pump = false; int w = SslBox->PutPlaintext (NULL, 0); if (w > 0) { did_work = true; pump = true; } else if (w < 0) ScheduleClose (false); } while (pump); // try to put plaintext. INCOMPLETE, doesn't belong here? // In SendOutboundData, we're spooling plaintext directly // into SslBox. That may be wrong, we may need to buffer it // up here! /* const char *ptr; int ptr_length; while (OutboundPlaintext.GetPage (&ptr, &ptr_length)) { assert (ptr && (ptr_length > 0)); int w = SslMachine.PutPlaintext (ptr, ptr_length); if (w > 0) { OutboundPlaintext.DiscardBytes (w); did_work = true; } else break; } */ } while (did_work); } #endif /******************************* ConnectionDescriptor::Heartbeat *******************************/ void ConnectionDescriptor::Heartbeat() { /* Only allow a certain amount of time to go by while waiting * for a pending connect. If it expires, then kill the socket. * For a connected socket, close it if its inactivity timer * has expired. */ if (bConnectPending) { if ((MyEventMachine->GetCurrentLoopTime() - CreatedAt) >= PendingConnectTimeout) { UnbindReasonCode = ETIMEDOUT; ScheduleClose (false); //bCloseNow = true; } } else { if (InactivityTimeout && ((MyEventMachine->GetCurrentLoopTime() - LastActivity) >= InactivityTimeout)) { UnbindReasonCode = ETIMEDOUT; ScheduleClose (false); //bCloseNow = true; } } } /**************************************** LoopbreakDescriptor::LoopbreakDescriptor ****************************************/ LoopbreakDescriptor::LoopbreakDescriptor (SOCKET sd, EventMachine_t *parent_em): EventableDescriptor (sd, parent_em) { /* This is really bad and ugly. Change someday if possible. * We have to know about an event-machine (probably the one that owns us), * so we can pass newly-created connections to it. 
*/ bCallbackUnbind = false; #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); #endif } /************************* LoopbreakDescriptor::Read *************************/ void LoopbreakDescriptor::Read() { // TODO, refactor, this code is probably in the wrong place. assert (MyEventMachine); MyEventMachine->_ReadLoopBreaker(); } /************************** LoopbreakDescriptor::Write **************************/ void LoopbreakDescriptor::Write() { // Why are we here? throw std::runtime_error ("bad code path in loopbreak"); } /************************************** AcceptorDescriptor::AcceptorDescriptor **************************************/ AcceptorDescriptor::AcceptorDescriptor (SOCKET sd, EventMachine_t *parent_em): EventableDescriptor (sd, parent_em) { #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); #endif } /*************************************** AcceptorDescriptor::~AcceptorDescriptor ***************************************/ AcceptorDescriptor::~AcceptorDescriptor() { } /**************************************** STATIC: AcceptorDescriptor::StopAcceptor ****************************************/ void AcceptorDescriptor::StopAcceptor (const uintptr_t binding) { // TODO: This is something of a hack, or at least it's a static method of the wrong class. AcceptorDescriptor *ad = dynamic_cast (Bindable_t::GetObject (binding)); if (ad) ad->ScheduleClose (false); else throw std::runtime_error ("failed to close nonexistent acceptor"); } /************************ AcceptorDescriptor::Read ************************/ void AcceptorDescriptor::Read() { /* Accept up to a certain number of sockets on the listening connection. * Don't try to accept all that are present, because this would allow a DoS attack * in which no data were ever read or written. 
We should accept more than one, * if available, to keep the partially accepted sockets from backing up in the kernel. */ /* Make sure we use non-blocking i/o on the acceptor socket, since we're selecting it * for readability. According to Stevens UNP, it's possible for an acceptor to select readable * and then block when we call accept. For example, the other end resets the connection after * the socket selects readable and before we call accept. The kernel will remove the dead * socket from the accept queue. If the accept queue is now empty, accept will block. */ struct sockaddr_in6 pin; socklen_t addrlen = sizeof (pin); int accept_count = EventMachine_t::GetSimultaneousAcceptCount(); for (int i=0; i < accept_count; i++) { #if defined(HAVE_CONST_SOCK_CLOEXEC) && defined(HAVE_ACCEPT4) SOCKET sd = accept4 (GetSocket(), (struct sockaddr*)&pin, &addrlen, SOCK_CLOEXEC); if (sd == INVALID_SOCKET) { // We may be running in a kernel where // SOCK_CLOEXEC is not supported - fall back: sd = accept (GetSocket(), (struct sockaddr*)&pin, &addrlen); } #else SOCKET sd = accept (GetSocket(), (struct sockaddr*)&pin, &addrlen); #endif if (sd == INVALID_SOCKET) { // This breaks the loop when we've accepted everything on the kernel queue, // up to 10 new connections. But what if the *first* accept fails? // Does that mean anything serious is happening, beyond the situation // described in the note above? break; } // Set the newly-accepted socket non-blocking and to close on exec. // On Windows, this may fail because, weirdly, Windows inherits the non-blocking // attribute that we applied to the acceptor socket into the accepted one. if (!SetFdCloexec(sd) || !SetSocketNonblocking (sd)) { //int val = fcntl (sd, F_GETFL, 0); //if (fcntl (sd, F_SETFL, val | O_NONBLOCK) == -1) { shutdown (sd, 1); close (sd); continue; } // Disable slow-start (Nagle algorithm). Eventually make this configurable. 
int one = 1; setsockopt (sd, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof(one)); ConnectionDescriptor *cd = new ConnectionDescriptor (sd, MyEventMachine); if (!cd) throw std::runtime_error ("no newly accepted connection"); cd->SetServerMode(); if (EventCallback) { (*EventCallback) (GetBinding(), EM_CONNECTION_ACCEPTED, NULL, cd->GetBinding()); } #ifdef HAVE_EPOLL cd->GetEpollEvent()->events = 0; if (cd->SelectForRead()) cd->GetEpollEvent()->events |= EPOLLIN; if (cd->SelectForWrite()) cd->GetEpollEvent()->events |= EPOLLOUT; #endif assert (MyEventMachine); MyEventMachine->Add (cd); #ifdef HAVE_KQUEUE bKqueueArmWrite = cd->SelectForWrite(); if (bKqueueArmWrite) MyEventMachine->Modify (cd); if (cd->SelectForRead()) MyEventMachine->ArmKqueueReader (cd); #endif } } /************************* AcceptorDescriptor::Write *************************/ void AcceptorDescriptor::Write() { // Why are we here? throw std::runtime_error ("bad code path in acceptor"); } /***************************** AcceptorDescriptor::Heartbeat *****************************/ void AcceptorDescriptor::Heartbeat() { // No-op } /************************************** DatagramDescriptor::DatagramDescriptor **************************************/ DatagramDescriptor::DatagramDescriptor (SOCKET sd, EventMachine_t *parent_em): EventableDescriptor (sd, parent_em), OutboundDataSize (0) { memset (&ReturnAddress, 0, sizeof(ReturnAddress)); /* Provisionally added 19Oct07. All datagram sockets support broadcasting. * Until now, sending to a broadcast address would give EACCES (permission denied) * on systems like Linux and BSD that require the SO_BROADCAST socket-option in order * to accept a packet to a broadcast address. Solaris doesn't require it. I think * Windows DOES require it but I'm not sure. * * Ruby does NOT do what we're doing here. In Ruby, you have to explicitly set SO_BROADCAST * on a UDP socket in order to enable broadcasting. 
The reason for requiring the option * in the first place is so that applications don't send broadcast datagrams by mistake. * I imagine that could happen if a user of an application typed in an address that happened * to be a broadcast address on that particular subnet. * * This is provisional because someone may eventually come up with a good reason not to * do it for all UDP sockets. If that happens, then we'll need to add a usercode-level API * to set the socket option, just like Ruby does. AND WE'LL ALSO BREAK CODE THAT DOESN'T * EXPLICITLY SET THE OPTION. */ int oval = 1; setsockopt (GetSocket(), SOL_SOCKET, SO_BROADCAST, (char*)&oval, sizeof(oval)); #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #ifdef HAVE_KQUEUE MyEventMachine->ArmKqueueReader (this); #endif } /*************************************** DatagramDescriptor::~DatagramDescriptor ***************************************/ DatagramDescriptor::~DatagramDescriptor() { // Run down any stranded outbound data. for (size_t i=0; i < OutboundPages.size(); i++) OutboundPages[i].Free(); } /***************************** DatagramDescriptor::Heartbeat *****************************/ void DatagramDescriptor::Heartbeat() { // Close it if its inactivity timer has expired. if (InactivityTimeout && ((MyEventMachine->GetCurrentLoopTime() - LastActivity) >= InactivityTimeout)) ScheduleClose (false); //bCloseNow = true; } /************************ DatagramDescriptor::Read ************************/ void DatagramDescriptor::Read() { SOCKET sd = GetSocket(); assert (sd != INVALID_SOCKET); LastActivity = MyEventMachine->GetCurrentLoopTime(); // This is an extremely large read buffer. // In many cases you wouldn't expect to get any more than 4K. char readbuffer [16 * 1024]; for (int i=0; i < 10; i++) { // Don't read just one buffer and then move on. This is faster // if there is a lot of incoming. // But don't read indefinitely. Give other sockets a chance to run. // NOTICE, we're reading one less than the buffer size. 
// That's so we can put a guard byte at the end of what we send // to user code. struct sockaddr_in6 sin; socklen_t slen = sizeof (sin); memset (&sin, 0, slen); int r = recvfrom (sd, readbuffer, sizeof(readbuffer) - 1, 0, (struct sockaddr*)&sin, &slen); //cerr << ""; // In UDP, a zero-length packet is perfectly legal. if (r >= 0) { // Add a null-terminator at the the end of the buffer // that we will send to the callback. // DO NOT EVER CHANGE THIS. We want to explicitly allow users // to be able to depend on this behavior, so they will have // the option to do some things faster. Additionally it's // a security guard against buffer overflows. readbuffer [r] = 0; // Set up a "temporary" return address so that callers can "reply" to us // from within the callback we are about to invoke. That means that ordinary // calls to "send_data_to_connection" (which is of course misnamed in this // case) will result in packets being sent back to the same place that sent // us this one. // There is a different call (evma_send_datagram) for cases where the caller // actually wants to send a packet somewhere else. memset (&ReturnAddress, 0, sizeof(ReturnAddress)); memcpy (&ReturnAddress, &sin, slen); _GenericInboundDispatch(readbuffer, r); } else { // Basically a would-block, meaning we've read everything there is to read. break; } } } /************************* DatagramDescriptor::Write *************************/ void DatagramDescriptor::Write() { /* It's possible for a socket to select writable and then no longer * be writable by the time we get around to writing. The kernel might * have used up its available output buffers between the select call * and when we get here. So this condition is not an error. * This code is very reminiscent of ConnectionDescriptor::_WriteOutboundData, * but differs in the that the outbound data pages (received from the * user) are _message-structured._ That is, we send each of them out * one message at a time. 
* TODO, we are currently suppressing the EMSGSIZE error!!! */ SOCKET sd = GetSocket(); assert (sd != INVALID_SOCKET); LastActivity = MyEventMachine->GetCurrentLoopTime(); assert (OutboundPages.size() > 0); // Send out up to 10 packets, then cycle the machine. for (int i = 0; i < 10; i++) { if (OutboundPages.size() <= 0) break; OutboundPage *op = &(OutboundPages[0]); // The nasty cast to (char*) is needed because Windows is brain-dead. int s = sendto (sd, (char*)op->Buffer, op->Length, 0, (struct sockaddr*)&(op->From), (op->From.sin6_family == AF_INET6 ? sizeof (struct sockaddr_in6) : sizeof (struct sockaddr_in))); #ifdef OS_WIN32 int e = WSAGetLastError(); #else int e = errno; #endif OutboundDataSize -= op->Length; op->Free(); OutboundPages.pop_front(); if (s == SOCKET_ERROR) { #ifdef OS_UNIX if ((e != EINPROGRESS) && (e != EWOULDBLOCK) && (e != EINTR)) { #endif #ifdef OS_WIN32 if ((e != WSAEINPROGRESS) && (e != WSAEWOULDBLOCK)) { #endif UnbindReasonCode = e; Close(); break; } } } #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; if (SelectForWrite()) EpollEvent.events |= EPOLLOUT; assert (MyEventMachine); MyEventMachine->Modify (this); #endif #ifdef HAVE_KQUEUE bKqueueArmWrite = SelectForWrite(); assert (MyEventMachine); MyEventMachine->Modify (this); #endif } /********************************** DatagramDescriptor::SelectForWrite **********************************/ bool DatagramDescriptor::SelectForWrite() { /* Changed 15Nov07, per bug report by Mark Zvillius. * The outbound data size will be zero if there are zero-length outbound packets, * so we now select writable in case the outbound page buffer is not empty. * Note that the superclass ShouldDelete method still checks for outbound data size, * which may be wrong. 
*/ //return (GetOutboundDataSize() > 0); (Original) return (OutboundPages.size() > 0); } /************************************ DatagramDescriptor::SendOutboundData ************************************/ int DatagramDescriptor::SendOutboundData (const char *data, unsigned long length) { // This is almost an exact clone of ConnectionDescriptor::_SendRawOutboundData. // That means most of it could be factored to a common ancestor. Note that // empty datagrams are meaningful, which isn't the case for TCP streams. if (IsCloseScheduled()) return 0; if (!data && (length > 0)) throw std::runtime_error ("bad outbound data"); char *buffer = (char *) malloc (length + 1); if (!buffer) throw std::runtime_error ("no allocation for outbound data"); memcpy (buffer, data, length); buffer [length] = 0; OutboundPages.push_back (OutboundPage (buffer, length, ReturnAddress)); OutboundDataSize += length; #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | EPOLLOUT); assert (MyEventMachine); MyEventMachine->Modify (this); #endif #ifdef HAVE_KQUEUE bKqueueArmWrite = true; assert (MyEventMachine); MyEventMachine->Modify (this); #endif return length; } /**************************************** DatagramDescriptor::SendOutboundDatagram ****************************************/ int DatagramDescriptor::SendOutboundDatagram (const char *data, unsigned long length, const char *address, int port) { // This is an exact clone of ConnectionDescriptor::SendOutboundData. // That means it needs to move to a common ancestor. // TODO: Refactor this so there's no overlap with SendOutboundData. 
if (IsCloseScheduled()) //if (bCloseNow || bCloseAfterWriting) return 0; if (!address || !*address || !port) return 0; struct sockaddr_in6 addr_here; size_t addr_here_len = sizeof addr_here; if (0 != EventMachine_t::name2address (address, port, SOCK_DGRAM, (struct sockaddr *)&addr_here, &addr_here_len)) return -1; if (!data && (length > 0)) throw std::runtime_error ("bad outbound data"); char *buffer = (char *) malloc (length + 1); if (!buffer) throw std::runtime_error ("no allocation for outbound data"); memcpy (buffer, data, length); buffer [length] = 0; OutboundPages.push_back (OutboundPage (buffer, length, addr_here)); OutboundDataSize += length; #ifdef HAVE_EPOLL EpollEvent.events = (EPOLLIN | EPOLLOUT); assert (MyEventMachine); MyEventMachine->Modify (this); #endif #ifdef HAVE_KQUEUE bKqueueArmWrite = true; assert (MyEventMachine); MyEventMachine->Modify (this); #endif return length; } /********************************************** ConnectionDescriptor::GetCommInactivityTimeout **********************************************/ uint64_t ConnectionDescriptor::GetCommInactivityTimeout() { return InactivityTimeout / 1000; } /********************************************** ConnectionDescriptor::SetCommInactivityTimeout **********************************************/ int ConnectionDescriptor::SetCommInactivityTimeout (uint64_t value) { InactivityTimeout = value * 1000; MyEventMachine->QueueHeartbeat(this); return 1; } /******************************* DatagramDescriptor::GetPeername *******************************/ bool DatagramDescriptor::GetPeername (struct sockaddr *s, socklen_t *len) { bool ok = false; if (s) { *len = sizeof(ReturnAddress); memset (s, 0, sizeof(ReturnAddress)); memcpy (s, &ReturnAddress, sizeof(ReturnAddress)); ok = true; } return ok; } /******************************************** DatagramDescriptor::GetCommInactivityTimeout ********************************************/ uint64_t DatagramDescriptor::GetCommInactivityTimeout() { return 
InactivityTimeout / 1000; } /******************************************** DatagramDescriptor::SetCommInactivityTimeout ********************************************/ int DatagramDescriptor::SetCommInactivityTimeout (uint64_t value) { if (value > 0) { InactivityTimeout = value * 1000; MyEventMachine->QueueHeartbeat(this); return 1; } return 0; } /************************************ InotifyDescriptor::InotifyDescriptor *************************************/ InotifyDescriptor::InotifyDescriptor (EventMachine_t *em): EventableDescriptor(0, em) { bCallbackUnbind = false; #ifndef HAVE_INOTIFY throw std::runtime_error("no inotify support on this system"); #else int fd = inotify_init(); if (fd == -1) { char buf[200]; snprintf (buf, sizeof(buf)-1, "unable to create inotify descriptor: %s", strerror(errno)); throw std::runtime_error (buf); } MySocket = fd; SetSocketNonblocking(MySocket); #ifdef HAVE_EPOLL EpollEvent.events = EPOLLIN; #endif #endif } /************************************* InotifyDescriptor::~InotifyDescriptor **************************************/ InotifyDescriptor::~InotifyDescriptor() { close(MySocket); MySocket = INVALID_SOCKET; } /*********************** InotifyDescriptor::Read ************************/ void InotifyDescriptor::Read() { assert (MyEventMachine); MyEventMachine->_ReadInotifyEvents(); } /************************ InotifyDescriptor::Write *************************/ void InotifyDescriptor::Write() { throw std::runtime_error("bad code path in inotify"); }