403Webshell
Server IP : 23.254.227.96  /  Your IP : 216.73.216.46
Web Server : Apache/2.4.62 (Unix) OpenSSL/1.1.1k
System : Linux hwsrv-1277026.hostwindsdns.com 4.18.0-477.13.1.el8_8.x86_64 #1 SMP Tue May 30 14:53:41 EDT 2023 x86_64
User : viralblo ( 1001)
PHP Version : 8.1.31
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/local/src/memcache-8.0/src/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/local/src/memcache-8.0/src/memcache_pool.c
/*
  +----------------------------------------------------------------------+
  | PHP Version 5                                                        |
  +----------------------------------------------------------------------+
  | Copyright (c) 1997-2007 The PHP Group                                |
  +----------------------------------------------------------------------+
  | This source file is subject to version 3.0 of the PHP license,       |
  | that is bundled with this package in the file LICENSE, and is        |
  | available through the world-wide-web at the following url:           |
  | http://www.php.net/license/3_0.txt.                                  |
  | If you did not receive a copy of the PHP license and are unable to   |
  | obtain it through the world-wide-web, please send a note to          |
  | license@php.net so we can mail you a copy immediately.               |
  +----------------------------------------------------------------------+
  | Authors: Antony Dovgal <tony2001@phpclub.net>                        |
  |          Mikael Johansson <mikael AT synd DOT info>                  |
  +----------------------------------------------------------------------+
*/

/* $Id$ */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <zlib.h>
#ifdef PHP_WIN32
#include <winsock2.h>
#else
#include <arpa/inet.h>
#endif

#include "php.h"
#include "php_network.h"
#include "ext/standard/crc32.h"
#include "ext/standard/php_var.h"
#include "ext/standard/php_string.h"
#include "ext/standard/php_smart_string.h"
#include "zend_smart_str.h"
#include "memcache_pool.h"

ZEND_DECLARE_MODULE_GLOBALS(memcache)

#if PHP_VERSION_ID >= 80000
#define mmc_string_concat2 zend_string_concat2
#else
static zend_string* mmc_string_concat2(
		const char *str1, size_t str1_len,
		const char *str2, size_t str2_len)
{
	/* Fallback used when PHP_VERSION_ID < 80000; on newer versions the name
	 * is mapped to zend_string_concat2 instead.
	 * Returns a newly allocated zend_string holding str1 followed by str2,
	 * NUL-terminated. */
	const size_t total = str1_len + str2_len;
	zend_string *joined = zend_string_alloc(total, 0);
	char *dst = ZSTR_VAL(joined);

	memcpy(dst, str1, str1_len);
	memcpy(dst + str1_len, str2, str2_len);
	dst[total] = '\0';

	return joined;
}
#endif

MMC_POOL_INLINE void mmc_buffer_alloc(mmc_buffer_t *buffer, unsigned int size)  /*
	ensures space for an additional size bytes in buffer->value; the length of
	the buffer is not changed by this function itself {{{ */
{
#if PHP_VERSION_ID < 70200
	/* only declared for PHP < 7.2; presumably the older smart_string_alloc
	 * macro expects a caller-provided `newlen` local -- confirm against
	 * php_smart_string.h */
	register size_t newlen;
#endif
	/* NOTE(review): the trailing 0 is presumably the "persistent" flag -- confirm */
	smart_string_alloc((&(buffer->value)), size, 0);
}
/* }}} */

MMC_POOL_INLINE void mmc_buffer_free(mmc_buffer_t *buffer)  /*
	releases the storage held by buffer->value (if any) and zeroes the whole
	buffer struct so it can be reused {{{ */
{
	smart_string *storage = &(buffer->value);

	if (storage->c != NULL) {
		smart_string_free(storage);
	}

	/* wipe every field, including the read index */
	ZEND_SECURE_ZERO(buffer, sizeof(*buffer));
}
/* }}} */

/* Initial CRC32 accumulator value: all bits set. */
static unsigned int mmc_hash_crc32_init()
{
	return ~0;
}

/* Final CRC32 step: returns the bitwise complement of the accumulator. */
static unsigned int mmc_hash_crc32_finish(unsigned int seed)
{
	return ~seed;
}

static unsigned int mmc_hash_crc32_combine(unsigned int seed, const void *key, unsigned int key_len) /*
	CRC32 hash: folds key_len bytes of key into seed using the CRC32 macro and
	returns the updated accumulator {{{ */
{
	const char *cursor = (const char *)key;
	const char *const stop = cursor + key_len;

	for (; cursor < stop; cursor++) {
		CRC32(seed, *cursor);
	}

	return seed;
}
/* }}} */

/* CRC32 hash function table: init, combine and finish callbacks, in that order
 * (the same order used by the mmc_hash_fnv1a table in this file). */
mmc_hash_function_t mmc_hash_crc32 = {
	mmc_hash_crc32_init,
	mmc_hash_crc32_combine,
	mmc_hash_crc32_finish
};

static unsigned int mmc_hash_fnv1a_combine(unsigned int seed, const void *key, unsigned int key_len) /*
	FNV-1a hash: for every byte of key, xor it into seed and then multiply by
	FNV_32_PRIME; returns the updated accumulator {{{ */
{
	const char *cursor = (const char *)key;
	const char *const stop = cursor + key_len;

	for (; cursor < stop; cursor++) {
		/* NOTE(review): bytes are read through plain `char`, so where char is
		 * signed, values >= 0x80 sign-extend before the xor. Kept as-is:
		 * changing it would alter the hash of existing keys. */
		seed ^= (unsigned int)*cursor;
		seed *= FNV_32_PRIME;
	}

	return seed;
}
/* }}} */

/* Initial FNV-1a accumulator value (FNV_32_INIT). */
static unsigned int mmc_hash_fnv1a_init()
{
	return FNV_32_INIT;
}

/* FNV-1a needs no finalization: the accumulator is returned unchanged. */
static unsigned int mmc_hash_fnv1a_finish(unsigned int seed)
{
	return seed;
}

/* FNV-1a hash function table: init, combine and finish callbacks, in that order
 * (the same order used by the mmc_hash_crc32 table in this file). */
mmc_hash_function_t mmc_hash_fnv1a = {
	mmc_hash_fnv1a_init,
	mmc_hash_fnv1a_combine,
	mmc_hash_fnv1a_finish
};

/* Converts a timeval into seconds expressed as a double
 * (whole seconds plus microseconds / 1000000). */
double timeval_to_double(struct timeval tv) {
	const double whole = (double)tv.tv_sec;
	const double micros = (double)tv.tv_usec;

	return whole + micros / 1000000;
}

/* Converts seconds expressed as a double into a timeval: tv_sec receives the
 * value truncated to long, tv_usec the remainder scaled to microseconds. */
struct timeval double_to_timeval(double sec) {
	struct timeval result;
	double remainder;

	result.tv_sec = (long)sec;
	remainder = sec - result.tv_sec;
	result.tv_usec = remainder * 1000000;

	return result;
}

static size_t mmc_stream_read_buffered(mmc_stream_t *io, char *buf, size_t count) /*
	copies up to count unread bytes from the stream buffer into buf, advances the
	buffer read index and returns the number of bytes copied {{{ */
{
	size_t copied = count;

	/* never hand out more than what is still unread in the buffer */
	if (io->buffer.value.len - io->buffer.idx < count) {
		copied = io->buffer.value.len - io->buffer.idx;
	}

	memcpy(buf, io->buffer.value.c + io->buffer.idx, copied);
	io->buffer.idx += copied;

	return copied;
}
/* }}} */

static char *mmc_stream_readline_buffered(mmc_stream_t *io, char *buf, size_t maxlen, size_t *retlen)  /*
	copies one line (up to and including '\n') from the stream buffer into buf, or
	all unread bytes if no '\n' is present; at most maxlen - 1 bytes are copied and
	buf is always NUL-terminated. The copied length is stored in *retlen and buf is
	returned. Only '\n' is searched for (memcached terminates lines with \r\n) {{{ */
{
	const char *start = io->buffer.value.c + io->buffer.idx;
	size_t unread = io->buffer.value.len - io->buffer.idx;
	const char *newline = memchr(start, '\n', unread);
	size_t linelen;

	if (newline == NULL) {
		linelen = unread;
	}
	else {
		/* include the newline itself */
		linelen = (size_t)(newline - start) + 1;
	}

	/* ensure space for data + \0 */
	if (linelen >= maxlen) {
		linelen = maxlen - 1;
	}

	*retlen = linelen;
	memcpy(buf, start, linelen);
	io->buffer.idx += linelen;
	buf[linelen] = '\0';

	return buf;
}
/* }}} */

static size_t mmc_stream_read_wrapper(mmc_stream_t *io, char *buf, size_t count)  /*
	reads up to count bytes straight from the underlying php_stream {{{ */
{
	php_stream *stream = io->stream;

	return php_stream_read(stream, buf, count);
}
/* }}} */

static char *mmc_stream_readline_wrapper(mmc_stream_t *io, char *buf, size_t maxlen, size_t *retlen)  /*
	reads one line straight from the underlying php_stream via php_stream_get_line {{{ */
{
	php_stream *stream = io->stream;

	return php_stream_get_line(stream, buf, maxlen, retlen);
}
/* }}} */

void mmc_request_reset(mmc_request_t *request) /*
	prepares a request for reuse: clears the key length and failover index and
	resets the send buffer and the list of failed servers {{{ */
{
	/* scalar state */
	request->key_len = 0;
	request->failed_index = 0;

	/* containers */
	mmc_buffer_reset(&(request->sendbuf));
	mmc_queue_reset(&(request->failed_servers));
}
/* }}} */

