rf-web/vendor/bundle/gems/eventmachine-1.2.7/ext/pipe.cpp

/*****************************************************************************

$Id$

File: pipe.cpp
Date: 30May07

Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
Gmail: blackhedd

This program is free software; you can redistribute it and/or modify
it under the terms of either: 1) the GNU General Public License
as published by the Free Software Foundation; either version 2 of the
License, or (at your option) any later version; or 2) Ruby's License.

See the file COPYING for complete licensing information.

*****************************************************************************/
#include "project.h"
#ifdef OS_UNIX
// THIS ENTIRE FILE IS ONLY COMPILED ON UNIX-LIKE SYSTEMS.
/******************************
PipeDescriptor::PipeDescriptor
******************************/
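/* A PipeDescriptor wraps the parent's end of a pipe attached to a forked
 * child process (typically created via EM.popen). It is armed for reads as
 * soon as it is constructed; write interest is added only when outbound
 * data is queued (see SendOutboundData below).
 */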
PipeDescriptor::PipeDescriptor (int fd, pid_t subpid, EventMachine_t *parent_em):
	EventableDescriptor (fd, parent_em),
	bReadAttemptedAfterClose (false),
	OutboundDataSize (0),
	SubprocessPid (subpid)
{
	#ifdef HAVE_EPOLL
	EpollEvent.events = EPOLLIN;
	#endif
	#ifdef HAVE_KQUEUE
	MyEventMachine->ArmKqueueReader (this);
	#endif
}

/*******************************
PipeDescriptor::~PipeDescriptor
*******************************/
PipeDescriptor::~PipeDescriptor() NO_EXCEPT_FALSE
{
	// Run down any stranded outbound data.
	for (size_t i=0; i < OutboundPages.size(); i++)
		OutboundPages[i].Free();

	/* As a virtual destructor, we come here before the base-class
	 * destructor that closes our file-descriptor.
	 * We have to make sure the subprocess goes down (if it's not
	 * already down) and we have to reap the zombie.
	 *
	 * This implementation is PROVISIONAL and will surely be improved.
	 * The intention here is that we never block, hence the highly
	 * undesirable sleeps. But if we can't reap the subprocess even
	 * after sending it SIGKILL, then something is wrong and we
	 * throw a fatal exception, which is also not something we should
	 * be doing.
	 *
	 * Eventually the right thing to do will be to have the reactor
	 * core respond to SIGCHLD by chaining a handler on top of the
	 * one Ruby may have installed, and dealing with a list of dead
	 * children that are pending cleanup.
	 *
	 * Since we want to have a signal processor integrated into the
	 * client-visible API, let's wait until that is done before cleaning
	 * this up.
	 *
	 * Added a very ugly hack to support passing the subprocess's exit
	 * status to the user. It only makes logical sense for user code to access
	 * the subprocess exit status in the unbind callback. But unbind is called
	 * back during the EventableDescriptor destructor. So by that time there's
	 * no way to call back this object through an object binding, because it's
	 * already been cleaned up. We might have added a parameter to the unbind
	 * callback, but that would probably break a huge amount of existing code.
	 * So the hack-solution is to define an instance variable in the EventMachine
	 * object and stick the exit status in there, where it can easily be accessed
	 * with an accessor visible to user code.
	 * User code should ONLY access the exit status from within the unbind callback.
	 * Otherwise there's no guarantee it'll be valid.
	 * This hack won't make it impossible to run multiple EventMachines in a single
	 * process, but it will make it impossible to reliably nest unbind calls
	 * within other unbind calls. (Not sure if that's even possible.)
	 */
	assert (MyEventMachine);
	/* Another hack to make the SubprocessPid available to get_subprocess_status */
	MyEventMachine->SubprocessPid = SubprocessPid;

	/* 01Mar09: Updated to use a small nanosleep in a loop. When nanosleep is interrupted by SIGCHLD,
	 * it resumes the system call after processing the signal (resulting in unnecessary latency).
	 * Calling nanosleep in a loop avoids this problem.
	 */
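	/* The polling loops below run on the reactor thread, so in the worst case
	 * they block the event loop for roughly 6.5 seconds (0.5s + 1s + 5s of
	 * 50ms nanosleeps) before giving up and throwing.
	 */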
	struct timespec req = {0, 50000000}; // 0.05s
	int n;

	// wait 0.5s for the process to die
	for (n=0; n<10; n++) {
		if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) != 0) return;
		nanosleep (&req, NULL);
	}

	// send SIGTERM and wait another 1s
	kill (SubprocessPid, SIGTERM);
	for (n=0; n<20; n++) {
		nanosleep (&req, NULL);
		if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) != 0) return;
	}

	// send SIGKILL and wait another 5s
	kill (SubprocessPid, SIGKILL);
	for (n=0; n<100; n++) {
		nanosleep (&req, NULL);
		if (waitpid (SubprocessPid, &(MyEventMachine->SubprocessExitStatus), WNOHANG) != 0) return;
	}

	// still not dead, give up!
	throw std::runtime_error ("unable to reap subprocess");
}

/********************
PipeDescriptor::Read
********************/
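/* Read() drains the pipe when the reactor reports it readable: up to ten
 * chunks of (16KB - 1) bytes per pass, each NUL-terminated before being
 * dispatched to user code, with a close scheduled when a readable pipe
 * yields no data (normally EOF on the child's end).
 */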
void PipeDescriptor::Read()
{
	int sd = GetSocket();

	if (sd == INVALID_SOCKET) {
		assert (!bReadAttemptedAfterClose);
		bReadAttemptedAfterClose = true;
		return;
	}

	LastActivity = MyEventMachine->GetCurrentLoopTime();

	int total_bytes_read = 0;
	char readbuffer [16 * 1024];
	for (int i=0; i < 10; i++) {
		// Don't read just one buffer and then move on. This is faster
		// if there is a lot of incoming data.
		// But don't read indefinitely. Give other sockets a chance to run.
		// NOTICE, we're reading one less than the buffer size.
		// That's so we can put a guard byte at the end of what we send
		// to user code.
		// Use read instead of recv; on Linux, recv on a pipe fails with
		// a "socket operation on nonsocket" error.

		int r = read (sd, readbuffer, sizeof(readbuffer) - 1);
		//cerr << "<R:" << r << ">";

		if (r > 0) {
			total_bytes_read += r;

			// Add a null-terminator at the end of the buffer
			// that we will send to the callback.
			// DO NOT EVER CHANGE THIS. We want to explicitly allow users
			// to be able to depend on this behavior, so they will have
			// the option to do some things faster. Additionally it's
			// a security guard against buffer overflows.
			readbuffer [r] = 0;
			_GenericInboundDispatch(readbuffer, r);
		}
		else if (r == 0) {
			break;
		}
		else {
			// Basically a would-block, meaning we've read everything there is to read.
			break;
		}
	}

	if (total_bytes_read == 0) {
		// If we read no data on a socket that selected readable,
		// it generally means the other end closed the connection gracefully.
		ScheduleClose (false);
		//bCloseNow = true;
	}
}

/*********************
PipeDescriptor::Write
*********************/
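/* Write() coalesces queued OutboundPages into a single 16KB stack buffer,
 * issues one write(), requeues any unwritten remainder at the front of the
 * queue, and (under epoll) keeps EPOLLOUT armed only while data remains.
 */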
void PipeDescriptor::Write()
{
	int sd = GetSocket();
	assert (sd != INVALID_SOCKET);

	LastActivity = MyEventMachine->GetCurrentLoopTime();
	char output_buffer [16 * 1024];
	size_t nbytes = 0;

	while ((OutboundPages.size() > 0) && (nbytes < sizeof(output_buffer))) {
		OutboundPage *op = &(OutboundPages[0]);
		if ((nbytes + op->Length - op->Offset) < sizeof (output_buffer)) {
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, op->Length - op->Offset);
			nbytes += (op->Length - op->Offset);
			op->Free();
			OutboundPages.pop_front();
		}
		else {
			int len = sizeof(output_buffer) - nbytes;
			memcpy (output_buffer + nbytes, op->Buffer + op->Offset, len);
			op->Offset += len;
			nbytes += len;
		}
	}
	// We should never have gotten here if there were no data to write,
	// so assert that as a sanity check.
	// Don't bother to check that nbytes fits within output_buffer: if it
	// had overflowed, we probably would have crashed already.
	assert (nbytes > 0);
	assert (GetSocket() != INVALID_SOCKET);
	int bytes_written = write (GetSocket(), output_buffer, nbytes);
	#ifdef OS_WIN32
	int e = WSAGetLastError();
	#else
	int e = errno;
	#endif

	if (bytes_written > 0) {
		OutboundDataSize -= bytes_written;
		if ((size_t)bytes_written < nbytes) {
			int len = nbytes - bytes_written;
			char *buffer = (char*) malloc (len + 1);
			if (!buffer)
				throw std::runtime_error ("bad alloc throwing back data");
			memcpy (buffer, output_buffer + bytes_written, len);
			buffer [len] = 0;
			OutboundPages.push_front (OutboundPage (buffer, len));
		}
		#ifdef HAVE_EPOLL
		EpollEvent.events = EPOLLIN;
		if (SelectForWrite())
			EpollEvent.events |= EPOLLOUT;
		assert (MyEventMachine);
		MyEventMachine->Modify (this);
		#endif
	}
	else {
		#ifdef OS_UNIX
		if ((e != EINPROGRESS) && (e != EWOULDBLOCK) && (e != EINTR))
		#endif
		#ifdef OS_WIN32
		if ((e != WSAEINPROGRESS) && (e != WSAEWOULDBLOCK))
		#endif
			Close();
	}
}

/*************************
PipeDescriptor::Heartbeat
*************************/
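/* Called periodically by the reactor. Enforces the inactivity timeout
 * (typically set from user code via comm_inactivity_timeout) by scheduling
 * a close when LastActivity, which Read() and Write() refresh, is too old.
 */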
void PipeDescriptor::Heartbeat()
{
	// If an inactivity timeout is defined, then check for it.
	if (InactivityTimeout && ((MyEventMachine->GetCurrentLoopTime() - LastActivity) >= InactivityTimeout))
		ScheduleClose (false);
		//bCloseNow = true;
}

/*****************************
PipeDescriptor::SelectForRead
*****************************/
bool PipeDescriptor::SelectForRead()
{
	/* Pipe descriptors, being local by definition, don't have
	 * a pending state, so this is simpler than for the
	 * ConnectionDescriptor object.
	 */
	return bPaused ? false : true;
}

/******************************
PipeDescriptor::SelectForWrite
******************************/
bool PipeDescriptor::SelectForWrite()
{
	/* Pipe descriptors, being local by definition, don't have
	 * a pending state, so this is simpler than for the
	 * ConnectionDescriptor object.
	 */
	return (GetOutboundDataSize() > 0) && !bPaused ? true : false;
}

/********************************
PipeDescriptor::SendOutboundData
********************************/
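/* SendOutboundData() copies the caller's bytes into a heap-allocated page
 * with a trailing NUL guard byte (mirroring the guard byte added in Read()),
 * appends it to the outbound queue and, under epoll, re-arms the descriptor
 * for writing. Returns the number of bytes queued, or 0 if a close is
 * already scheduled.
 */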
int PipeDescriptor::SendOutboundData (const char *data, unsigned long length)
{
	//if (bCloseNow || bCloseAfterWriting)
	if (IsCloseScheduled())
		return 0;

	if (!data && (length > 0))
		throw std::runtime_error ("bad outbound data");

	char *buffer = (char *) malloc (length + 1);
	if (!buffer)
		throw std::runtime_error ("no allocation for outbound data");

	memcpy (buffer, data, length);
	buffer [length] = 0;
	OutboundPages.push_back (OutboundPage (buffer, length));
	OutboundDataSize += length;

	#ifdef HAVE_EPOLL
	EpollEvent.events = (EPOLLIN | EPOLLOUT);
	assert (MyEventMachine);
	MyEventMachine->Modify (this);
	#endif

	return length;
}

/********************************
PipeDescriptor::GetSubprocessPid
********************************/
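/* Copies the attached child's pid into *pid, returning false when pid is
 * NULL or no subprocess is associated with this descriptor.
 */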
bool PipeDescriptor::GetSubprocessPid (pid_t *pid)
{
	bool ok = false;
	if (pid && (SubprocessPid > 0)) {
		*pid = SubprocessPid;
		ok = true;
	}
	return ok;
}

#endif // OS_UNIX