void mmc_request_free(mmc_request_t *request)  /*
	frees the buffers and failed-server queue owned by the request, then the
	request struct itself {{{ */
{
	mmc_queue_free(&(request->failed_servers));

	mmc_buffer_free(&(request->readbuf));
	mmc_buffer_free(&(request->sendbuf));

	/* the struct must go last, everything above lives inside it */
	efree(request);
}
/* }}} */

static inline int mmc_request_send(mmc_t *mmc, mmc_request_t *request) /*
	sends the next chunk (at most the stream's chunk_size) of the request's send
	buffer. Returns MMC_REQUEST_DONE once the whole buffer has been sent,
	MMC_REQUEST_MORE if data remains or the socket reported EAGAIN, otherwise the
	result of mmc_server_failure() {{{ */
{
	int pending, sent;

	/* size of the next chunk of the buffer */
	pending = request->sendbuf.value.len - request->sendbuf.idx;
	if (pending > request->io->stream->chunk_size) {
		pending = request->io->stream->chunk_size;
	}

	sent = send(request->io->fd, request->sendbuf.value.c + request->sendbuf.idx, pending, MSG_NOSIGNAL);

	if (sent < 0) {
		char *message, buf[1024];
		long err = php_socket_errno();

		/* socket not ready for writing, try again later */
		if (err == EAGAIN) {
			return MMC_REQUEST_MORE;
		}

		message = php_socket_strerror(err, buf, 1024);
		return mmc_server_failure(mmc, request->io, message, err);
	}

	request->sendbuf.idx += sent;

	/* done sending? */
	return request->sendbuf.idx >= request->sendbuf.value.len
		? MMC_REQUEST_DONE
		: MMC_REQUEST_MORE;
}
/* }}} */

static int mmc_request_read_udp(mmc_t *mmc, mmc_request_t *request) /*
	reads an entire datagram into the io buffer and validates the udp header.
	Returns MMC_OK when the datagram payload was appended to the buffer,
	MMC_REQUEST_MORE when a datagram belonging to an earlier request was discarded,
	MMC_REQUEST_RETRY when packet loss was detected, or the result of
	mmc_server_failure() on short or oversized reads {{{ */
{
	size_t bytes;
	mmc_udp_header_t *header;
	uint16_t reqid, seqid;

	/* reset buffer if completely consumed */
	if (request->io->buffer.idx >= request->io->buffer.value.len) {
		mmc_buffer_reset(&(request->io->buffer));
	}

	/* attempt to read datagram + sentinel-byte; the extra byte lets us detect
	 * datagrams larger than MMC_MAX_UDP_LEN */
	mmc_buffer_alloc(&(request->io->buffer), MMC_MAX_UDP_LEN + 1);
	bytes = php_stream_read(request->io->stream, request->io->buffer.value.c + request->io->buffer.value.len, MMC_MAX_UDP_LEN + 1);

	if (bytes < sizeof(mmc_udp_header_t)) {
		return mmc_server_failure(mmc, request->io, "Failed to read complete UDP header from stream", 0);
	}
	if (bytes > MMC_MAX_UDP_LEN) {
		return mmc_server_failure(mmc, request->io, "Server sent packet larger than MMC_MAX_UDP_LEN bytes", 0);
	}

	/* the datagram was read just past the valid part of the buffer */
	header = (mmc_udp_header_t *)(request->io->buffer.value.c + request->io->buffer.value.len);
	reqid = ntohs(header->reqid);
	seqid = ntohs(header->seqid);

	/* initialize udp header fields from the first datagram of this response */
	if (!request->udp.total && request->udp.reqid == reqid) {
		request->udp.seqid = seqid;
		request->udp.total = ntohs(header->total);
	}

	/* detect dropped packets and reschedule for tcp delivery */
	if (request->udp.reqid != reqid || request->udp.seqid != seqid) {
		/* ensure that no more udp requests are scheduled for a while */
		request->io->status = MMC_STATUS_FAILED;
		request->io->failed = (long)time(NULL);

		/* discard packets for previous requests */
		if (reqid < request->udp.reqid) {
			return MMC_REQUEST_MORE;
		}

		php_error_docref(NULL, E_NOTICE, "UDP packet loss, expected reqid/seqid %d/%d got %d/%d",
			(int)request->udp.reqid, (int)request->udp.seqid, (int)reqid, (int)seqid);
		return MMC_REQUEST_RETRY;
	}

	request->udp.seqid++;

	/* skip udp header */
	if (request->io->buffer.idx > 0) {
		/* unread payload precedes this datagram: slide the new payload down over
		 * its header so the payload bytes stay contiguous */
		memmove(
			request->io->buffer.value.c + request->io->buffer.value.len,
			request->io->buffer.value.c + request->io->buffer.value.len + sizeof(mmc_udp_header_t),
			bytes - sizeof(mmc_udp_header_t));

		/* the header no longer occupies buffer space, so only the payload may be
		 * accounted for; otherwise the trailing header-sized bytes would be
		 * treated as valid response data */
		bytes -= sizeof(mmc_udp_header_t);
	}
	else {
		/* buffer starts with this datagram: just step the read index past the header */
		request->io->buffer.idx += sizeof(mmc_udp_header_t);
	}

	request->io->buffer.value.len += bytes;
	return MMC_OK;
}
/* }}} */

static void mmc_compress(mmc_pool_t *pool, mmc_buffer_t *buffer, const char *value, int value_len, unsigned int *flags, int copy) /*
	appends value to buffer, zlib-compressed when MMC_COMPRESSED is set in *flags
	or value_len reaches pool->compress_threshold. With copy != 0 the caller has
	already appended the raw value to buffer (value is the last value_len bytes of
	buffer) and the buffer is rebuilt with the compressed form in its place. If
	compression does not return Z_OK the raw value is appended and MMC_COMPRESSED is
	cleared from *flags {{{ */
{
	/* autocompress large values */
	if (pool->compress_threshold && value_len >= pool->compress_threshold) {
		*flags |= MMC_COMPRESSED;
	}

	if (*flags & MMC_COMPRESSED) {
		int status;
		mmc_buffer_t prev;
		/* output budget: the compressed form only fits if it is smaller than the
		 * input by at least the min_compress_savings fraction */
		unsigned long result_len = value_len * (1 - pool->min_compress_savings);

		if (copy) {
			/* value is already in output buffer */
			prev = *buffer;

			/* allocate space for prev header + result */
			ZEND_SECURE_ZERO(buffer, sizeof(*buffer));
			mmc_buffer_alloc(buffer, prev.value.len + result_len);

			/* append prev header (everything in the old buffer before the raw value) */
			smart_string_appendl(&(buffer->value), prev.value.c, prev.value.len - value_len);
			buffer->idx = prev.idx;
		}
		else {
			/* allocate space directly in buffer */
			mmc_buffer_alloc(buffer, result_len);
		}

		/* compress straight into the unused tail of the buffer; result_len is
		 * updated by zlib to the actual compressed size */
		if (MMC_COMPRESSION_LEVEL >= 0) {
			status = compress2((unsigned char *)buffer->value.c + buffer->value.len, &result_len, (unsigned const char *)value, value_len, MMC_COMPRESSION_LEVEL);
		} else {
			status = compress((unsigned char *)buffer->value.c + buffer->value.len, &result_len, (unsigned const char *)value, value_len);
		}

		if (status == Z_OK) {
			buffer->value.len += result_len;
		}
		else {
			/* compression failed or did not fit the budget: store the raw value.
			 * In copy mode value still points into prev, which is freed below */
			smart_string_appendl(&(buffer->value), value, value_len);
			*flags &= ~MMC_COMPRESSED;
		}

		if (copy) {
			mmc_buffer_free(&prev);
		}
	}
	else if (!copy) {
		/* uncompressed and not yet in the buffer: append as-is */
		smart_string_appendl(&(buffer->value), value, value_len);
	}
}
/* }}}*/

static int mmc_uncompress(const char *data, unsigned long data_len, char **result, unsigned long *result_len) /*
	inflates data into *result, (re)allocating the output with erealloc. The output
	size starts at data_len * 2 and doubles on every Z_BUF_ERROR, up to
	data_len * 2^15. Returns MMC_OK with *result / *result_len set on success; on
	failure *result is freed and MMC_REQUEST_FAILURE is returned {{{ */
{
	int rc = Z_BUF_ERROR;
	int shift;

	for (shift = 1; rc == Z_BUF_ERROR && shift < 16; shift++) {
		*result_len = data_len * (1 << shift);
		/* one extra byte so callers can NUL-terminate the result */
		*result = (char *)erealloc(*result, *result_len + 1);
		rc = uncompress((unsigned char *)*result, result_len, (unsigned const char *)data, data_len);
	}

	if (rc != Z_OK) {
		efree(*result);
		return MMC_REQUEST_FAILURE;
	}

	return MMC_OK;
}
/* }}}*/

int mmc_pack_value(mmc_pool_t *pool, mmc_buffer_t *buffer, zval *value, unsigned int *flags) /*
	does serialization and compression to pack a zval into the buffer. Strings are
	appended (optionally compressed), longs/doubles/bools are appended in text form
	with MMC_COMPRESSED cleared, everything else is serialized and then optionally
	compressed. *flags is updated with the type / MMC_SERIALIZED / MMC_COMPRESSED
	bits. Returns MMC_OK or MMC_REQUEST_FAILURE {{{ */
{
	/* in the low 16 bits only MMC_COMPRESSED may be set by the caller */
	if (*flags & 0xffff & ~MMC_COMPRESSED) {
		php_error_docref(NULL, E_WARNING, "The lowest two bytes of the flags array is reserved for pecl/memcache internal use");
		return MMC_REQUEST_FAILURE;
	}

	*flags &= ~MMC_SERIALIZED;
	switch (Z_TYPE_P(value)) {
		case IS_STRING:
			*flags |= MMC_TYPE_STRING;
			mmc_compress(pool, buffer, Z_STRVAL_P(value), Z_STRLEN_P(value), flags, 0);
			break;

		case IS_LONG:
			*flags |= MMC_TYPE_LONG;
			/* scalars are never compressed */
			*flags &= ~MMC_COMPRESSED;
			smart_string_append_long(&(buffer->value), Z_LVAL_P(value));
			break;

		case IS_DOUBLE: {
			char buf[256];
			/* text form with up to 14 significant digits */
			int len = snprintf(buf, 256, "%.14g", Z_DVAL_P(value));
			*flags |= MMC_TYPE_DOUBLE;
			*flags &= ~MMC_COMPRESSED;
			smart_string_appendl(&(buffer->value), buf, len);
			break;
		}

		case IS_TRUE:
		case IS_FALSE:
			*flags |= MMC_TYPE_BOOL;
			*flags &= ~MMC_COMPRESSED;
			/* stored as the single character '1' or '0' */
			smart_string_appendc(&(buffer->value), Z_TYPE_P(value) == IS_TRUE ? '1' : '0');
			break;

		default: {
			php_serialize_data_t value_hash;
			zval value_copy, *value_copy_ptr;
			/* remember where the serialized data starts inside the buffer */
			size_t prev_len = buffer->value.len;
			smart_str buf = {0};

			/* FIXME: we should be using 'Z' instead of this, but unfortunately it's PHP5-only */
			/* serialize a copy of the value rather than the caller's zval */
			value_copy = *value;
			zval_copy_ctor(&value_copy);
			value_copy_ptr = &value_copy;

			PHP_VAR_SERIALIZE_INIT(value_hash);
			php_var_serialize(&buf, value_copy_ptr, &value_hash);
			PHP_VAR_SERIALIZE_DESTROY(value_hash);

			/* serializer produced no output at all */
			if (!buf.s) {
				zval_dtor(&value_copy);
				php_error_docref(NULL, E_WARNING, "Failed to serialize value");
				return MMC_REQUEST_FAILURE;
			}

			smart_string_appendl(&(buffer->value), ZSTR_VAL(buf.s), ZSTR_LEN(buf.s));
			smart_str_free(&buf);

			/* trying to save null or something went really wrong */
			if (buffer->value.c == NULL || buffer->value.len == prev_len) {
				zval_dtor(&value_copy);
				php_error_docref(NULL, E_WARNING, "Failed to serialize value");
				return MMC_REQUEST_FAILURE;
			}

			*flags |= MMC_SERIALIZED;
			zval_dtor(&value_copy);

			/* the serialized bytes are already in the buffer, hence copy = 1 */
			mmc_compress(pool, buffer, buffer->value.c + prev_len, buffer->value.len - prev_len, flags, 1);
		}
	}

	return MMC_OK;
}
/* }}} */

int mmc_unpack_value(
	mmc_t *mmc, mmc_request_t *request, mmc_buffer_t *buffer, const char *key, unsigned int key_len,
	unsigned int flags, unsigned long cas, unsigned int bytes) /*
	does uncompression and unserializing to reconstruct a zval from the first
	`bytes` bytes of buffer, then hands it to the request's value handler together
	with key, flags and cas. Returns the value handler's result, or
	MMC_REQUEST_DONE (after an E_NOTICE) when the data cannot be uncompressed or
	unserialized. key must be NUL-terminated at key[key_len] {{{ */
{
	char *data = NULL;
	unsigned long data_len;

	zval object;

	if (flags & MMC_COMPRESSED) {
		/* data becomes a separately allocated buffer that this function owns */
		if (mmc_uncompress(buffer->value.c, bytes, &data, &data_len) != MMC_OK) {
			php_error_docref(NULL, E_NOTICE, "Failed to uncompress data");
			return MMC_REQUEST_DONE;
		}
	}
	else {
		/* data aliases the caller's buffer */
		data = buffer->value.c;
		data_len = bytes;
	}

	if (flags & MMC_SERIALIZED) {
		php_unserialize_data_t var_hash;
		const unsigned char *p = (unsigned char *)data;
		char key_tmp[MMC_MAX_KEY_LEN + 1];
		mmc_request_value_handler value_handler;
		void *value_handler_param;
		mmc_buffer_t buffer_tmp;

		/* make copies of data to ensure re-entrancy */
		memcpy(key_tmp, key, key_len + 1);
		value_handler = request->value_handler;
		value_handler_param = request->value_handler_param;

		if (!(flags & MMC_COMPRESSED)) {
			/* detach the storage from the buffer while unserializing */
			buffer_tmp = *buffer;
			mmc_buffer_release(buffer);
		}

		PHP_VAR_UNSERIALIZE_INIT(var_hash);
		if (!php_var_unserialize(&object, &p, p + data_len, &var_hash)) {
			PHP_VAR_UNSERIALIZE_DESTROY(var_hash);

			if (flags & MMC_COMPRESSED) {
				efree(data);
			}
			else if (buffer->value.c == NULL) {
				/* buffer untouched meanwhile: give the storage back */
				*buffer = buffer_tmp;
			}
			else {
				/* buffer got new storage meanwhile: drop the detached one */
				mmc_buffer_free(&buffer_tmp);
			}

			php_error_docref(NULL, E_NOTICE, "Failed to unserialize data");
			return MMC_REQUEST_DONE;
		}

		PHP_VAR_UNSERIALIZE_DESTROY(var_hash);

		if (flags & MMC_COMPRESSED) {
			efree(data);
		}
		else if (buffer->value.c == NULL) {
			*buffer = buffer_tmp;
		}
		else {
			mmc_buffer_free(&buffer_tmp);
		}

		/* delegate to value handler */
		return value_handler(key_tmp, key_len, &object, flags, cas, value_handler_param);
	}
	else {
		switch (flags & 0x0f00) {
			case MMC_TYPE_LONG: {
				long val;
				data[data_len] = '\0';
				val = strtol(data, NULL, 10);
				ZVAL_LONG(&object, val);

				/* the decompressed copy is ours and no longer needed */
				if (flags & MMC_COMPRESSED) {
					efree(data);
				}
				break;
			}

			case MMC_TYPE_DOUBLE: {
				double val = 0;
				data[data_len] = '\0';
				sscanf(data, "%lg", &val);
				ZVAL_DOUBLE(&object, val);

				/* the decompressed copy is ours and no longer needed */
				if (flags & MMC_COMPRESSED) {
					efree(data);
				}
				break;
			}

			case MMC_TYPE_BOOL:
				ZVAL_BOOL(&object, data_len == 1 && data[0] == '1');

				/* the decompressed copy is ours and no longer needed */
				if (flags & MMC_COMPRESSED) {
					efree(data);
				}
				break;

			default:
				data[data_len] = '\0';
				ZVAL_STRINGL(&object, data, data_len);
				efree(data);

				if (!(flags & MMC_COMPRESSED)) {
					/* data was the buffer's own storage and has just been freed,
					 * so detach it from the buffer */
					mmc_buffer_release(buffer);
				}
		}

		/* delegate to value handler */
		return request->value_handler(key, key_len, &object, flags, cas, request->value_handler_param);
	}
}
/* }}}*/


mmc_t *mmc_server_new(
	const char *host, int host_len, unsigned short tcp_port, unsigned short udp_port,
	int persistent, double timeout, int retry_interval) /*
	allocates and initializes a server struct with a private copy of host; both the
	tcp and udp streams start out disconnected and share retry_interval and the
	configured chunk_size {{{ */
{
	char *host_copy;
	mmc_t *mmc = pemalloc(sizeof(mmc_t), persistent);
	ZEND_SECURE_ZERO(mmc, sizeof(*mmc));

	/* private, NUL-terminated copy of the host name */
	host_copy = pemalloc(host_len + 1, persistent);
	memcpy(host_copy, host, host_len);
	host_copy[host_len] = '\0';
	mmc->host = host_copy;

	mmc->persistent = persistent;
	mmc->timeout = double_to_timeval(timeout);

	/* tcp stream */
	mmc->tcp.port = tcp_port;
	mmc->tcp.status = MMC_STATUS_DISCONNECTED;
	mmc->tcp.retry_interval = retry_interval;
	mmc->tcp.chunk_size = MEMCACHE_G(chunk_size);

	/* udp stream */
	mmc->udp.port = udp_port;
	mmc->udp.status = MMC_STATUS_DISCONNECTED;
	mmc->udp.retry_interval = retry_interval;
	mmc->udp.chunk_size = MEMCACHE_G(chunk_size); /* = MMC_MAX_UDP_LEN;*/

	return mmc;
}
/* }}} */

static void _mmc_server_disconnect(mmc_t *mmc, mmc_stream_t *io, int close_persistent_stream) /*
	frees the io buffer and marks the stream disconnected. Non-persistent streams
	are closed; persistent ones are only closed (php_stream_pclose) when
	close_persistent_stream is set {{{ */
{
	mmc_buffer_free(&(io->buffer));

	if (io->stream != NULL) {
		if (!mmc->persistent) {
			php_stream_close(io->stream);
		}
		else if (close_persistent_stream) {
			php_stream_pclose(io->stream);
		}

		/* forget the stream in every case, even when it was left open */
		io->stream = NULL;
		io->fd = 0;
	}

	io->status = MMC_STATUS_DISCONNECTED;
}
/* }}} */

void mmc_server_disconnect(mmc_t *mmc, mmc_stream_t *io) /*
	disconnects the stream, closing it even if it is persistent {{{ */
{
	const int close_persistent_stream = 1;

	_mmc_server_disconnect(mmc, io, close_persistent_stream);
}
/* }}} */

static void mmc_server_seterror(mmc_t *mmc, const char *error, int errnum) /* {{{ */
{
	if (error != NULL) {
		if (mmc->error != NULL) {
			efree(mmc->error);
		}

		mmc->error = estrdup(error);
		mmc->errnum = errnum;
	}
}
/* }}} */

static void mmc_server_deactivate(mmc_pool_t *pool, mmc_t *mmc) /*
	disconnect and marks the server as down, failovers all queued requests and
	finally invokes the pool's failure callback (if set) {{{ */
{
	mmc_queue_t readqueue;
	mmc_request_t *request;

	mmc_server_disconnect(mmc, &(mmc->tcp));
	mmc_server_disconnect(mmc, &(mmc->udp));

	/* mark both streams failed with the same timestamp (used by the retry_interval check) */
	mmc->tcp.status = MMC_STATUS_FAILED;
	mmc->udp.status = MMC_STATUS_FAILED;
	mmc->tcp.failed = (long)time(NULL);
	mmc->udp.failed = mmc->tcp.failed;

	/* the server no longer takes part in the pool's send/read processing */
	mmc_queue_remove(pool->sending, mmc);
	mmc_queue_remove(pool->reading, mmc);

	/* failover queued requests, sendque can be ignored since
	 * readque + readreq + buildreq will always contain all active requests */
	mmc_queue_reset(&(mmc->sendqueue));
	mmc->sendreq = NULL;

	/* take over the read queue locally so the failover handlers below see the
	 * server with an empty queue */
	readqueue = mmc->readqueue;
	mmc_queue_release(&(mmc->readqueue));

	if (mmc->readreq != NULL) {
		mmc_queue_push(&readqueue, mmc->readreq);
		mmc->readreq = NULL;
	}

	if (mmc->buildreq != NULL) {
		mmc_queue_push(&readqueue, mmc->buildreq);
		mmc->buildreq = NULL;
	}

	/* delegate to failover handlers */
	while ((request = mmc_queue_pop(&readqueue)) != NULL) {
		request->failover_handler(pool, mmc, request, request->failover_handler_param);
	}

	mmc_queue_free(&readqueue);

	/* fire userspace failure event */
	if (pool->failure_callback != NULL) {
		pool->failure_callback(pool, mmc, &pool->failure_callback_param);
	}
}
/* }}} */

int mmc_server_failure(mmc_t *mmc, mmc_stream_t *io, const char *error, int errnum) /*
	determines if a request should be retried or is a hard network failure.
	Streams that are disconnected or in unknown state yield MMC_REQUEST_RETRY (the
	latter is switched to disconnected first); otherwise the error is recorded on
	the server and MMC_REQUEST_FAILURE is returned {{{ */
{
	if (io->status == MMC_STATUS_DISCONNECTED) {
		return MMC_REQUEST_RETRY;
	}

	/* attempt reconnect of sockets in unknown state */
	if (io->status == MMC_STATUS_UNKNOWN) {
		io->status = MMC_STATUS_DISCONNECTED;
		return MMC_REQUEST_RETRY;
	}

	mmc_server_seterror(mmc, error, errnum);
	return MMC_REQUEST_FAILURE;
}
/* }}} */

int mmc_request_failure(mmc_t *mmc, mmc_stream_t *io, const char *message, unsigned int message_len, int errnum) /*
	 forwards to mmc_server_failure() using the server generated message, or
	 "Malformed server response" when message_len is zero {{{ */
{
	const char *reason = message_len ? message : "Malformed server response";

	return mmc_server_failure(mmc, io, reason, errnum);
}
/* }}} */

static int mmc_server_connect(mmc_pool_t *pool, mmc_t *mmc, mmc_stream_t *io, int udp) /*
	connects a stream, calls mmc_server_deactivate() on failure. Any open stream on
	io is closed first. On success the stream is switched to non-blocking,
	unbuffered mode, io->fd / io->status and the read callbacks are set, a previous
	server error is cleared and MMC_OK is returned; on failure the error is stored
	on the server and MMC_REQUEST_FAILURE is returned {{{ */
{
	char *host, *hash_key = NULL;
	zend_string *errstr = NULL;
	int	host_len, errnum = 0;
	struct timeval tv = mmc->timeout;
	int fd;

	/* close open stream */
	if (io->stream != NULL) {
		mmc_server_disconnect(mmc, io);
	}

	/* persistent streams are looked up by host, port and protocol */
	if (mmc->persistent) {
		spprintf(&hash_key, 0, "memcache:stream:%s:%u:%d", mmc->host, io->port, udp);
	}

	/* build the transport address; a zero tcp port means host is used verbatim */
	if (udp) {
		host_len = spprintf(&host, 0, "udp://%s:%u", mmc->host, io->port);
	}
	else if (io->port) {
		host_len = spprintf(&host, 0, "%s:%u", mmc->host, io->port);
	}
	else {
		host_len = spprintf(&host, 0, "%s", mmc->host);
	}

	io->stream = php_stream_xport_create(
		host, host_len,
		REPORT_ERRORS | (mmc->persistent ? STREAM_OPEN_PERSISTENT : 0),
		STREAM_XPORT_CLIENT | STREAM_XPORT_CONNECT,
		hash_key, &tv, NULL, &errstr, &errnum);

	efree(host);

	if (hash_key != NULL) {
		efree(hash_key);
	}

	/* check connection and extract socket for select() purposes */
	if (!io->stream || php_stream_cast(io->stream, PHP_STREAM_AS_FD_FOR_SELECT, (void **)&fd, 1) != SUCCESS) {
		if (errstr != NULL) {
			zend_string* error = mmc_string_concat2(
				"Connection failed: ", sizeof("Connection failed: ") - 1,
				ZSTR_VAL(errstr), ZSTR_LEN(errstr));

			mmc_server_seterror(mmc, ZSTR_VAL(error), errnum);
			zend_string_release(error);

			/* errstr is a zend_string: drop our reference through the string
			 * API instead of freeing the raw allocation */
			zend_string_release(errstr);
		} else {
			mmc_server_seterror(mmc, "Connection failed", errnum);
		}
		mmc_server_deactivate(pool, mmc);

		return MMC_REQUEST_FAILURE;
	}
	php_stream_auto_cleanup(io->stream);
	php_stream_set_chunk_size(io->stream, io->chunk_size);
	php_stream_set_option(io->stream, PHP_STREAM_OPTION_BLOCKING, 0, NULL);
	php_stream_set_option(io->stream, PHP_STREAM_OPTION_READ_TIMEOUT, 0, &(mmc->timeout));

	/* doing our own buffering increases performance */
	php_stream_set_option(io->stream, PHP_STREAM_OPTION_READ_BUFFER, PHP_STREAM_BUFFER_NONE, NULL);
	php_stream_set_option(io->stream, PHP_STREAM_OPTION_WRITE_BUFFER, PHP_STREAM_BUFFER_NONE, NULL);

	io->fd = fd;
	io->status = MMC_STATUS_CONNECTED;

	/* php_stream buffering prevent us from detecting datagram boundaries when using udp */
	if (udp) {
		io->read = mmc_stream_read_buffered;
		io->readline = mmc_stream_readline_buffered;
	}
	else {
		io->read = mmc_stream_read_wrapper;
		io->readline = mmc_stream_readline_wrapper;
	}
	
#ifdef SO_NOSIGPIPE
	/* Mac OS X doesn't have MSG_NOSIGNAL */
	{
		int optval = 1;
		setsockopt(io->fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)&optval, sizeof(optval));
	}
#endif

	/* a successful connect clears any previously recorded error */
	if (mmc->error != NULL) {
		efree(mmc->error);
		mmc->error = NULL;
	}

	return MMC_OK;
}
/* }}} */

int mmc_server_valid(mmc_t *mmc) /*
	checks if a server should be considered valid to serve requests: returns 1 when
	the tcp status is MMC_STATUS_DISCONNECTED or higher, or when the server has
	failed but its non-negative retry_interval has elapsed; 0 otherwise (including
	for a NULL server) {{{ */
{
	if (mmc == NULL) {
		return 0;
	}

	if (mmc->tcp.status >= MMC_STATUS_DISCONNECTED) {
		return 1;
	}

	/* failed servers become eligible again once the retry interval has passed */
	if (mmc->tcp.status == MMC_STATUS_FAILED &&
		mmc->tcp.retry_interval >= 0 && (long)time(NULL) >= mmc->tcp.failed + mmc->tcp.retry_interval) {
		return 1;
	}

	return 0;
}
/* }}} */

void mmc_server_sleep(mmc_t *mmc) /*
	prepare server struct for persistent sleep: drops the request slots, frees the
	stream buffers, the send/read queues and the stored error message {{{ */
{
	/* no request is in flight any more */
	mmc->sendreq = NULL;
	mmc->readreq = NULL;
	mmc->buildreq = NULL;

	mmc_buffer_free(&(mmc->tcp.buffer));
	mmc_buffer_free(&(mmc->udp.buffer));

	mmc_queue_free(&(mmc->sendqueue));
	mmc_queue_free(&(mmc->readqueue));

	if (mmc->error != NULL) {
		efree(mmc->error);
		mmc->error = NULL;
	}
}
/* }}} */

void mmc_server_free(mmc_t *mmc) /*
	releases all per-request state, disconnects both streams (persistent streams
	are left open) and frees the host name and the server struct {{{ */
{
	int persistent;

	mmc_server_sleep(mmc);

	/* 0: do not close persistent streams */
	_mmc_server_disconnect(mmc, &(mmc->tcp), 0);
	_mmc_server_disconnect(mmc, &(mmc->udp), 0);

	persistent = mmc->persistent;
	pefree(mmc->host, persistent);
	pefree(mmc, persistent);
}
/* }}} */

static void mmc_pool_init_hash(mmc_pool_t *pool) /*
	selects the hash strategy (consistent or standard) and hash function (FNV-1a or
	CRC32) from the module globals and creates the strategy state {{{ */
{
	mmc_hash_function_t *hash;

	/* strategy: consistent hashing when configured, standard otherwise */
	if (MEMCACHE_G(hash_strategy) == MMC_CONSISTENT_HASH) {
		pool->hash = &mmc_consistent_hash;
	}
	else {
		pool->hash = &mmc_standard_hash;
	}

	/* function: FNV-1a when configured, CRC32 otherwise */
	if (MEMCACHE_G(hash_function) == MMC_HASH_FNV1A) {
		hash = &mmc_hash_fnv1a;
	}
	else {
		hash = &mmc_hash_crc32;
	}

	pool->hash_state = pool->hash->create_state(hash);
}
/* }}} */

mmc_pool_t *mmc_pool_new() /*
	allocates a zeroed pool and configures protocol, hash strategy, compression
	settings and the active sending/reading queues from the module globals {{{ */
{
	mmc_pool_t *pool = emalloc(sizeof(mmc_pool_t));
	ZEND_SECURE_ZERO(pool, sizeof(*pool));

	/* binary protocol when configured, ascii otherwise */
	if (MEMCACHE_G(protocol) == MMC_BINARY_PROTOCOL) {
		pool->protocol = &mmc_binary_protocol;
	}
	else {
		pool->protocol = &mmc_ascii_protocol;
	}

	mmc_pool_init_hash(pool);

	pool->compress_threshold = MEMCACHE_G(compress_threshold);
	pool->min_compress_savings = MMC_DEFAULT_SAVINGS;

	/* start out on the first queue of each pair */
	pool->sending = &(pool->_sending1);
	pool->reading = &(pool->_reading1);

	return pool;
}
/* }}} */

void mmc_pool_free(mmc_pool_t *pool) /*
	releases every server (persistent ones are only put to sleep), the server list,
	the hash state, all queues, the pooled requests and the pool itself {{{ */
{
	int i;
	mmc_request_t *request;

	for (i = 0; i < pool->num_servers; i++) {
		mmc_t *server = pool->servers[i];

		if (server == NULL) {
			continue;
		}

		if (server->persistent) {
			mmc_server_sleep(server);
		}
		else {
			mmc_server_free(server);
		}

		pool->servers[i] = NULL;
	}

	if (pool->num_servers) {
		efree(pool->servers);
	}

	pool->hash->free_state(pool->hash_state);

	mmc_queue_free(&(pool->_sending1));
	mmc_queue_free(&(pool->_sending2));
	mmc_queue_free(&(pool->_reading1));
	mmc_queue_free(&(pool->_reading2));
	mmc_queue_free(&(pool->pending));

	/* requests are owned by us so free them */
	for (;;) {
		request = mmc_queue_pop(&(pool->free_requests));
		if (request == NULL) {
			break;
		}
		pool->protocol->free_request(request);
	}
	mmc_queue_free(&(pool->free_requests));

	efree(pool);
}
/* }}} */

void mmc_pool_add(mmc_pool_t *pool, mmc_t *mmc, unsigned int weight) /*
	adds a server to the pool and hash strategy, and keeps pool->timeout at the
	smallest timeout of all servers {{{ */
{
	pool->hash->add_server(pool->hash_state, mmc, weight);

	/* grow the server list by one slot and store the server in it */
	pool->servers = erealloc(pool->servers, sizeof(*pool->servers) * (pool->num_servers + 1));
	pool->servers[pool->num_servers] = mmc;

	/* store the smallest timeout for any server */
	if (pool->num_servers == 0) {
		pool->timeout = mmc->timeout;
	}
	else if (timeval_to_double(mmc->timeout) < timeval_to_double(pool->timeout)) {
		pool->timeout = mmc->timeout;
	}

	pool->num_servers++;
}
/* }}} */

void mmc_pool_close(mmc_pool_t *pool) /*
	disconnects and removes all servers in the pool (persistent servers are only
	put to sleep) and recreates the hash strategy state; no-op for an empty pool {{{ */
{
	int i;

	if (!pool->num_servers) {
		return;
	}

	for (i = 0; i < pool->num_servers; i++) {
		mmc_t *server = pool->servers[i];

		if (server->persistent) {
			mmc_server_sleep(server);
		}
		else {
			mmc_server_free(server);
		}
	}

	efree(pool->servers);
	pool->servers = NULL;
	pool->num_servers = 0;

	/* reallocate the hash strategy state */
	pool->hash->free_state(pool->hash_state);
	mmc_pool_init_hash(pool);
}
/* }}} */

int mmc_pool_open(mmc_pool_t *pool, mmc_t *mmc, mmc_stream_t *io, int udp) /*
	connects if the stream is not already connected: connected/unknown streams
	return MMC_OK right away, disconnected/failed ones go through
	mmc_server_connect(), any other status yields MMC_REQUEST_FAILURE {{{ */
{
	if (io->status == MMC_STATUS_CONNECTED || io->status == MMC_STATUS_UNKNOWN) {
		return MMC_OK;
	}

	if (io->status == MMC_STATUS_DISCONNECTED || io->status == MMC_STATUS_FAILED) {
		return mmc_server_connect(pool, mmc, io, udp);
	}

	return MMC_REQUEST_FAILURE;
}
/* }}} */

mmc_t *mmc_pool_find_next(mmc_pool_t *pool, const char *key, unsigned int key_len, mmc_queue_t *skip_servers, unsigned int *last_index) /*
	finds the next server in the failover sequence. Candidate servers are looked up
	by hashing "<key>-<index>", with *last_index incremented for every attempt;
	the search stops at the first server not contained in skip_servers or when
	*last_index reaches max_failover_attempts. key must be NUL-terminated (key_len
	is not used for formatting) {{{ */
{
	mmc_t *mmc;
	char keytmp[MMC_MAX_KEY_LEN + MAX_LENGTH_OF_LONG + 1];
	unsigned int keytmp_len;
	int written;

	/* find the next server not present in the skip-list */
	do {
		/* bounded formatting: never write past keytmp, whatever the key length */
		written = snprintf(keytmp, sizeof(keytmp), "%s-%d", key, (int)((*last_index)++));

		if (written < 0) {
			keytmp[0] = '\0';
			keytmp_len = 0;
		}
		else if ((size_t)written >= sizeof(keytmp)) {
			/* output was truncated; hash only what actually fits */
			keytmp_len = sizeof(keytmp) - 1;
		}
		else {
			keytmp_len = (unsigned int)written;
		}

		mmc = pool->hash->find_server(pool->hash_state, keytmp, keytmp_len);
	} while (mmc_queue_contains(skip_servers, mmc) && *last_index < MEMCACHE_G(max_failover_attempts));

	return mmc;
}

mmc_t *mmc_pool_find(mmc_pool_t *pool, const char *key, unsigned int key_len) /*
	maps a key to a non-failed server; when the primary server is not valid and
	failover is allowed, walks the failover sequence for at most
	max_failover_attempts candidates. The last candidate is returned even if it is
	not valid {{{ */
{
	unsigned int attempt = 0;
	mmc_t *mmc = pool->hash->find_server(pool->hash_state, key, key_len);

	/* primary server is usable, or failover is disabled */
	if (mmc_server_valid(mmc) || !MEMCACHE_G(allow_failover)) {
		return mmc;
	}

	/* check validity and try to failover otherwise */
	do {
		mmc = mmc_pool_find_next(pool, key, key_len, NULL, &attempt);
	} while (!mmc_server_valid(mmc) && attempt < MEMCACHE_G(max_failover_attempts));

	return mmc;
}
/* }}} */

int mmc_pool_failover_handler(mmc_pool_t *pool, mmc_t *mmc, mmc_request_t *request, void *param) /*
	uses request->key to reschedule request to other server. When failover is
	disabled, the attempt limit is reached or every server has already failed, the
	request is released and MMC_REQUEST_FAILURE is returned {{{ */
{
	/* nothing left to try */
	if (!MEMCACHE_G(allow_failover) || request->failed_index >= MEMCACHE_G(max_failover_attempts) || request->failed_servers.len >= pool->num_servers) {
		mmc_pool_release(pool, request);
		return MMC_REQUEST_FAILURE;
	}

	/* record each failed server and look for the next candidate */
	do {
		mmc_queue_push(&(request->failed_servers), mmc);
		mmc = mmc_pool_find_next(pool, request->key, request->key_len, &(request->failed_servers), &(request->failed_index));
	} while (!mmc_server_valid(mmc) && request->failed_index < MEMCACHE_G(max_failover_attempts) && request->failed_servers.len < pool->num_servers);

	return mmc_pool_schedule(pool, mmc, request);
}
/* }}}*/

int mmc_pool_failover_handler_null(mmc_pool_t *pool, mmc_t *mmc, mmc_request_t *request, void *param) /*
	failover handler that never fails over: releases the request back to the pool
	and always returns failure {{{ */
{
	/* the failed server and the handler parameter are not needed here */
	(void)mmc;
	(void)param;

	mmc_pool_release(pool, request);

	return MMC_REQUEST_FAILURE;
}
/* }}}*/

static int mmc_pool_response_handler_null(mmc_t *mmc, mmc_request_t *request, int response, const char *message, unsigned int message_len, void *param) /*
	response handler that ignores the response and always returns done {{{ */
{
	/* every argument is intentionally ignored */
	(void)mmc;
	(void)request;
	(void)response;
	(void)message;
	(void)message_len;
	(void)param;

	return MMC_REQUEST_DONE;
}
/* }}}*/

static inline mmc_request_t *mmc_pool_request_alloc(mmc_pool_t *pool, int protocol,
	mmc_request_failover_handler failover_handler, void *failover_handler_param) /*
	takes a request from the pool's free list (resetting it) or creates a new one,
	reserves room for the udp header when protocol is MMC_PROTO_UDP and installs
	the failover handler (the null handler when none is given) {{{ */
{
	mmc_request_t *request = mmc_queue_pop(&(pool->free_requests));

	if (request != NULL) {
		/* recycled request: bring it back to a clean state */
		pool->protocol->reset_request(request);
	}
	else {
		request = pool->protocol->create_request();
	}

	request->protocol = protocol;

	if (protocol == MMC_PROTO_UDP) {
		/* zeroed placeholder header at the start of the send buffer */
		mmc_udp_header_t header = {0};
		smart_string_appendl(&(request->sendbuf.value), (const char *)&header, sizeof(header));
	}

	if (failover_handler == NULL) {
		failover_handler = mmc_pool_failover_handler_null;
	}

	request->failover_handler = failover_handler;
	request->failover_handler_param = failover_handler_param;

	return request;
}
/* }}} */

mmc_request_t *mmc_pool_request(mmc_pool_t *pool, int protocol,
	mmc_request_response_handler response_handler, void *response_handler_param,
	mmc_request_failover_handler failover_handler, void *failover_handler_param) /*
	allocates a request with the given response and failover handlers, must be
	added to pool using mmc_pool_schedule or released with mmc_pool_release {{{ */
{
	mmc_request_t *fresh = mmc_pool_request_alloc(pool, protocol, failover_handler, failover_handler_param);

	fresh->response_handler_param = response_handler_param;
	fresh->response_handler = response_handler;

	return fresh;
}
/* }}} */

mmc_request_t *mmc_pool_request_get(mmc_pool_t *pool, int protocol,
	mmc_request_value_handler value_handler, void *value_handler_param,
	mmc_request_failover_handler failover_handler, void *failover_handler_param) /*
	allocates a request for fetching values: the response handler is the null
	handler and values are delivered to value_handler. Must be added to pool using
	mmc_pool_schedule or released with mmc_pool_release {{{ */
{
	mmc_request_t *fresh;

	fresh = mmc_pool_request(
		pool, protocol,
		mmc_pool_response_handler_null, NULL,
		failover_handler, failover_handler_param);

	fresh->value_handler_param = value_handler_param;
	fresh->value_handler = value_handler;

	return fresh;
}
/* }}} */

mmc_request_t *mmc_pool_clone_request(mmc_pool_t *pool, mmc_request_t *request) /*
	clones a request, must be added to pool using mmc_pool_schedule or released with
	mmc_pool_release. Handlers, payload parser, key, send buffer and protocol
	specific values are copied; the clone always gets the null failover handler {{{ */
{
	mmc_request_t *clone = mmc_pool_request_alloc(pool, request->protocol, NULL, NULL);

	clone->response_handler = request->response_handler;
	clone->response_handler_param = request->response_handler_param;
	clone->value_handler = request->value_handler;
	clone->value_handler_param = request->value_handler_param;

	/* copy payload parser */
	clone->parse = request->parse;

	/* copy key and terminate it: the clone may be a recycled request that still
	 * holds the bytes of a longer, older key, and the key is consumed as a C
	 * string elsewhere (e.g. formatted with "%s" in mmc_pool_find_next).
	 * NOTE(review): assumes key has room for key_len + 1 bytes, as the
	 * key_len + 1 copy in mmc_unpack_value suggests -- confirm in memcache_pool.h */
	memcpy(clone->key, request->key, request->key_len);
	clone->key[request->key_len] = '\0';
	clone->key_len = request->key_len;

	/* copy sendbuf; this overwrites the udp header placeholder that
	 * mmc_pool_request_alloc may have appended */
	mmc_buffer_alloc(&(clone->sendbuf), request->sendbuf.value.len);
	memcpy(clone->sendbuf.value.c, request->sendbuf.value.c, request->sendbuf.value.len);
	clone->sendbuf.value.len = request->sendbuf.value.len;

	/* copy protocol specific values */
	pool->protocol->clone_request(clone, request);

	return clone;
}
/* }}} */

static int mmc_pool_slot_send(mmc_pool_t *pool, mmc_t *mmc, mmc_request_t *request, int handle_failover) /*
	puts a request (or NULL) into the server's send slot, opening the udp or tcp connection it
	will use; returns MMC_OK on success, otherwise the failover handler's result when
	handle_failover is set or MMC_REQUEST_FAILURE when it is not {{{ */
{
	int use_udp;

	/* nothing to slot, simply clear the send slot */
	if (request == NULL) {
		mmc->sendreq = NULL;
		return MMC_OK;
	}

	/* udp needs: a udp request, a configured udp port, a payload that fits in one chunk and an open socket */
	use_udp = request->protocol == MMC_PROTO_UDP
		&& mmc->udp.port
		&& request->sendbuf.value.len <= mmc->udp.chunk_size
		&& mmc_pool_open(pool, mmc, &(mmc->udp), 1) == MMC_OK;

	if (use_udp) {
		mmc_udp_header_t *header = (mmc_udp_header_t *)request->sendbuf.value.c;

		request->io = &(mmc->udp);
		request->read = mmc_request_read_udp;

		/* per-request udp bookkeeping */
		request->udp.reqid = mmc->reqid++;
		request->udp.seqid = 0;
		request->udp.total = 0;

		/* header at the start of sendbuf: request id and a datagram count of 1, network byte order */
		header->reqid = htons(request->udp.reqid);
		header->total = htons(1);
	}
	else if (mmc_pool_open(pool, mmc, &(mmc->tcp), 0) == MMC_OK) {
		/* a udp request going out over tcp must not transmit its udp header */
		if (request->protocol == MMC_PROTO_UDP) {
			request->sendbuf.idx += sizeof(mmc_udp_header_t);
		}

		request->io = &(mmc->tcp);
		request->read = NULL;
	}
	else {
		/* no connection could be opened, leave the slot empty */
		mmc->sendreq = NULL;

		return handle_failover
			? request->failover_handler(pool, mmc, request, request->failover_handler_param)
			: MMC_REQUEST_FAILURE;
	}

	mmc->sendreq = request;
	return MMC_OK;
}
/* }}} */

int mmc_pool_schedule(mmc_pool_t *pool, mmc_t *mmc, mmc_request_t *request) /*
	queues a request for sending to and reading from the given server; returns MMC_OK on
	success, otherwise whatever the request's failover handler returns {{{ */
{
	/* unusable server, hand the request to its failover handler */
	if (!mmc_server_valid(mmc)) {
		return request->failover_handler(pool, mmc, request, request->failover_handler_param);
	}

	/* start sending from the beginning and discard anything previously read */
	request->sendbuf.idx = 0;
	mmc_buffer_reset(&(request->readbuf));

	/* sending: wait in the server's queue if the slot is taken, otherwise take the slot now */
	if (mmc->sendreq != NULL) {
		mmc_queue_push(&(mmc->sendqueue), request);
	}
	else {
		if (mmc_pool_slot_send(pool, mmc, request, 0) != MMC_OK) {
			return request->failover_handler(pool, mmc, request, request->failover_handler_param);
		}
		mmc_queue_push(pool->sending, mmc);
	}

	/* reading: same scheme as for sending */
	if (mmc->readreq != NULL) {
		mmc_queue_push(&(mmc->readqueue), request);
	}
	else {
		mmc->readreq = request;
		mmc_queue_push(pool->reading, mmc);
	}

	return MMC_OK;
}
/* }}} */

int mmc_pool_schedule_key(mmc_pool_t *pool, const char *key, unsigned int key_len, mmc_request_t *request, unsigned int redundancy) /*
	schedules a request against the server selected by the key and, when redundancy > 1, clones of
	it against further servers; returns the result of scheduling the original request {{{ */
{
	int i, result;
	mmc_t *mmc;
	unsigned int last_index = 0;
	mmc_queue_t skip_servers = {0};

	/* no redundancy requested, a single server gets the request */
	if (redundancy <= 1) {
		return mmc_pool_schedule(pool, mmc_pool_find(pool, key, key_len), request);
	}

	/* the original request goes to the primary server for the key */
	mmc = mmc_pool_find(pool, key, key_len);
	result = mmc_pool_schedule(pool, mmc, request);

	/* up to redundancy-1 clones, limited by the number of remaining servers */
	i = 0;
	while (i < redundancy-1 && i < pool->num_servers-1) {
		/* exclude the server just used when picking the next one */
		mmc_queue_push(&skip_servers, mmc);
		mmc = mmc_pool_find_next(pool, key, key_len, &skip_servers, &last_index);

		/* the outcome of scheduling a clone does not affect the returned result */
		if (mmc_server_valid(mmc)) {
			mmc_pool_schedule(pool, mmc, mmc_pool_clone_request(pool, request));
		}

		i++;
	}

	mmc_queue_free(&skip_servers);
	return result;
}
/* }}} */

int mmc_pool_schedule_get(
	mmc_pool_t *pool, int protocol, int op, zval *zkey,
	mmc_request_value_handler value_handler, void *value_handler_param,
	mmc_request_failover_handler failover_handler, void *failover_handler_param,
	mmc_request_t *failed_request) /*
	adds a key to the get request being built for the key's server, starting a new request when
	needed; returns MMC_OK on success or MMC_REQUEST_FAILURE for a bad key or unusable server {{{ */
{
	char key[MMC_MAX_KEY_LEN + 1];
	unsigned int key_len;
	mmc_t *mmc;
	int start_request = 0;

	if (mmc_prepare_key(zkey, key, &key_len) != MMC_OK) {
		php_error_docref(NULL, E_WARNING, "Invalid key");
		return MMC_REQUEST_FAILURE;
	}

	mmc = mmc_pool_find(pool, key, key_len);
	if (!mmc_server_valid(mmc)) {
		return MMC_REQUEST_FAILURE;
	}

	if (mmc->buildreq == NULL) {
		/* first key for this server, remember that it has a request under construction */
		mmc_queue_push(&(pool->pending), mmc);
		start_request = 1;
	}
	else if (protocol == MMC_PROTO_UDP && mmc->buildreq->sendbuf.value.len + key_len + 3 > MMC_MAX_UDP_LEN) {
		/* the datagram is full: finish it and schedule it for delivery */
		pool->protocol->end_get(mmc->buildreq);
		mmc_pool_schedule(pool, mmc, mmc->buildreq);

		/* start sending right away */
		mmc_pool_select(pool);
		start_request = 1;
	}

	if (start_request) {
		mmc->buildreq = mmc_pool_request_get(
			pool, protocol, value_handler, value_handler_param,
			failover_handler, failover_handler_param);

		/* carry over the failover state of the request this one replaces */
		if (failed_request != NULL) {
			mmc_queue_copy(&(mmc->buildreq->failed_servers), &(failed_request->failed_servers));
			mmc->buildreq->failed_index = failed_request->failed_index;
		}

		pool->protocol->begin_get(mmc->buildreq, op);
	}

	pool->protocol->append_get(mmc->buildreq, zkey, key, key_len);
	return MMC_OK;
}
/* }}} */

static inline void mmc_pool_switch(mmc_pool_t *pool) {
	/* the pool keeps two sending/reading queue pairs; make the other pair the current one */
	const int first_is_current = (pool->sending == &(pool->_sending1));

	pool->sending = first_is_current ? &(pool->_sending2) : &(pool->_sending1);
	pool->reading = first_is_current ? &(pool->_reading2) : &(pool->_reading1);

	/* the newly current pair is reset so it can be filled again */
	mmc_queue_reset(pool->sending);
	mmc_queue_reset(pool->reading);
}

static int mmc_select_failure(mmc_pool_t *pool, mmc_t *mmc, mmc_request_t *request, int result) /*
	handles a select() timeout (result == 0) or select() error (any other result) for a server;
	returns MMC_REQUEST_RETRY when a timed out request may be retried, otherwise deactivates
	the server and returns MMC_REQUEST_FAILURE {{{ */
{
	if (result == 0) {
		/* timeout expired, non-responsive server */
		if (mmc_server_failure(mmc, request->io, "Network timeout", 0) == MMC_REQUEST_RETRY) {
			return MMC_REQUEST_RETRY;
		}
	}
	else {
		char buf[1024];
		const char *message;

		/* read the socket error once, before any other call has a chance to overwrite it */
		long err = php_socket_errno();

		if (err) {
			message = php_socket_strerror(err, buf, sizeof(buf));
		}
		else {
			message = "Unknown select() error";
		}

		/* record the code the message was built from; plain errno may have been changed by the
		   calls above and is not where Winsock reports socket errors */
		mmc_server_seterror(mmc, message, (int)err);
	}

	/* hard failure, deactivate connection */
	mmc_server_deactivate(pool, mmc);
	return MMC_REQUEST_FAILURE;
}
/* }}} */

static void mmc_select_retry(mmc_pool_t *pool, mmc_t *mmc, mmc_request_t *request) /*
	detaches a request from the server's send/read queues and slots, promotes the next queued
	requests and finally invokes the request's failover handler {{{ */
{
	mmc_request_t *next;

	/* the request may still be waiting in either queue */
	mmc_queue_remove(&(mmc->sendqueue), request);
	mmc_queue_remove(&(mmc->readqueue), request);

	/* it occupied the send slot: promote the next queued request */
	if (mmc->sendreq == request) {
		next = mmc_queue_pop(&(mmc->sendqueue));
		mmc_pool_slot_send(pool, mmc, next, 1);

		/* nothing left to send, the server leaves the pool's send queue */
		if (mmc->sendreq == NULL) {
			mmc_queue_remove(pool->sending, mmc);
		}
	}

	/* it occupied the read slot: promote the next queued request */
	if (mmc->readreq == request) {
		next = mmc_queue_pop(&(mmc->readqueue));
		mmc->readreq = next;

		/* nothing left to read, the server leaves the pool's read queue */
		if (mmc->readreq == NULL) {
			mmc_queue_remove(pool->reading, mmc);
		}
	}

	/* the failover handler decides what happens to the request next */
	request->failover_handler(pool, mmc, request, request->failover_handler_param);
}
/* }}} */

void mmc_pool_select(mmc_pool_t *pool) /*
	runs one select() round on all scheduled requests; when entered while an earlier round
	is still marked in progress (pool->in_select set) it skips select() and continues
	processing that round's queues instead {{{ */
{
	int i, fd, result;
	mmc_t *mmc;
	mmc_queue_t *sending, *reading;

	/* help complete previous run */
	if (pool->in_select) {
		/* the round in progress switched queues before it started, so its queues are
		   the pair that is NOT currently installed in pool->sending / pool->reading */
		if (pool->sending == &(pool->_sending1)) {
			sending = &(pool->_sending2);
			reading = &(pool->_reading2);
		}
		else {
			sending = &(pool->_sending1);
			reading = &(pool->_reading1);
		}
	}
	else {
		int nfds = 0;
		/* work on a copy of the timeout, select() is given a pointer to it */
		struct timeval tv = pool->timeout;

		/* process the current queues; anything scheduled from here on lands in the other pair */
		sending = pool->sending;
		reading = pool->reading;
		mmc_pool_switch(pool);

		FD_ZERO(&(pool->wfds));
		FD_ZERO(&(pool->rfds));

		/* NOTE(review): descriptors are added without a range check; POSIX leaves FD_SET
		   undefined for fd >= FD_SETSIZE — confirm this cannot happen */
		for (i=0; i < sending->len; i++) {
			mmc = mmc_queue_item(sending, i);
			if (mmc->sendreq->io->fd > nfds) {
				nfds = mmc->sendreq->io->fd;
			}
			FD_SET(mmc->sendreq->io->fd, &(pool->wfds));
		}

		for (i=0; i < reading->len; i++) {
			mmc = mmc_queue_item(reading, i);
			if (mmc->readreq->io->fd > nfds) {
				nfds = mmc->readreq->io->fd;
			}
			FD_SET(mmc->readreq->io->fd, &(pool->rfds));
		}

		result = select(nfds + 1, &(pool->rfds), &(pool->wfds), NULL, &tv);

		/* select timed out (result == 0) or failed (result < 0) */
		/* NOTE(review): POSIX says the fd sets are not modified when select() fails, so after
		   an error the FD_ISSET tests below would all be true and nothing gets removed — confirm */
		if (result <= 0) {
			for (i=0; i < sending->len; i++) {
				mmc = (mmc_t *)mmc_queue_item(sending, i);
				
				/* remove sending request */
				if (!FD_ISSET(mmc->sendreq->io->fd, &(pool->wfds))) {
					mmc_queue_remove(sending, mmc);
					mmc_queue_remove(reading, mmc);
					/* stay on the same index, presumably the removal shifted the following items down */
					i--;

					if (mmc_select_failure(pool, mmc, mmc->sendreq, result) == MMC_REQUEST_RETRY) {
						/* allow request to try and send again */
						mmc_select_retry(pool, mmc, mmc->sendreq);
					}
				}
			}

			for (i=0; i < reading->len; i++) {
				mmc = (mmc_t *)mmc_queue_item(reading, i);
				
				/* remove reading request */
				if (!FD_ISSET(mmc->readreq->io->fd, &(pool->rfds))) {
					mmc_queue_remove(sending, mmc);
					mmc_queue_remove(reading, mmc);
					/* stay on the same index, presumably the removal shifted the following items down */
					i--;

					if (mmc_select_failure(pool, mmc, mmc->readreq, result) == MMC_REQUEST_RETRY) {
						/* allow request to try and read again */
						mmc_select_retry(pool, mmc, mmc->readreq);
					}
				}
			}
		}

		/* from here on a nested call will continue this round instead of starting a new one */
		pool->in_select = 1;
	}

	/* first pass: servers with a request to send */
	for (i=0; i < sending->len; i++) {
		mmc = mmc_queue_item(sending, i);

		/* skip servers which have failed */
		if (!mmc->sendreq) {
			continue;
		}

		if (FD_ISSET(mmc->sendreq->io->fd, &(pool->wfds))) {
			fd = mmc->sendreq->io->fd;

			/* clear bit for reentrancy reasons */
			FD_CLR(fd, &(pool->wfds));

			/* until stream buffer is empty */
			do {
				/* delegate to request send handler */
				result = mmc_request_send(mmc, mmc->sendreq);

				/* check if someone has helped complete our run */
				if (!pool->in_select) {
					return;
				}

				switch (result) {
					case MMC_REQUEST_FAILURE:
						/* take server offline and failover requests */
						mmc_server_deactivate(pool, mmc);

						/* server is failed, remove from read queue */
						mmc_queue_remove(reading, mmc);
						break;

					case MMC_REQUEST_RETRY:
						/* allow request to reschedule itself */
						mmc_select_retry(pool, mmc, mmc->sendreq);
						break;

					case MMC_REQUEST_DONE:
						/* shift next request into send slot */
						mmc_pool_slot_send(pool, mmc, mmc_queue_pop(&(mmc->sendqueue)), 1);
						break;

					case MMC_REQUEST_MORE:
						/* send more data to socket */
						break;

					/* NOTE(review): MMC_REQUEST_AGAIN is accepted by the loop condition below but has
					   no case here, so it would end up in this default branch — confirm
					   mmc_request_send never returns it */
					default:
						php_error_docref(NULL, E_ERROR, "Invalid return value, bailing out");
				}
			} while (mmc->sendreq != NULL && (result == MMC_REQUEST_DONE || result == MMC_REQUEST_AGAIN));

			if (result == MMC_REQUEST_MORE) {
				/* add server to send queue once more */
				mmc_queue_push(pool->sending, mmc);
			}
		}
		else {
			/* add server to send queue once more */
			mmc_queue_push(pool->sending, mmc);
		}

		/* sanity check: the server still has send work but the pool's send queue is empty */
		if ( ! pool->sending->len && ( mmc->sendreq != NULL || mmc->sendqueue.len ) ) {
			php_error_docref( NULL, E_WARNING, "mmc_pool_select() failed to cleanup when sending! Sendqueue: %d", mmc->sendqueue.len );
		}
	}

	/* second pass: servers with a response to read */
	for (i=0; i < reading->len; i++) {
		mmc = mmc_queue_item(reading, i);

		/* skip servers which have failed */
		if (!mmc->readreq) {
			continue;
		}

		if (FD_ISSET(mmc->readreq->io->fd, &(pool->rfds))) {
			fd = mmc->readreq->io->fd;

			/* clear bit for reentrancy reasons */
			FD_CLR(fd, &(pool->rfds));

			/* fill read buffer if needed (only requests with a read callback set) */
			if (mmc->readreq->read != NULL) {
				result = mmc->readreq->read(mmc, mmc->readreq);

				if (result != MMC_OK) {
					switch (result) {
						case MMC_REQUEST_FAILURE:
							/* take server offline and failover requests */
							mmc_server_deactivate(pool, mmc);
							break;

						case MMC_REQUEST_RETRY:
							/* allow request to reschedule itself */
							mmc_select_retry(pool, mmc, mmc->readreq);
							break;

						case MMC_REQUEST_MORE:
							/* add server to read queue once more */
							mmc_queue_push(pool->reading, mmc);
							break;

						default:
							php_error_docref(NULL, E_ERROR, "Invalid return value, bailing out");
					}

					/* skip to next request */
					continue;
				}
			}

			/* until stream buffer is empty */
			do {
				/* delegate to request response handler */
				result = mmc->readreq->parse(mmc, mmc->readreq);

				/* check if someone has helped complete our run */
				if (!pool->in_select) {
					return;
				}

				switch (result) {
					case MMC_REQUEST_FAILURE:
						/* take server offline and failover requests */
						mmc_server_deactivate(pool, mmc);
						break;

					case MMC_REQUEST_RETRY:
						/* allow request to reschedule itself */
						mmc_select_retry(pool, mmc, mmc->readreq);
						break;

					case MMC_REQUEST_DONE:
						/* might have completed without having sent all data (e.g. object too large errors) */
						if (mmc->sendreq == mmc->readreq) {
							/* disconnect stream since data may have been sent before we received the SERVER_ERROR */
							mmc_server_disconnect(mmc, mmc->readreq->io);

							/* shift next request into send slot */
							mmc_pool_slot_send(pool, mmc, mmc_queue_pop(&(mmc->sendqueue)), 1);

							/* clear out connection from send queue if no new request was slotted */
							if (!mmc->sendreq) {
								mmc_queue_remove(pool->sending, mmc);
							}
						}

						/* release completed request */
						mmc_pool_release(pool, mmc->readreq);

						/* shift next request into read slot */
						mmc->readreq = mmc_queue_pop(&(mmc->readqueue));
						break;

					case MMC_REQUEST_MORE:
						/* read more data from socket, unless the stream reports end-of-file */
						if (php_stream_eof(mmc->readreq->io->stream)) {
							/* result is overwritten here, so the MMC_REQUEST_MORE re-queue after the loop is skipped */
							result = mmc_server_failure(mmc, mmc->readreq->io, "Read failed (socket was unexpectedly closed)", 0);
							if (result == MMC_REQUEST_FAILURE) {
								/* take server offline and failover requests */
								mmc_server_deactivate(pool, mmc);
							} else {
								mmc_select_retry(pool, mmc, mmc->readreq);
							}
						}
						break;

					case MMC_REQUEST_AGAIN:
						/* request wants another loop */
						break;

					default:
						php_error_docref(NULL, E_ERROR, "Invalid return value, bailing out");
				}
			} while (mmc->readreq != NULL && (result == MMC_REQUEST_DONE || result == MMC_REQUEST_AGAIN));

			if (result == MMC_REQUEST_MORE) {
				/* add server to read queue once more */
				mmc_queue_push(pool->reading, mmc);
			}
		}
		else {
			/* add server to read queue once more */
			mmc_queue_push(pool->reading, mmc);
		}

		/* sanity check: the server still has read work but the pool's read queue is empty */
		if ( ! pool->reading->len && ( mmc->readreq != NULL || mmc->readqueue.len ) ) {
			php_error_docref( NULL, E_WARNING, "mmc_pool_select() failed to cleanup when reading! Readqueue: %d", mmc->readqueue.len );
		}
	}

	/* round complete; this also makes any outer invocation of this round return early */
	pool->in_select = 0;
}
/* }}} */

void mmc_pool_schedule_pending(mmc_pool_t *pool) {
	/* drain pool->pending: finish and schedule the get request built for each server in it */
	for (;;) {
		mmc_t *server = mmc_queue_pop(&(pool->pending));

		if (server == NULL) {
			break;
		}

		pool->protocol->end_get(server->buildreq);
		mmc_pool_schedule(pool, server, server->buildreq);

		/* the server no longer has a request under construction */
		server->buildreq = NULL;
	}
}

void mmc_pool_run(mmc_pool_t *pool)  /*
	runs all scheduled requests to completion {{{ */
{
	/* schedule the requests that are still being built before the first round */
	mmc_pool_schedule_pending(pool);

	/* keep running select rounds while any server is queued for sending or reading */
	while (pool->reading->len || pool->sending->len) {
		mmc_pool_select(pool);

		/* schedule whatever was added to pool->pending during the round */
		mmc_pool_schedule_pending(pool);
	}
}
/* }}} */

MMC_POOL_INLINE int mmc_prepare_key_ex(const char *key, unsigned int key_len, char *result, unsigned int *result_len, char *prefix)  /*
	builds the key sent to the server: optional prefix followed by key, truncated to
	MMC_MAX_KEY_LEN bytes, with every byte <= ' ' replaced by '_'.
	key/key_len: raw key, key_len must be non-zero
	result: output buffer, must hold at least MMC_MAX_KEY_LEN + 1 bytes; always NUL-terminated on success
	result_len: receives the length written to result (without the terminator)
	prefix: NUL-terminated prefix or NULL
	returns MMC_OK on success, MMC_REQUEST_FAILURE for an empty key {{{ */
{
	unsigned int i, j, prefix_len = 0, total_len;

	if (key_len == 0) {
		return MMC_REQUEST_FAILURE;
	}

	if (prefix) {
		prefix_len = strlen(prefix);
	}

	/* total = min(prefix_len + key_len, MMC_MAX_KEY_LEN), compared without adding first so
	   the unsigned sum cannot wrap around */
	if (prefix_len >= MMC_MAX_KEY_LEN || key_len >= MMC_MAX_KEY_LEN - prefix_len) {
		total_len = MMC_MAX_KEY_LEN;
	}
	else {
		total_len = prefix_len + key_len;
	}

	/* a prefix longer than the limit must be truncated too, otherwise it would be
	   written past the end of result */
	if (prefix_len > total_len) {
		prefix_len = total_len;
	}

	/* prefix first ... */
	for (i=0; i<prefix_len; i++) {
		result[i] = ((unsigned char)prefix[i]) > ' ' ? prefix[i] : '_';
	}

	/* ... then as much of the key as still fits */
	for (j=0; j+i<total_len; j++) {
		result[j+i] = ((unsigned char)key[j]) > ' ' ? key[j] : '_';
	}

	result[total_len] = '\0';
	*result_len = total_len;

	return MMC_OK;
}
/* }}} */

MMC_POOL_INLINE int mmc_prepare_key(zval *key, char *result, unsigned int *result_len)  /*
	prepares a key given as a zval: strings are used directly, any other type is converted
	to a string on a temporary copy so the caller's zval is left untouched; returns the
	result of mmc_prepare_key_ex {{{ */
{
	int status;
	zval tmp;

	/* common case, the key already is a string */
	if (Z_TYPE_P(key) == IS_STRING) {
		return mmc_prepare_key_ex(Z_STRVAL_P(key), Z_STRLEN_P(key), result, result_len, MEMCACHE_G(key_prefix));
	}

	/* convert a private copy to a string */
	tmp = *key;
	zval_copy_ctor(&tmp);
	convert_to_string(&tmp);

	status = mmc_prepare_key_ex(Z_STRVAL(tmp), Z_STRLEN(tmp), result, result_len, MEMCACHE_G(key_prefix));

	/* the copy is no longer needed once the key has been written to result */
	zval_dtor(&tmp);
	return status;
}
/* }}} */


/*
 * Local variables:
 * tab-width: 4
 * c-basic-offset: 4
 * End:
 * vim600: noet sw=4 ts=4 fdm=marker
 * vim<600: noet sw=4 ts=4
 */

Youez - 2016 - github.com/yon3zu
LinuXploit