/*	$NetBSD: netmgr.c,v 1.1.2.2 2024/02/24 13:07:29 martin Exp $	*/

/*
 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
 *
 * SPDX-License-Identifier: MPL-2.0
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
 *
 * See the COPYRIGHT file distributed with this work for additional
 * information regarding copyright ownership.
 */

#include <inttypes.h>
#include <unistd.h>
#include <uv.h>
#ifdef HAVE_LIBCTRACE
#include <execinfo.h>
#endif /* ifdef HAVE_LIBCTRACE */

#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
#include <isc/list.h>
#include <isc/log.h>
#include <isc/magic.h>
#include <isc/mem.h>
#include <isc/netmgr.h>
#include <isc/print.h>
#include <isc/quota.h>
#include <isc/random.h>
#include <isc/refcount.h>
#include <isc/region.h>
#include <isc/result.h>
#include <isc/sockaddr.h>
#include <isc/stats.h>
#include <isc/strerr.h>
#include <isc/task.h>
#include <isc/thread.h>
#include <isc/util.h>

#include "netmgr-int.h"
#include "netmgr_p.h"
#include "openssl_shim.h"
#include "trampoline_p.h"
#include "uv-compat.h"

/*%
 * How many isc_nmhandles and isc_nm_uvreqs will we be
 * caching for reuse in a socket.
 */
#define ISC_NM_HANDLES_STACK_SIZE 600
#define ISC_NM_REQS_STACK_SIZE	  600

/*%
 * Shortcut index arrays to get access to statistics counters.
 */

static const isc_statscounter_t udp4statsindex[] = {
	isc_sockstatscounter_udp4open,
	isc_sockstatscounter_udp4openfail,
	isc_sockstatscounter_udp4close,
	isc_sockstatscounter_udp4bindfail,
	isc_sockstatscounter_udp4connectfail,
	isc_sockstatscounter_udp4connect,
	-1,
	-1,
	isc_sockstatscounter_udp4sendfail,
	isc_sockstatscounter_udp4recvfail,
	isc_sockstatscounter_udp4active
};

static const isc_statscounter_t udp6statsindex[] = {
	isc_sockstatscounter_udp6open,
	isc_sockstatscounter_udp6openfail,
	isc_sockstatscounter_udp6close,
	isc_sockstatscounter_udp6bindfail,
	isc_sockstatscounter_udp6connectfail,
	isc_sockstatscounter_udp6connect,
	-1,
	-1,
	isc_sockstatscounter_udp6sendfail,
	isc_sockstatscounter_udp6recvfail,
	isc_sockstatscounter_udp6active
};

static const isc_statscounter_t tcp4statsindex[] = {
	isc_sockstatscounter_tcp4open,	      isc_sockstatscounter_tcp4openfail,
	isc_sockstatscounter_tcp4close,	      isc_sockstatscounter_tcp4bindfail,
	isc_sockstatscounter_tcp4connectfail, isc_sockstatscounter_tcp4connect,
	isc_sockstatscounter_tcp4acceptfail,  isc_sockstatscounter_tcp4accept,
	isc_sockstatscounter_tcp4sendfail,    isc_sockstatscounter_tcp4recvfail,
	isc_sockstatscounter_tcp4active
};

static const isc_statscounter_t tcp6statsindex[] = {
	isc_sockstatscounter_tcp6open,	      isc_sockstatscounter_tcp6openfail,
	isc_sockstatscounter_tcp6close,	      isc_sockstatscounter_tcp6bindfail,
	isc_sockstatscounter_tcp6connectfail, isc_sockstatscounter_tcp6connect,
	isc_sockstatscounter_tcp6acceptfail,  isc_sockstatscounter_tcp6accept,
	isc_sockstatscounter_tcp6sendfail,    isc_sockstatscounter_tcp6recvfail,
	isc_sockstatscounter_tcp6active
};

#if 0
/* XXX: not currently used */
static const isc_statscounter_t unixstatsindex[] = {
	isc_sockstatscounter_unixopen,
	isc_sockstatscounter_unixopenfail,
	isc_sockstatscounter_unixclose,
	isc_sockstatscounter_unixbindfail,
	isc_sockstatscounter_unixconnectfail,
	isc_sockstatscounter_unixconnect,
	isc_sockstatscounter_unixacceptfail,
	isc_sockstatscounter_unixaccept,
	isc_sockstatscounter_unixsendfail,
	isc_sockstatscounter_unixrecvfail,
	isc_sockstatscounter_unixactive
};
#endif /* if 0 */

/*
 * libuv is not thread safe, but has mechanisms to pass messages
 * between threads. Each socket is owned by a thread. For UDP
 * sockets we have a set of sockets for each interface and we can
 * choose a sibling and send the message directly. For TCP, or if
 * we're calling from a non-networking thread, we need to pass the
 * request using async_cb.
 */

#if defined(HAVE_THREAD_LOCAL)
#include <threads.h>
static thread_local int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
#elif defined(HAVE___THREAD)
static __thread int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
#elif HAVE___DECLSPEC_THREAD
__declspec(thread) int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN;
#endif /* if defined(HAVE_THREAD_LOCAL) */

static void
nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG);
static void
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle);
static isc_threadresult_t
nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);

static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type);
static void
wait_for_priority_queue(isc__networker_t *worker);
static void
drain_queue(isc__networker_t *worker, netievent_type_t type);

static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0);

static void
isc__nm_threadpool_initialize(uint32_t workers);
static void
isc__nm_work_cb(uv_work_t *req);
static void
isc__nm_after_work_cb(uv_work_t *req, int status);

void
isc__nmsocket_reset(isc_nmsocket_t *sock);

/*%<
 * Issue a 'handle closed' callback on the socket.
 */

static void
nmhandle_detach_cb(isc_nmhandle_t **handlep FLARG);

int
isc_nm_tid(void) {
	return (isc__nm_tid_v);
}

bool
isc__nm_in_netthread(void) {
	return (isc__nm_tid_v >= 0);
}

#ifdef WIN32
static void
isc__nm_winsock_initialize(void) {
	WORD wVersionRequested = MAKEWORD(2, 2);
	WSADATA wsaData;
	int result;

	result = WSAStartup(wVersionRequested, &wsaData);
	if (result != 0) {
		char strbuf[ISC_STRERRORSIZE];
		strerror_r(result, strbuf, sizeof(strbuf));
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "WSAStartup() failed with error code %lu: %s",
				 result, strbuf);
	}

	/*
	 * Confirm that the WinSock DLL supports version 2.2.  Note that if the
	 * DLL supports versions greater than 2.2 in addition to 2.2, it will
	 * still return 2.2 in wVersion since that is the version we requested.
	 */
	if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) {
		UNEXPECTED_ERROR(__FILE__, __LINE__,
				 "Unusable WinSock DLL version: %u.%u",
				 LOBYTE(wsaData.wVersion),
				 HIBYTE(wsaData.wVersion));
	}
}

static void
isc__nm_winsock_destroy(void) {
	WSACleanup();
}
#endif /* WIN32 */

static void
isc__nm_threadpool_initialize(uint32_t nworkers) {
	char buf[11];
	int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
			     &(size_t){ sizeof(buf) });
	if (r == UV_ENOENT) {
		snprintf(buf, sizeof(buf), "%" PRIu32, nworkers);
		uv_os_setenv("UV_THREADPOOL_SIZE", buf);
	}
}

#if HAVE_DECL_UV_UDP_MMSG_FREE
#define MINIMAL_UV_VERSION UV_VERSION(1, 40, 0)
#elif HAVE_DECL_UV_UDP_RECVMMSG
#define MAXIMAL_UV_VERSION UV_VERSION(1, 39, 99)
#define MINIMAL_UV_VERSION UV_VERSION(1, 37, 0)
#elif _WIN32
#define MINIMAL_UV_VERSION UV_VERSION(1, 0, 0)
#else
#define MAXIMAL_UV_VERSION UV_VERSION(1, 34, 99)
#define MINIMAL_UV_VERSION UV_VERSION(1, 0, 0)
#endif

void
isc__netmgr_create(isc_mem_t *mctx, uint32_t nworkers, isc_nm_t **netmgrp) {
	isc_nm_t *mgr = NULL;
	char name[32];

	REQUIRE(nworkers > 0);

#ifdef MAXIMAL_UV_VERSION
	if (uv_version() > MAXIMAL_UV_VERSION) {
		isc_error_fatal(__FILE__, __LINE__,
				"libuv version too new: running with libuv %s "
				"when compiled with libuv %s will lead to "
				"libuv failures",
				uv_version_string(), UV_VERSION_STRING);
	}
#endif /* MAXIMAL_UV_VERSION */

	if (uv_version() < MINIMAL_UV_VERSION) {
		isc_error_fatal(__FILE__, __LINE__,
				"libuv version too old: running with libuv %s "
				"when compiled with libuv %s will lead to "
				"libuv failures",
				uv_version_string(), UV_VERSION_STRING);
	}

#ifdef WIN32
	isc__nm_winsock_initialize();
#endif /* WIN32 */

	isc__nm_threadpool_initialize(nworkers);

	mgr = isc_mem_get(mctx, sizeof(*mgr));
	*mgr = (isc_nm_t){
		.nworkers = nworkers * 2,
		.nlisteners = nworkers,
	};

	isc_mem_attach(mctx, &mgr->mctx);
	isc_mutex_init(&mgr->lock);
	isc_condition_init(&mgr->wkstatecond);
	isc_condition_init(&mgr->wkpausecond);
	isc_refcount_init(&mgr->references, 1);
	atomic_init(&mgr->maxudp, 0);
	atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED);
	atomic_init(&mgr->workers_paused, 0);
	atomic_init(&mgr->paused, false);
	atomic_init(&mgr->closing, false);
#if HAVE_SO_REUSEPORT_LB
	mgr->load_balance_sockets = true;
#else
	mgr->load_balance_sockets = false;
#endif

#ifdef NETMGR_TRACE
	ISC_LIST_INIT(mgr->active_sockets);
#endif

	/*
	 * Default TCP timeout values.
	 * May be updated by isc_nm_tcptimeouts().
	 */
	atomic_init(&mgr->init, 30000);
	atomic_init(&mgr->idle, 30000);
	atomic_init(&mgr->keepalive, 30000);
	atomic_init(&mgr->advertised, 30000);

	isc_barrier_init(&mgr->pausing, mgr->nworkers);
	isc_barrier_init(&mgr->resuming, mgr->nworkers);

	mgr->workers = isc_mem_get(mctx,
				   mgr->nworkers * sizeof(isc__networker_t));
	for (int i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		int r;

		*worker = (isc__networker_t){
			.mgr = mgr,
			.id = i,
		};

		r = uv_loop_init(&worker->loop);
		UV_RUNTIME_CHECK(uv_loop_init, r);

		worker->loop.data = &mgr->workers[i];

		r = uv_async_init(&worker->loop, &worker->async, async_cb);
		UV_RUNTIME_CHECK(uv_async_init, r);

		for (size_t type = 0; type < NETIEVENT_MAX; type++) {
			isc_mutex_init(&worker->ievents[type].lock);
			isc_condition_init(&worker->ievents[type].cond);
			ISC_LIST_INIT(worker->ievents[type].list);
		}

		worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
		worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);

		/*
		 * We need to do this here and not in nm_thread to avoid a
		 * race - we could exit isc_nm_start, launch nm_destroy,
		 * and nm_thread would still not be up.
		 */
		mgr->workers_running++;
		isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread);

		snprintf(name, sizeof(name), "net-%d", i);
		isc_thread_setname(worker->thread, name);
	}

	mgr->magic = NM_MAGIC;
	*netmgrp = mgr;
}

/*
 * Free the resources of the network manager.
 */
static void
nm_destroy(isc_nm_t **mgr0) {
	REQUIRE(VALID_NM(*mgr0));
	REQUIRE(!isc__nm_in_netthread());

	isc_nm_t *mgr = *mgr0;
	*mgr0 = NULL;

	isc_refcount_destroy(&mgr->references);

	mgr->magic = 0;

	for (int i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		isc__netievent_t *event = isc__nm_get_netievent_stop(mgr);
		isc__nm_enqueue_ievent(worker, event);
	}

	LOCK(&mgr->lock);
	while (mgr->workers_running > 0) {
		WAIT(&mgr->wkstatecond, &mgr->lock);
	}
	UNLOCK(&mgr->lock);

	for (int i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		int r;

		r = uv_loop_close(&worker->loop);
		UV_RUNTIME_CHECK(uv_loop_close, r);

		for (size_t type = 0; type < NETIEVENT_MAX; type++) {
			INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
			isc_condition_destroy(&worker->ievents[type].cond);
			isc_mutex_destroy(&worker->ievents[type].lock);
		}

		isc_mem_put(mgr->mctx, worker->sendbuf,
			    ISC_NETMGR_SENDBUF_SIZE);
		isc_mem_put(mgr->mctx, worker->recvbuf,
			    ISC_NETMGR_RECVBUF_SIZE);
		isc_thread_join(worker->thread, NULL);
	}

	if (mgr->stats != NULL) {
		isc_stats_detach(&mgr->stats);
	}

	isc_barrier_destroy(&mgr->resuming);
	isc_barrier_destroy(&mgr->pausing);

	isc_condition_destroy(&mgr->wkstatecond);
	isc_condition_destroy(&mgr->wkpausecond);
	isc_mutex_destroy(&mgr->lock);

	isc_mem_put(mgr->mctx, mgr->workers,
		    mgr->nworkers * sizeof(isc__networker_t));
	isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr));

#ifdef WIN32
	isc__nm_winsock_destroy();
#endif /* WIN32 */
}

static void
enqueue_pause(isc__networker_t *worker) {
	isc__netievent_pause_t *event =
		isc__nm_get_netievent_pause(worker->mgr);
	isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event);
}

static void
isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0) {
	UNUSED(ev0);
	REQUIRE(worker->paused == false);

	worker->paused = true;
	uv_stop(&worker->loop);
}

void
isc_nm_pause(isc_nm_t *mgr) {
	REQUIRE(VALID_NM(mgr));
	REQUIRE(!atomic_load(&mgr->paused));

	isc__nm_acquire_interlocked_force(mgr);

	if (isc__nm_in_netthread()) {
		REQUIRE(isc_nm_tid() == 0);
	}

	for (int i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		if (i == isc_nm_tid()) {
			isc__nm_async_pause(worker, NULL);
		} else {
			enqueue_pause(worker);
		}
	}

	if (isc__nm_in_netthread()) {
		atomic_fetch_add(&mgr->workers_paused, 1);
		isc_barrier_wait(&mgr->pausing);
	}

	LOCK(&mgr->lock);
	while (atomic_load(&mgr->workers_paused) != mgr->workers_running) {
		WAIT(&mgr->wkstatecond, &mgr->lock);
	}
	UNLOCK(&mgr->lock);

	REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false },
					       true));
}

static void
enqueue_resume(isc__networker_t *worker) {
	isc__netievent_resume_t *event =
		isc__nm_get_netievent_resume(worker->mgr);
	isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event);
}

static void
isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0) {
	UNUSED(ev0);
	REQUIRE(worker->paused == true);

	worker->paused = false;
}

void
isc_nm_resume(isc_nm_t *mgr) {
	REQUIRE(VALID_NM(mgr));
	REQUIRE(atomic_load(&mgr->paused));

	if (isc__nm_in_netthread()) {
		REQUIRE(isc_nm_tid() == 0);
		drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIORITY);
	}

	for (int i = 0; i < mgr->nworkers; i++) {
		isc__networker_t *worker = &mgr->workers[i];
		if (i == isc_nm_tid()) {
			isc__nm_async_resume(worker, NULL);
		} else {
			enqueue_resume(worker);
		}
	}

	if (isc__nm_in_netthread()) {
		drain_queue(&mgr->workers[isc_nm_tid()], NETIEVENT_PRIVILEGED);

		atomic_fetch_sub(&mgr->workers_paused, 1);
		isc_barrier_wait(&mgr->resuming);
	}

	LOCK(&mgr->lock);
	while (atomic_load(&mgr->workers_paused) != 0) {
		WAIT(&mgr->wkstatecond, &mgr->lock);
	}
	UNLOCK(&mgr->lock);

	REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true },
					       false));

	isc__nm_drop_interlocked(mgr);
}

void
isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst) {
	REQUIRE(VALID_NM(mgr));
	REQUIRE(dst != NULL && *dst == NULL);

	isc_refcount_increment(&mgr->references);

	*dst = mgr;
}

void
isc_nm_detach(isc_nm_t **mgr0) {
	isc_nm_t *mgr = NULL;

	REQUIRE(mgr0 != NULL);
	REQUIRE(VALID_NM(*mgr0));

	mgr = *mgr0;
	*mgr0 = NULL;

	if (isc_refcount_decrement(&mgr->references) == 1) {
		nm_destroy(&mgr);
	}
}

void
isc__netmgr_shutdown(isc_nm_t *mgr) {
	REQUIRE(VALID_NM(mgr));

	atomic_store(&mgr->closing, true);
	for (int i = 0; i < mgr->nworkers; i++) {
		isc__netievent_t *event = NULL;
		event = isc__nm_get_netievent_shutdown(mgr);
		isc__nm_enqueue_ievent(&mgr->workers[i], event);
	}
}

void
isc__netmgr_destroy(isc_nm_t **netmgrp) {
	isc_nm_t *mgr = NULL;
	int counter = 0;

	REQUIRE(VALID_NM(*netmgrp));

	mgr = *netmgrp;

	/*
	 * Close active connections.
	 */
	isc__netmgr_shutdown(mgr);

	/*
	 * Wait for the manager to be dereferenced elsewhere.
	 */
	while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) {
		uv_sleep(10);
	}

#ifdef NETMGR_TRACE
	if (isc_refcount_current(&mgr->references) > 1) {
		isc__nm_dump_active(mgr);
		UNREACHABLE();
	}
#endif

	/*
	 * Now just patiently wait
	 */
	while (isc_refcount_current(&mgr->references) > 1) {
		uv_sleep(10);
	}

	/*
	 * Detach final reference.
	 */
	isc_nm_detach(netmgrp);
}

void
isc_nm_maxudp(isc_nm_t *mgr, uint32_t maxudp) {
	REQUIRE(VALID_NM(mgr));

	atomic_store(&mgr->maxudp, maxudp);
}

void
isc_nmhandle_setwritetimeout(isc_nmhandle_t *handle, uint64_t write_timeout) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	handle->sock->write_timeout = write_timeout;
}

void
isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
		   uint32_t keepalive, uint32_t advertised) {
	REQUIRE(VALID_NM(mgr));

	atomic_store(&mgr->init, init);
	atomic_store(&mgr->idle, idle);
	atomic_store(&mgr->keepalive, keepalive);
	atomic_store(&mgr->advertised, advertised);
}

bool
isc_nm_getloadbalancesockets(isc_nm_t *mgr) {
	REQUIRE(VALID_NM(mgr));

	return (mgr->load_balance_sockets);
}

void
isc_nm_setloadbalancesockets(isc_nm_t *mgr, bool enabled) {
	REQUIRE(VALID_NM(mgr));

#if HAVE_SO_REUSEPORT_LB
	mgr->load_balance_sockets = enabled;
#else
	UNUSED(enabled);
#endif
}

void
isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
		   uint32_t *keepalive, uint32_t *advertised) {
	REQUIRE(VALID_NM(mgr));

	if (initial != NULL) {
		*initial = atomic_load(&mgr->init);
	}

	if (idle != NULL) {
		*idle = atomic_load(&mgr->idle);
	}

	if (keepalive != NULL) {
		*keepalive = atomic_load(&mgr->keepalive);
	}

	if (advertised != NULL) {
		*advertised = atomic_load(&mgr->advertised);
	}
}

/*
 * nm_thread is a single worker thread, that runs uv_run event loop
 * until asked to stop.
 *
 * There are four queues for asynchronous events:
 *
 * 1. priority queue - netievents on the priority queue are run even when
 *    the taskmgr enters exclusive mode and the netmgr is paused.  This
 *    is needed to properly start listening on the interfaces, free
 *    resources on shutdown, or resume from a pause.
 *
 * 2. privileged task queue - only privileged tasks are queued here and
 *    this is the first queue that gets processed when network manager
 *    is unpaused using isc_nm_resume().  All netmgr workers need to
 *    clean the privileged task queue before they all proceed to normal
 *    operation.  Both task queues are processed when the workers are
 *    shutting down.
 *
 * 3. task queue - only (traditional) tasks are scheduled here, and this
 *    queue and the privileged task queue are both processed when the
 *    netmgr workers are finishing.  This is needed to process the task
 *    shutdown events.
 *
 * 4. normal queue - this is the queue with netmgr events, e.g. reading,
 *    sending, callbacks, etc.
 */

static isc_threadresult_t
nm_thread(isc_threadarg_t worker0) {
	isc__networker_t *worker = (isc__networker_t *)worker0;
	isc_nm_t *mgr = worker->mgr;

	isc__nm_tid_v = worker->id;

	while (true) {
		/*
		 * uv_run() runs async_cb() in a loop, which processes
		 * all four event queues until a "pause" or "stop" event
		 * is encountered. On pause, we process only priority and
		 * privileged events until resuming.
		 */
		int r = uv_run(&worker->loop, UV_RUN_DEFAULT);
		INSIST(r > 0 || worker->finished);

		if (worker->paused) {
			INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid());

			atomic_fetch_add(&mgr->workers_paused, 1);
			if (isc_barrier_wait(&mgr->pausing) != 0) {
				LOCK(&mgr->lock);
				SIGNAL(&mgr->wkstatecond);
				UNLOCK(&mgr->lock);
			}

			while (worker->paused) {
				wait_for_priority_queue(worker);
			}

			/*
			 * All workers must drain the privileged event
			 * queue before we resume from pause.
			 */
			drain_queue(worker, NETIEVENT_PRIVILEGED);

			atomic_fetch_sub(&mgr->workers_paused, 1);
			if (isc_barrier_wait(&mgr->resuming) != 0) {
				LOCK(&mgr->lock);
				SIGNAL(&mgr->wkstatecond);
				UNLOCK(&mgr->lock);
			}
		}

		if (r == 0) {
			INSIST(worker->finished);
			break;
		}

		INSIST(!worker->finished);
	}

	/*
	 * We are shutting down.  Drain the queues.
	 */
	drain_queue(worker, NETIEVENT_PRIVILEGED);
	drain_queue(worker, NETIEVENT_TASK);

	for (size_t type = 0; type < NETIEVENT_MAX; type++) {
		LOCK(&worker->ievents[type].lock);
		INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
		UNLOCK(&worker->ievents[type].lock);
	}

	LOCK(&mgr->lock);
	mgr->workers_running--;
	SIGNAL(&mgr->wkstatecond);
	UNLOCK(&mgr->lock);

	return ((isc_threadresult_t)0);
}

static bool
process_all_queues(isc__networker_t *worker) {
	bool reschedule = false;
	/*
	 * The queue processing functions will return false when the
	 * system is pausing or stopping and we don't want to process
	 * the other queues in such case, but we need the async event
	 * to be rescheduled in the next uv_run().
	 */
	for (size_t type = 0; type < NETIEVENT_MAX; type++) {
		isc_result_t result = process_queue(worker, type);
		switch (result) {
		case ISC_R_SUSPEND:
			reschedule = true;
			break;
		case ISC_R_EMPTY:
			/* empty queue */
			break;
		case ISC_R_SUCCESS:
			reschedule = true;
			break;
		default:
			UNREACHABLE();
		}
	}

	return (reschedule);
}

/*
 * async_cb() is a universal callback for 'async' events sent to event loop.
 * It's the only way to safely pass data to the libuv event loop. We use a
 * single async event and a set of lockless queues of 'isc__netievent_t'
 * structures passed from other threads.
 */
static void
async_cb(uv_async_t *handle) {
	isc__networker_t *worker = (isc__networker_t *)handle->loop->data;

	if (process_all_queues(worker)) {
		/*
		 * If we didn't process all the events, we need to enqueue
		 * async_cb to be run in the next iteration of the uv_loop
		 */
		uv_async_send(handle);
	}
}

static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0) {
	UNUSED(ev0);
	worker->finished = true;
	/* Close the async handler */
	uv_close((uv_handle_t *)&worker->async, NULL);
}

void
isc_nm_task_enqueue(isc_nm_t *nm, isc_task_t *task, int threadid) {
	isc__netievent_t *event = NULL;
	int tid;
	isc__networker_t *worker = NULL;

	if (threadid == -1) {
		tid = (int)isc_random_uniform(nm->nlisteners);
	} else if (threadid == ISC_NM_TASK_SLOW_OFFSET) {
		tid = nm->nlisteners +
		      (int)isc_random_uniform(nm->nworkers - nm->nlisteners);
	} else if (threadid < ISC_NM_TASK_SLOW_OFFSET) {
		tid = nm->nlisteners + (ISC_NM_TASK_SLOW(threadid) %
					(nm->nworkers - nm->nlisteners));
	} else {
		tid = threadid % nm->nlisteners;
	}

	worker = &nm->workers[tid];

	if (isc_task_privileged(task)) {
		event = (isc__netievent_t *)
			isc__nm_get_netievent_privilegedtask(nm, task);
	} else {
		event = (isc__netievent_t *)isc__nm_get_netievent_task(nm,
								       task);
	}

	isc__nm_enqueue_ievent(worker, event);
}

#define isc__nm_async_privilegedtask(worker, ev0) \
	isc__nm_async_task(worker, ev0)

static void
isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_task_t *ievent = (isc__netievent_task_t *)ev0;
	isc_result_t result;

	UNUSED(worker);

	result = isc_task_run(ievent->task);

	switch (result) {
	case ISC_R_QUOTA:
		isc_task_ready(ievent->task);
		return;
	case ISC_R_SUCCESS:
		return;
	default:
		UNREACHABLE();
	}
}

static void
wait_for_priority_queue(isc__networker_t *worker) {
	isc_condition_t *cond = &worker->ievents[NETIEVENT_PRIORITY].cond;
	isc_mutex_t *lock = &worker->ievents[NETIEVENT_PRIORITY].lock;
	isc__netievent_list_t *list =
		&(worker->ievents[NETIEVENT_PRIORITY].list);

	LOCK(lock);
	while (ISC_LIST_EMPTY(*list)) {
		WAIT(cond, lock);
	}
	UNLOCK(lock);

	drain_queue(worker, NETIEVENT_PRIORITY);
}

static void
drain_queue(isc__networker_t *worker, netievent_type_t type) {
	bool empty = false;
	while (!empty) {
		if (process_queue(worker, type) == ISC_R_EMPTY) {
			LOCK(&worker->ievents[type].lock);
			empty = ISC_LIST_EMPTY(worker->ievents[type].list);
			UNLOCK(&worker->ievents[type].lock);
		}
	}
}

/*
 * The two macros here generate the individual cases for the process_netievent()
 * function.  The NETIEVENT_CASE(type) macro is the common case, and
 * NETIEVENT_CASE_NOMORE(type) is a macro that causes the loop in the
 * process_queue() to stop, e.g. it's only used for the netievent that
 * stops/pauses processing the enqueued netievents.
 */
#define NETIEVENT_CASE(type)                                               \
	case netievent_##type: {                                           \
		isc__nm_async_##type(worker, ievent);                      \
		isc__nm_put_netievent_##type(                              \
			worker->mgr, (isc__netievent_##type##_t *)ievent); \
		return (true);                                             \
	}

#define NETIEVENT_CASE_NOMORE(type)                                \
	case netievent_##type: {                                   \
		isc__nm_async_##type(worker, ievent);              \
		isc__nm_put_netievent_##type(worker->mgr, ievent); \
		return (false);                                    \
	}

static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) {
	REQUIRE(worker->id == isc_nm_tid());

	switch (ievent->type) {
		/* Don't process more ievents when we are stopping */
		NETIEVENT_CASE_NOMORE(stop);

		NETIEVENT_CASE(privilegedtask);
		NETIEVENT_CASE(task);

		NETIEVENT_CASE(udpconnect);
		NETIEVENT_CASE(udplisten);
		NETIEVENT_CASE(udpstop);
		NETIEVENT_CASE(udpsend);
		NETIEVENT_CASE(udpread);
		NETIEVENT_CASE(udpcancel);
		NETIEVENT_CASE(udpclose);

		NETIEVENT_CASE(tcpaccept);
		NETIEVENT_CASE(tcpconnect);
		NETIEVENT_CASE(tcplisten);
		NETIEVENT_CASE(tcpstartread);
		NETIEVENT_CASE(tcppauseread);
		NETIEVENT_CASE(tcpsend);
		NETIEVENT_CASE(tcpstop);
		NETIEVENT_CASE(tcpcancel);
		NETIEVENT_CASE(tcpclose);

		NETIEVENT_CASE(tcpdnsaccept);
		NETIEVENT_CASE(tcpdnslisten);
		NETIEVENT_CASE(tcpdnsconnect);
		NETIEVENT_CASE(tcpdnssend);
		NETIEVENT_CASE(tcpdnscancel);
		NETIEVENT_CASE(tcpdnsclose);
		NETIEVENT_CASE(tcpdnsread);
		NETIEVENT_CASE(tcpdnsstop);

		NETIEVENT_CASE(connectcb);
		NETIEVENT_CASE(readcb);
		NETIEVENT_CASE(sendcb);

		NETIEVENT_CASE(close);
		NETIEVENT_CASE(detach);

		NETIEVENT_CASE(shutdown);
		NETIEVENT_CASE(resume);
		NETIEVENT_CASE_NOMORE(pause);
	default:
		UNREACHABLE();
	}
	return (true);
}

static isc_result_t
process_queue(isc__networker_t *worker, netievent_type_t type) {
	isc__netievent_t *ievent = NULL;
	isc__netievent_list_t list;

	ISC_LIST_INIT(list);

	LOCK(&worker->ievents[type].lock);
	ISC_LIST_MOVE(list, worker->ievents[type].list);
	UNLOCK(&worker->ievents[type].lock);

	ievent = ISC_LIST_HEAD(list);
	if (ievent == NULL) {
		/* There's nothing scheduled */
		return (ISC_R_EMPTY);
	}

	while (ievent != NULL) {
		isc__netievent_t *next = ISC_LIST_NEXT(ievent, link);
		ISC_LIST_DEQUEUE(list, ievent, link);

		if (!process_netievent(worker, ievent)) {
			/* The netievent told us to stop */
			if (!ISC_LIST_EMPTY(list)) {
				/*
				 * Reschedule the rest of the unprocessed
				 * events.
				 */
				LOCK(&worker->ievents[type].lock);
				ISC_LIST_PREPENDLIST(worker->ievents[type].list,
						     list, link);
				UNLOCK(&worker->ievents[type].lock);
			}
			return (ISC_R_SUSPEND);
		}

		ievent = next;
	}

	/* We processed at least one */
	return (ISC_R_SUCCESS);
}

void *
isc__nm_get_netievent(isc_nm_t *mgr, isc__netievent_type type) {
	isc__netievent_storage_t *event = isc_mem_get(mgr->mctx,
						      sizeof(*event));

	*event = (isc__netievent_storage_t){ .ni.type = type };
	ISC_LINK_INIT(&(event->ni), link);
	return (event);
}

void
isc__nm_put_netievent(isc_nm_t *mgr, void *ievent) {
	isc_mem_put(mgr->mctx, ievent, sizeof(isc__netievent_storage_t));
}

NETIEVENT_SOCKET_DEF(tcpclose);
NETIEVENT_SOCKET_DEF(tcplisten);
NETIEVENT_SOCKET_DEF(tcppauseread);
NETIEVENT_SOCKET_DEF(tcpstartread);
NETIEVENT_SOCKET_DEF(tcpstop);
NETIEVENT_SOCKET_DEF(udpclose);
NETIEVENT_SOCKET_DEF(udplisten);
NETIEVENT_SOCKET_DEF(udpread);
NETIEVENT_SOCKET_DEF(udpsend);
NETIEVENT_SOCKET_DEF(udpstop);

NETIEVENT_SOCKET_DEF(tcpdnsclose);
NETIEVENT_SOCKET_DEF(tcpdnsread);
NETIEVENT_SOCKET_DEF(tcpdnsstop);
NETIEVENT_SOCKET_DEF(tcpdnslisten);
NETIEVENT_SOCKET_REQ_DEF(tcpdnsconnect);
NETIEVENT_SOCKET_REQ_DEF(tcpdnssend);
NETIEVENT_SOCKET_HANDLE_DEF(tcpdnscancel);
NETIEVENT_SOCKET_QUOTA_DEF(tcpdnsaccept);

NETIEVENT_SOCKET_REQ_DEF(tcpconnect);
NETIEVENT_SOCKET_REQ_DEF(tcpsend);
NETIEVENT_SOCKET_REQ_DEF(udpconnect);
NETIEVENT_SOCKET_REQ_RESULT_DEF(connectcb);
NETIEVENT_SOCKET_REQ_RESULT_DEF(readcb);
NETIEVENT_SOCKET_REQ_RESULT_DEF(sendcb);

NETIEVENT_SOCKET_DEF(detach);
NETIEVENT_SOCKET_HANDLE_DEF(tcpcancel);
NETIEVENT_SOCKET_HANDLE_DEF(udpcancel);

NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept);

NETIEVENT_SOCKET_DEF(close);
NETIEVENT_DEF(pause);
NETIEVENT_DEF(resume);
NETIEVENT_DEF(shutdown);
NETIEVENT_DEF(stop);

NETIEVENT_TASK_DEF(task);
NETIEVENT_TASK_DEF(privilegedtask);

void
isc__nm_maybe_enqueue_ievent(isc__networker_t *worker,
			     isc__netievent_t *event) {
	/*
	 * If we are already in the matching nmthread, process the ievent
	 * directly.
	 */
	if (worker->id == isc_nm_tid()) {
		process_netievent(worker, event);
		return;
	}

	isc__nm_enqueue_ievent(worker, event);
}

void
isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
	netievent_type_t type;

	if (event->type > netievent_prio) {
		type = NETIEVENT_PRIORITY;
	} else {
		switch (event->type) {
		case netievent_prio:
			UNREACHABLE();
			break;
		case netievent_privilegedtask:
			type = NETIEVENT_PRIVILEGED;
			break;
		case netievent_task:
			type = NETIEVENT_TASK;
			break;
		default:
			type = NETIEVENT_NORMAL;
			break;
		}
	}

	/*
	 * We need to make sure this signal will be delivered and
	 * the queue will be processed.
	 */
	LOCK(&worker->ievents[type].lock);
	ISC_LIST_ENQUEUE(worker->ievents[type].list, event, link);
	if (type == NETIEVENT_PRIORITY) {
		SIGNAL(&worker->ievents[type].cond);
	}
	UNLOCK(&worker->ievents[type].lock);

	uv_async_send(&worker->async);
}

bool
isc__nmsocket_active(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	if (sock->parent != NULL) {
		return (atomic_load(&sock->parent->active));
	}

	return (atomic_load(&sock->active));
}

bool
isc__nmsocket_deactivate(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	if (sock->parent != NULL) {
		return (atomic_compare_exchange_strong(&sock->parent->active,
						       &(bool){ true }, false));
	}

	return (atomic_compare_exchange_strong(&sock->active, &(bool){ true },
					       false));
}

void
isc___nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target FLARG) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(target != NULL && *target == NULL);

	isc_nmsocket_t *rsock = NULL;

	if (sock->parent != NULL) {
		rsock = sock->parent;
		INSIST(rsock->parent == NULL); /* sanity check */
	} else {
		rsock = sock;
	}

	NETMGR_TRACE_LOG("isc__nmsocket_attach():%p->references = %" PRIuFAST32
			 "\n",
			 rsock, isc_refcount_current(&rsock->references) + 1);

	isc_refcount_increment0(&rsock->references);

	*target = sock;
}

/*
 * Free all resources inside a socket (including its children if any).
 */
static void
nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) {
	isc_nmhandle_t *handle = NULL;
	isc__nm_uvreq_t *uvreq = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(!isc__nmsocket_active(sock));

	NETMGR_TRACE_LOG("nmsocket_cleanup():%p->references = %" PRIuFAST32
			 "\n",
			 sock, isc_refcount_current(&sock->references));

	atomic_store(&sock->destroying, true);

	if (sock->parent == NULL && sock->children != NULL) {
		/*
		 * We shouldn't be here unless there are no active handles,
		 * so we can clean up and free the children.
		 */
		for (size_t i = 0; i < sock->nchildren; i++) {
			if (!atomic_load(&sock->children[i].destroying)) {
				nmsocket_cleanup(&sock->children[i],
						 false FLARG_PASS);
			}
		}

		/*
		 * This was a parent socket: destroy the listening
		 * barriers that synchronized the children.
		 */
		isc_barrier_destroy(&sock->startlistening);
		isc_barrier_destroy(&sock->stoplistening);

		/*
		 * Now free them.
		 */
		isc_mem_put(sock->mgr->mctx, sock->children,
			    sock->nchildren * sizeof(*sock));
		sock->children = NULL;
		sock->nchildren = 0;
	}
	if (sock->statsindex != NULL) {
		isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
	}

	sock->statichandle = NULL;

	if (sock->outerhandle != NULL) {
		isc__nmhandle_detach(&sock->outerhandle FLARG_PASS);
	}

	if (sock->outer != NULL) {
		isc___nmsocket_detach(&sock->outer FLARG_PASS);
	}

	while ((handle = isc_astack_pop(sock->inactivehandles)) != NULL) {
		nmhandle_free(sock, handle);
	}

	if (sock->buf != NULL) {
		isc_mem_free(sock->mgr->mctx, sock->buf);
	}

	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}

	sock->pquota = NULL;

	isc_astack_destroy(sock->inactivehandles);

	while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) {
		isc_mem_put(sock->mgr->mctx, uvreq, sizeof(*uvreq));
	}

	isc_astack_destroy(sock->inactivereqs);
	sock->magic = 0;

	isc_condition_destroy(&sock->scond);
	isc_condition_destroy(&sock->cond);
	isc_mutex_destroy(&sock->lock);
#ifdef NETMGR_TRACE
	LOCK(&sock->mgr->lock);
	ISC_LIST_UNLINK(sock->mgr->active_sockets, sock, active_link);
	UNLOCK(&sock->mgr->lock);
#endif
	if (dofree) {
		isc_nm_t *mgr = sock->mgr;
		isc_mem_put(mgr->mctx, sock, sizeof(*sock));
		isc_nm_detach(&mgr);
	} else {
		isc_nm_detach(&sock->mgr);
	}
}

static void
nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG) {
	int active_handles;
	bool destroy = false;

	NETMGR_TRACE_LOG("%s():%p->references = %" PRIuFAST32 "\n", __func__,
			 sock, isc_refcount_current(&sock->references));

	if (sock->parent != NULL) {
		/*
		 * This is a child socket and cannot be destroyed except
		 * as a side effect of destroying the parent, so let's go
		 * see if the parent is ready to be destroyed.
		 */
		nmsocket_maybe_destroy(sock->parent FLARG_PASS);
		return;
	}

	/*
	 * This is a parent socket (or a standalone). See whether the
	 * children have active handles before deciding whether to
	 * accept destruction.
	 */
	LOCK(&sock->lock);
	if (atomic_load(&sock->active) || atomic_load(&sock->destroying) ||
	    !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0)
	{
		UNLOCK(&sock->lock);
		return;
	}

	active_handles = atomic_load(&sock->ah);
	if (sock->children != NULL) {
		for (size_t i = 0; i < sock->nchildren; i++) {
			LOCK(&sock->children[i].lock);
			active_handles += atomic_load(&sock->children[i].ah);
			UNLOCK(&sock->children[i].lock);
		}
	}

	if (active_handles == 0 || sock->statichandle != NULL) {
		destroy = true;
	}

	NETMGR_TRACE_LOG("%s:%p->active_handles = %d, .statichandle = %p\n",
			 __func__, sock, active_handles, sock->statichandle);

	if (destroy) {
		atomic_store(&sock->destroying, true);
		UNLOCK(&sock->lock);
		nmsocket_cleanup(sock, true FLARG_PASS);
	} else {
		UNLOCK(&sock->lock);
	}
}

void
isc___nmsocket_prep_destroy(isc_nmsocket_t *sock FLARG) {
	REQUIRE(sock->parent == NULL);

	NETMGR_TRACE_LOG("isc___nmsocket_prep_destroy():%p->references = "
			 "%" PRIuFAST32 "\n",
			 sock, isc_refcount_current(&sock->references));

	/*
	 * The final external reference to the socket is gone. We can try
	 * destroying the socket, but we have to wait for all the inflight
	 * handles to finish first.
	 */
	atomic_store(&sock->active, false);

	/*
	 * If the socket has children, they'll need to be marked inactive
	 * so they can be cleaned up too.
	 */
	if (sock->children != NULL) {
		for (size_t i = 0; i < sock->nchildren; i++) {
			atomic_store(&sock->children[i].active, false);
		}
	}

	/*
	 * If we're here then we already stopped listening; otherwise
	 * we'd have a hanging reference from the listening process.
	 *
	 * If it's a regular socket we may need to close it.
	 */
	if (!atomic_load(&sock->closed)) {
		switch (sock->type) {
		case isc_nm_udpsocket:
			isc__nm_udp_close(sock);
			return;
		case isc_nm_tcpsocket:
			isc__nm_tcp_close(sock);
			return;
		case isc_nm_tcpdnssocket:
			isc__nm_tcpdns_close(sock);
			return;
		default:
			break;
		}
	}

	nmsocket_maybe_destroy(sock FLARG_PASS);
}

void
isc___nmsocket_detach(isc_nmsocket_t **sockp FLARG) {
	REQUIRE(sockp != NULL && *sockp != NULL);
	REQUIRE(VALID_NMSOCK(*sockp));

	isc_nmsocket_t *sock = *sockp, *rsock = NULL;
	*sockp = NULL;

	/*
	 * If the socket is a part of a set (a child socket) we are
	 * counting references for the whole set at the parent.
	 */
	if (sock->parent != NULL) {
		rsock = sock->parent;
		INSIST(rsock->parent == NULL); /* Sanity check */
	} else {
		rsock = sock;
	}

	NETMGR_TRACE_LOG("isc__nmsocket_detach():%p->references = %" PRIuFAST32
			 "\n",
			 rsock, isc_refcount_current(&rsock->references) - 1);

	if (isc_refcount_decrement(&rsock->references) == 1) {
		isc___nmsocket_prep_destroy(rsock FLARG_PASS);
	}
}

void
isc_nmsocket_close(isc_nmsocket_t **sockp) {
	REQUIRE(sockp != NULL);
	REQUIRE(VALID_NMSOCK(*sockp));
	REQUIRE((*sockp)->type == isc_nm_udplistener ||
		(*sockp)->type == isc_nm_tcplistener ||
		(*sockp)->type == isc_nm_tcpdnslistener);

	isc__nmsocket_detach(sockp);
}

void
isc___nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
		    isc_sockaddr_t *iface FLARG) {
	uint16_t family;

	REQUIRE(sock != NULL);
	REQUIRE(mgr != NULL);
	REQUIRE(iface != NULL);

	family = iface->type.sa.sa_family;

	*sock = (isc_nmsocket_t){ .type = type,
				  .iface = *iface,
				  .fd = -1,
				  .inactivehandles = isc_astack_new(
					  mgr->mctx, ISC_NM_HANDLES_STACK_SIZE),
				  .inactivereqs = isc_astack_new(
					  mgr->mctx, ISC_NM_REQS_STACK_SIZE) };

#if NETMGR_TRACE
	sock->backtrace_size = backtrace(sock->backtrace, TRACE_SIZE);
	ISC_LINK_INIT(sock, active_link);
	ISC_LIST_INIT(sock->active_handles);
	LOCK(&mgr->lock);
	ISC_LIST_APPEND(mgr->active_sockets, sock, active_link);
	UNLOCK(&mgr->lock);
#endif

	isc_nm_attach(mgr, &sock->mgr);
	sock->uv_handle.handle.data = sock;

	ISC_LINK_INIT(&sock->quotacb, link);

	switch (type) {
	case isc_nm_udpsocket:
	case isc_nm_udplistener:
		if (family == AF_INET) {
			sock->statsindex = udp4statsindex;
		} else {
			sock->statsindex = udp6statsindex;
		}
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
		break;
	case isc_nm_tcpsocket:
	case isc_nm_tcplistener:
	case isc_nm_tcpdnssocket:
	case isc_nm_tcpdnslistener:
		if (family == AF_INET) {
			sock->statsindex = tcp4statsindex;
		} else {
			sock->statsindex = tcp6statsindex;
		}
		isc__nm_incstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
		break;
	default:
		break;
	}

	isc_mutex_init(&sock->lock);
	isc_condition_init(&sock->cond);
	isc_condition_init(&sock->scond);
	isc_refcount_init(&sock->references, 1);

	NETMGR_TRACE_LOG("isc__nmsocket_init():%p->references = %" PRIuFAST32
			 "\n",
			 sock, isc_refcount_current(&sock->references));

	atomic_init(&sock->active, true);
	atomic_init(&sock->sequential, false);
	atomic_init(&sock->readpaused, false);
	atomic_init(&sock->closing, false);
	atomic_init(&sock->listening, 0);
	atomic_init(&sock->closed, 0);
	atomic_init(&sock->destroying, 0);
	atomic_init(&sock->ah, 0);
	atomic_init(&sock->client, 0);
	atomic_init(&sock->connecting, false);
	atomic_init(&sock->keepalive, false);
	atomic_init(&sock->connected, false);
	atomic_init(&sock->timedout, false);

	atomic_init(&sock->active_child_connections, 0);

	sock->magic = NMSOCK_MAGIC;
}

void
isc__nmsocket_clearcb(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(!isc__nm_in_netthread() || sock->tid == isc_nm_tid());

	sock->recv_cb = NULL;
	sock->recv_cbarg = NULL;
	sock->accept_cb = NULL;
	sock->accept_cbarg = NULL;
	sock->connect_cb = NULL;
	sock->connect_cbarg = NULL;
}

void
isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) {
	isc__networker_t *worker = NULL;

	REQUIRE(VALID_NMSOCK(sock));

	worker = &sock->mgr->workers[sock->tid];
	REQUIRE(buf->base == worker->recvbuf);

	worker->recvbuf_inuse = false;
}

static isc_nmhandle_t *
alloc_handle(isc_nmsocket_t *sock) {
	isc_nmhandle_t *handle =
		isc_mem_get(sock->mgr->mctx,
			    sizeof(isc_nmhandle_t) + sock->extrahandlesize);

	*handle = (isc_nmhandle_t){ .magic = NMHANDLE_MAGIC };
#ifdef NETMGR_TRACE
	ISC_LINK_INIT(handle, active_link);
#endif
	isc_refcount_init(&handle->references, 1);

	return (handle);
}

isc_nmhandle_t *
isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
		   isc_sockaddr_t *local FLARG) {
	isc_nmhandle_t *handle = NULL;

	REQUIRE(VALID_NMSOCK(sock));

	handle = isc_astack_pop(sock->inactivehandles);

	if (handle == NULL) {
		handle = alloc_handle(sock);
	} else {
		isc_refcount_init(&handle->references, 1);
		INSIST(VALID_NMHANDLE(handle));
	}

	NETMGR_TRACE_LOG(
		"isc__nmhandle_get():handle %p->references = %" PRIuFAST32 "\n",
		handle, isc_refcount_current(&handle->references));

	isc___nmsocket_attach(sock, &handle->sock FLARG_PASS);

#if NETMGR_TRACE
	handle->backtrace_size = backtrace(handle->backtrace, TRACE_SIZE);
#endif

	if (peer != NULL) {
		handle->peer = *peer;
	} else {
		handle->peer = sock->peer;
	}

	if (local != NULL) {
		handle->local = *local;
	} else {
		handle->local = sock->iface;
	}

	(void)atomic_fetch_add(&sock->ah, 1);

#ifdef NETMGR_TRACE
	LOCK(&sock->lock);
	ISC_LIST_APPEND(sock->active_handles, handle, active_link);
	UNLOCK(&sock->lock);
#endif

	switch (sock->type) {
	case isc_nm_udpsocket:
	case isc_nm_tcpdnssocket:
		if (!atomic_load(&sock->client)) {
			break;
		}
		FALLTHROUGH;
	case isc_nm_tcpsocket:
		INSIST(sock->statichandle == NULL);

		/*
		 * statichandle must be assigned, not attached;
		 * otherwise, if a handle was detached elsewhere
		 * it could never reach 0 references, and the
		 * handle and socket would never be freed.
		 */
		sock->statichandle = handle;
		break;
	default:
		break;
	}

	return (handle);
}

void
isc__nmhandle_attach(isc_nmhandle_t *handle, isc_nmhandle_t **handlep FLARG) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(handlep != NULL && *handlep == NULL);

	NETMGR_TRACE_LOG("isc__nmhandle_attach():handle %p->references = "
			 "%" PRIuFAST32 "\n",
			 handle, isc_refcount_current(&handle->references) + 1);

	isc_refcount_increment(&handle->references);
	*handlep = handle;
}

bool
isc_nmhandle_is_stream(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->sock->type == isc_nm_tcpsocket ||
		handle->sock->type == isc_nm_tcpdnssocket);
}

static void
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
	size_t extra = sock->extrahandlesize;

	isc_refcount_destroy(&handle->references);

	if (handle->dofree != NULL) {
		handle->dofree(handle->opaque);
	}

	*handle = (isc_nmhandle_t){ .magic = 0 };

	isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra);
}

static void
nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
	bool reuse = false;

	/*
	 * We do all of this under lock to avoid races with socket
	 * destruction.  We have to do this now, because at this point the
	 * socket is either unused or still attached to event->sock.
	 */
	LOCK(&sock->lock);

#ifdef NETMGR_TRACE
	ISC_LIST_UNLINK(sock->active_handles, handle, active_link);
#endif

	INSIST(atomic_fetch_sub(&sock->ah, 1) > 0);

#if !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__
	if (atomic_load(&sock->active)) {
		reuse = isc_astack_trypush(sock->inactivehandles, handle);
	}
#endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
	if (!reuse) {
		nmhandle_free(sock, handle);
	}
	UNLOCK(&sock->lock);
}

void
isc__nmhandle_detach(isc_nmhandle_t **handlep FLARG) {
	isc_nmsocket_t *sock = NULL;
	isc_nmhandle_t *handle = NULL;

	REQUIRE(handlep != NULL);
	REQUIRE(VALID_NMHANDLE(*handlep));

	handle = *handlep;
	*handlep = NULL;

	/*
	 * If the closehandle_cb is set, it needs to run asynchronously to
	 * ensure correct ordering of the isc__nm_process_sock_buffer().
	 */
	sock = handle->sock;
	if (sock->tid == isc_nm_tid() && sock->closehandle_cb == NULL) {
		nmhandle_detach_cb(&handle FLARG_PASS);
	} else {
		isc__netievent_detach_t *event =
			isc__nm_get_netievent_detach(sock->mgr, sock);
		/*
		 * we are using implicit "attach" as the last reference
		 * need to be destroyed explicitly in the async callback
		 */
		event->handle = handle;
		FLARG_IEVENT_PASS(event);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)event);
	}
}

void
isc__nmsocket_shutdown(isc_nmsocket_t *sock);

static void
nmhandle_detach_cb(isc_nmhandle_t **handlep FLARG) {
	isc_nmsocket_t *sock = NULL;
	isc_nmhandle_t *handle = NULL;

	REQUIRE(handlep != NULL);
	REQUIRE(VALID_NMHANDLE(*handlep));

	handle = *handlep;
	*handlep = NULL;

	NETMGR_TRACE_LOG("isc__nmhandle_detach():%p->references = %" PRIuFAST32
			 "\n",
			 handle, isc_refcount_current(&handle->references) - 1);

	if (isc_refcount_decrement(&handle->references) > 1) {
		return;
	}

	/* We need an acquire memory barrier here */
	(void)isc_refcount_current(&handle->references);

	sock = handle->sock;
	handle->sock = NULL;

	if (handle->doreset != NULL) {
		handle->doreset(handle->opaque);
	}

	nmhandle_deactivate(sock, handle);

	/*
	 * The handle is gone now. If the socket has a callback configured
	 * for that (e.g., to perform cleanup after request processing),
	 * call it now, or schedule it to run asynchronously.
	 */
	if (sock->closehandle_cb != NULL) {
		if (sock->tid == isc_nm_tid()) {
			sock->closehandle_cb(sock);
		} else {
			isc__netievent_close_t *event =
				isc__nm_get_netievent_close(sock->mgr, sock);
			isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
					       (isc__netievent_t *)event);
		}
	}

	if (handle == sock->statichandle) {
		/* statichandle is assigned, not attached. */
		sock->statichandle = NULL;
	}

	isc___nmsocket_detach(&sock FLARG_PASS);
}

void *
isc_nmhandle_getdata(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->opaque);
}

int
isc_nmhandle_getfd(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->sock->fd);
}

void
isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
		     isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree) {
	REQUIRE(VALID_NMHANDLE(handle));

	handle->opaque = arg;
	handle->doreset = doreset;
	handle->dofree = dofree;
}

void
isc__nm_alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
	REQUIRE(len <= NM_BIG_BUF);

	if (sock->buf == NULL) {
		/* We don't have the buffer at all */
		size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
		sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len);
		sock->buf_size = alloc_len;
	} else {
		/* We have the buffer but it's too small */
		sock->buf = isc_mem_reallocate(sock->mgr->mctx, sock->buf,
					       NM_BIG_BUF);
		sock->buf_size = NM_BIG_BUF;
	}
}

void
isc__nm_failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
		       isc_result_t eresult) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));

	if (req->cb.send != NULL) {
		isc__nm_sendcb(sock, req, eresult, true);
	} else {
		isc__nm_uvreq_put(&req, sock);
	}
}

void
isc__nm_failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
	REQUIRE(sock->accepting);
	REQUIRE(sock->server);

	/*
	 * Detach the quota early to make room for other connections;
	 * otherwise it'd be detached later asynchronously, and clog
	 * the quota unnecessarily.
	 */
	if (sock->quota != NULL) {
		isc_quota_detach(&sock->quota);
	}

	isc__nmsocket_detach(&sock->server);

	sock->accepting = false;

	switch (eresult) {
	case ISC_R_NOTCONNECTED:
		/* IGNORE: The client disconnected before we could accept */
		break;
	default:
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR,
			      "Accepting TCP connection failed: %s",
			      isc_result_totext(eresult));
	}
}

void
isc__nm_failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
			  isc_result_t eresult, bool async) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(req));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(req->cb.connect != NULL);

	isc__nmsocket_timer_stop(sock);
	uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);

	INSIST(atomic_compare_exchange_strong(&sock->connecting,
					      &(bool){ true }, false));

	isc__nmsocket_clearcb(sock);
	isc__nm_connectcb(sock, req, eresult, async);

	isc__nmsocket_prep_destroy(sock);
}

void
isc__nm_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result, bool async) {
	REQUIRE(VALID_NMSOCK(sock));
	UNUSED(async);

	switch (sock->type) {
	case isc_nm_udpsocket:
		isc__nm_udp_failed_read_cb(sock, result);
		return;
	case isc_nm_tcpsocket:
		isc__nm_tcp_failed_read_cb(sock, result);
		return;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_failed_read_cb(sock, result);
		return;
	default:
		UNREACHABLE();
	}
}

void
isc__nmsocket_connecttimeout_cb(uv_timer_t *timer) {
	uv_connect_t *uvreq = uv_handle_get_data((uv_handle_t *)timer);
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
	isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)uvreq);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(atomic_load(&sock->connecting));
	REQUIRE(VALID_UVREQ(req));
	REQUIRE(VALID_NMHANDLE(req->handle));

	isc__nmsocket_timer_stop(sock);

	/*
	 * Mark the connection as timed out and shutdown the socket.
	 */

	INSIST(atomic_compare_exchange_strong(&sock->timedout, &(bool){ false },
					      true));
	isc__nmsocket_clearcb(sock);
	isc__nmsocket_shutdown(sock);
}

void
isc__nm_accept_connection_log(isc_result_t result, bool can_log_quota) {
	int level;

	switch (result) {
	case ISC_R_SUCCESS:
	case ISC_R_NOCONN:
		return;
	case ISC_R_QUOTA:
	case ISC_R_SOFTQUOTA:
		if (!can_log_quota) {
			return;
		}
		level = ISC_LOG_INFO;
		break;
	case ISC_R_NOTCONNECTED:
		level = ISC_LOG_INFO;
		break;
	default:
		level = ISC_LOG_ERROR;
	}

	isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
		      level, "Accepting TCP connection failed: %s",
		      isc_result_totext(result));
}

void
isc__nmsocket_writetimeout_cb(void *data, isc_result_t eresult) {
	isc__nm_uvreq_t *req = data;
	isc_nmsocket_t *sock = NULL;

	REQUIRE(eresult == ISC_R_TIMEDOUT);
	REQUIRE(VALID_UVREQ(req));
	REQUIRE(VALID_NMSOCK(req->sock));

	sock = req->sock;

	isc__nmsocket_reset(sock);
}

void
isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
	isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)timer);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->reading);

	if (atomic_load(&sock->client)) {
		uv_timer_stop(timer);

		if (sock->recv_cb != NULL) {
			isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL);
			isc__nm_readcb(sock, req, ISC_R_TIMEDOUT);
		}

		if (!isc__nmsocket_timer_running(sock)) {
			isc__nmsocket_clearcb(sock);
			isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false);
		}
	} else {
		isc__nm_failed_read_cb(sock, ISC_R_TIMEDOUT, false);
	}
}

void
isc__nmsocket_timer_restart(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	if (atomic_load(&sock->connecting)) {
		int r;

		if (sock->connect_timeout == 0) {
			return;
		}

		r = uv_timer_start(&sock->read_timer,
				   isc__nmsocket_connecttimeout_cb,
				   sock->connect_timeout + 10, 0);
		UV_RUNTIME_CHECK(uv_timer_start, r);

	} else {
		int r;

		if (sock->read_timeout == 0) {
			return;
		}

		r = uv_timer_start(&sock->read_timer,
				   isc__nmsocket_readtimeout_cb,
				   sock->read_timeout, 0);
		UV_RUNTIME_CHECK(uv_timer_start, r);
	}
}

bool
isc__nmsocket_timer_running(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	return (uv_is_active((uv_handle_t *)&sock->read_timer));
}

void
isc__nmsocket_timer_start(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	if (isc__nmsocket_timer_running(sock)) {
		return;
	}

	isc__nmsocket_timer_restart(sock);
}

void
isc__nmsocket_timer_stop(isc_nmsocket_t *sock) {
	int r;

	REQUIRE(VALID_NMSOCK(sock));

	/* uv_timer_stop() is idempotent, no need to check if running */

	r = uv_timer_stop(&sock->read_timer);
	UV_RUNTIME_CHECK(uv_timer_stop, r);
}

isc__nm_uvreq_t *
isc__nm_get_read_req(isc_nmsocket_t *sock, isc_sockaddr_t *sockaddr) {
	isc__nm_uvreq_t *req = NULL;

	req = isc__nm_uvreq_get(sock->mgr, sock);
	req->cb.recv = sock->recv_cb;
	req->cbarg = sock->recv_cbarg;

	switch (sock->type) {
	case isc_nm_tcpsocket:
		isc_nmhandle_attach(sock->statichandle, &req->handle);
		break;
	default:
		if (atomic_load(&sock->client)) {
			isc_nmhandle_attach(sock->statichandle, &req->handle);
		} else {
			req->handle = isc__nmhandle_get(sock, sockaddr, NULL);
		}
		break;
	}

	return (req);
}

/*%<
 * Allocator callback for read operations.
 *
 * Note this doesn't actually allocate anything, it just assigns the
 * worker's receive buffer to a socket, and marks it as "in use".
 */
void
isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	isc__networker_t *worker = NULL;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(isc__nm_in_netthread());
	/*
	 * The size provided by libuv is only suggested size, and it always
	 * defaults to 64 * 1024 in the current versions of libuv (see
	 * src/unix/udp.c and src/unix/stream.c).
	 */
	UNUSED(size);

	worker = &sock->mgr->workers[sock->tid];
	INSIST(!worker->recvbuf_inuse);
	INSIST(worker->recvbuf != NULL);

	switch (sock->type) {
	case isc_nm_udpsocket:
		buf->len = ISC_NETMGR_UDP_RECVBUF_SIZE;
		break;
	case isc_nm_tcpsocket:
	case isc_nm_tcpdnssocket:
		buf->len = ISC_NETMGR_TCP_RECVBUF_SIZE;
		break;
	default:
		UNREACHABLE();
	}

	REQUIRE(buf->len <= ISC_NETMGR_RECVBUF_SIZE);
	buf->base = worker->recvbuf;

	worker->recvbuf_inuse = true;
}

isc_result_t
isc__nm_start_reading(isc_nmsocket_t *sock) {
	isc_result_t result = ISC_R_SUCCESS;
	int r;

	if (sock->reading) {
		return (ISC_R_SUCCESS);
	}

	switch (sock->type) {
	case isc_nm_udpsocket:
		r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb,
				      isc__nm_udp_read_cb);
		break;
	case isc_nm_tcpsocket:
		r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb,
				  isc__nm_tcp_read_cb);
		break;
	case isc_nm_tcpdnssocket:
		r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb,
				  isc__nm_tcpdns_read_cb);
		break;
	default:
		UNREACHABLE();
	}

	if (r != 0) {
		result = isc__nm_uverr2result(r);
	} else {
		sock->reading = true;
	}

	return (result);
}

void
isc__nm_stop_reading(isc_nmsocket_t *sock) {
	int r;

	if (!sock->reading) {
		return;
	}

	switch (sock->type) {
	case isc_nm_udpsocket:
		r = uv_udp_recv_stop(&sock->uv_handle.udp);
		UV_RUNTIME_CHECK(uv_udp_recv_stop, r);
		break;
	case isc_nm_tcpsocket:
	case isc_nm_tcpdnssocket:
		r = uv_read_stop(&sock->uv_handle.stream);
		UV_RUNTIME_CHECK(uv_read_stop, r);
		break;
	default:
		UNREACHABLE();
	}
	sock->reading = false;
}

bool
isc__nm_closing(isc_nmsocket_t *sock) {
	return (atomic_load(&sock->mgr->closing));
}

bool
isc__nmsocket_closing(isc_nmsocket_t *sock) {
	return (!isc__nmsocket_active(sock) || atomic_load(&sock->closing) ||
		atomic_load(&sock->mgr->closing) ||
		(sock->server != NULL && !isc__nmsocket_active(sock->server)));
}

static isc_result_t
processbuffer(isc_nmsocket_t *sock) {
	switch (sock->type) {
	case isc_nm_tcpdnssocket:
		return (isc__nm_tcpdns_processbuffer(sock));
	default:
		UNREACHABLE();
	}
}

/*
 * Process a DNS message.
 *
 * If we only have an incomplete DNS message, we don't touch any
 * timers. If we do have a full message, reset the timer.
 *
 * Stop reading if this is a client socket, or if the server socket
 * has been set to sequential mode, or the number of queries we are
 * processing simultaneously has reached the clients-per-connection
 * limit. In this case we'll be called again by resume_processing()
 * later.
 */
isc_result_t
isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {
	for (;;) {
		int_fast32_t ah = atomic_load(&sock->ah);
		isc_result_t result = processbuffer(sock);
		switch (result) {
		case ISC_R_NOMORE:
			/*
			 * Don't reset the timer until we have a
			 * full DNS message.
			 */
			result = isc__nm_start_reading(sock);
			if (result != ISC_R_SUCCESS) {
				return (result);
			}
			/*
			 * Start the timer only if there are no externally used
			 * active handles, there's always one active handle
			 * attached internally to sock->recv_handle in
			 * accept_connection()
			 */
			if (ah == 1) {
				isc__nmsocket_timer_start(sock);
			}
			goto done;
		case ISC_R_CANCELED:
			isc__nmsocket_timer_stop(sock);
			isc__nm_stop_reading(sock);
			goto done;
		case ISC_R_SUCCESS:
			/*
			 * Stop the timer on the successful message read, this
			 * also allows to restart the timer when we have no more
			 * data.
			 */
			isc__nmsocket_timer_stop(sock);

			if (atomic_load(&sock->client) ||
			    atomic_load(&sock->sequential) ||
			    ah >= STREAM_CLIENTS_PER_CONN)
			{
				isc__nm_stop_reading(sock);
				goto done;
			}
			break;
		default:
			UNREACHABLE();
		}
	}
done:
	return (ISC_R_SUCCESS);
}

void
isc__nm_resume_processing(void *arg) {
	isc_nmsocket_t *sock = (isc_nmsocket_t *)arg;

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(!atomic_load(&sock->client));

	if (isc__nmsocket_closing(sock)) {
		return;
	}

	isc__nm_process_sock_buffer(sock);
}

void
isc_nmhandle_cleartimeout(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	switch (handle->sock->type) {
	default:
		handle->sock->read_timeout = 0;

		if (uv_is_active((uv_handle_t *)&handle->sock->read_timer)) {
			isc__nmsocket_timer_stop(handle->sock);
		}
	}
}

void
isc_nmhandle_settimeout(isc_nmhandle_t *handle, uint32_t timeout) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	switch (handle->sock->type) {
	default:
		handle->sock->read_timeout = timeout;
		isc__nmsocket_timer_restart(handle->sock);
	}
}

void
isc_nmhandle_keepalive(isc_nmhandle_t *handle, bool value) {
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	sock = handle->sock;

	switch (sock->type) {
	case isc_nm_tcpsocket:
	case isc_nm_tcpdnssocket:
		atomic_store(&sock->keepalive, value);
		sock->read_timeout = value ? atomic_load(&sock->mgr->keepalive)
					   : atomic_load(&sock->mgr->idle);
		sock->write_timeout = value ? atomic_load(&sock->mgr->keepalive)
					    : atomic_load(&sock->mgr->idle);
		break;
	default:
		/*
		 * For any other protocol, this is a no-op.
		 */
		return;
	}
}

void *
isc_nmhandle_getextra(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->extra);
}

isc_sockaddr_t
isc_nmhandle_peeraddr(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->peer);
}

isc_sockaddr_t
isc_nmhandle_localaddr(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	return (handle->local);
}

isc_nm_t *
isc_nmhandle_netmgr(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	return (handle->sock->mgr);
}

isc__nm_uvreq_t *
isc___nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock FLARG) {
	isc__nm_uvreq_t *req = NULL;

	REQUIRE(VALID_NM(mgr));
	REQUIRE(VALID_NMSOCK(sock));

	if (sock != NULL && isc__nmsocket_active(sock)) {
		/* Try to reuse one */
		req = isc_astack_pop(sock->inactivereqs);
	}

	if (req == NULL) {
		req = isc_mem_get(mgr->mctx, sizeof(*req));
	}

	*req = (isc__nm_uvreq_t){ .magic = 0 };
	ISC_LINK_INIT(req, link);
	req->uv_req.req.data = req;
	isc___nmsocket_attach(sock, &req->sock FLARG_PASS);
	req->magic = UVREQ_MAGIC;

	return (req);
}

void
isc___nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock FLARG) {
	isc__nm_uvreq_t *req = NULL;
	isc_nmhandle_t *handle = NULL;

	REQUIRE(req0 != NULL);
	REQUIRE(VALID_UVREQ(*req0));

	req = *req0;
	*req0 = NULL;

	INSIST(sock == req->sock);

	req->magic = 0;

	/*
	 * We need to save this first to make sure that handle,
	 * sock, and the netmgr won't all disappear.
	 */
	handle = req->handle;
	req->handle = NULL;

#if !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__
	if (!isc__nmsocket_active(sock) ||
	    !isc_astack_trypush(sock->inactivereqs, req))
	{
		isc_mem_put(sock->mgr->mctx, req, sizeof(*req));
	}
#else  /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
	isc_mem_put(sock->mgr->mctx, req, sizeof(*req));
#endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */

	if (handle != NULL) {
		isc__nmhandle_detach(&handle FLARG_PASS);
	}

	isc___nmsocket_detach(&sock FLARG_PASS);
}

void
isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
	    void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));

	switch (handle->sock->type) {
	case isc_nm_udpsocket:
	case isc_nm_udplistener:
		isc__nm_udp_send(handle, region, cb, cbarg);
		break;
	case isc_nm_tcpsocket:
		isc__nm_tcp_send(handle, region, cb, cbarg);
		break;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_send(handle, region, cb, cbarg);
		break;
	default:
		UNREACHABLE();
	}
}

void
isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
	REQUIRE(VALID_NMHANDLE(handle));

	/*
	 * This is always called via callback (from accept or connect), and
	 * caller must attach to the handle, so the references always need to be
	 * at least 2.
	 */
	REQUIRE(isc_refcount_current(&handle->references) >= 2);

	switch (handle->sock->type) {
	case isc_nm_udpsocket:
		isc__nm_udp_read(handle, cb, cbarg);
		break;
	case isc_nm_tcpsocket:
		isc__nm_tcp_read(handle, cb, cbarg);
		break;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_read(handle, cb, cbarg);
		break;
	default:
		UNREACHABLE();
	}
}

void
isc_nm_cancelread(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	switch (handle->sock->type) {
	case isc_nm_udpsocket:
		isc__nm_udp_cancelread(handle);
		break;
	case isc_nm_tcpsocket:
		isc__nm_tcp_cancelread(handle);
		break;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_cancelread(handle);
		break;
	default:
		UNREACHABLE();
	}
}

void
isc_nm_pauseread(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	isc_nmsocket_t *sock = handle->sock;

	switch (sock->type) {
	case isc_nm_tcpsocket:
		isc__nm_tcp_pauseread(handle);
		break;
	default:
		UNREACHABLE();
	}
}

void
isc_nm_resumeread(isc_nmhandle_t *handle) {
	REQUIRE(VALID_NMHANDLE(handle));

	isc_nmsocket_t *sock = handle->sock;

	switch (sock->type) {
	case isc_nm_tcpsocket:
		isc__nm_tcp_resumeread(handle);
		break;
	default:
		UNREACHABLE();
	}
}

void
isc_nm_stoplistening(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	switch (sock->type) {
	case isc_nm_udplistener:
		isc__nm_udp_stoplistening(sock);
		break;
	case isc_nm_tcpdnslistener:
		isc__nm_tcpdns_stoplistening(sock);
		break;
	case isc_nm_tcplistener:
		isc__nm_tcp_stoplistening(sock);
		break;
	default:
		UNREACHABLE();
	}
}

void
isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
		  isc_result_t eresult, bool async) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	if (!async) {
		isc__netievent_connectcb_t ievent = { .sock = sock,
						      .req = uvreq,
						      .result = eresult };
		isc__nm_async_connectcb(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_connectcb_t *ievent =
			isc__nm_get_netievent_connectcb(sock->mgr, sock, uvreq,
							eresult);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

void
isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_connectcb_t *ievent = (isc__netievent_connectcb_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;
	isc_result_t eresult = ievent->result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));
	REQUIRE(ievent->sock->tid == isc_nm_tid());
	REQUIRE(uvreq->cb.connect != NULL);

	uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);

	isc__nm_uvreq_put(&uvreq, sock);
}

void
isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
	       isc_result_t eresult) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	if (eresult == ISC_R_SUCCESS || eresult == ISC_R_TIMEDOUT) {
		isc__netievent_readcb_t ievent = { .sock = sock,
						   .req = uvreq,
						   .result = eresult };

		isc__nm_async_readcb(NULL, (isc__netievent_t *)&ievent);
	} else {
		isc__netievent_readcb_t *ievent = isc__nm_get_netievent_readcb(
			sock->mgr, sock, uvreq, eresult);
		isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
				       (isc__netievent_t *)ievent);
	}
}

void
isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_readcb_t *ievent = (isc__netievent_readcb_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;
	isc_result_t eresult = ievent->result;
	isc_region_t region;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));
	REQUIRE(sock->tid == isc_nm_tid());

	region.base = (unsigned char *)uvreq->uvbuf.base;
	region.length = uvreq->uvbuf.len;

	uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);

	isc__nm_uvreq_put(&uvreq, sock);
}

void
isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
	       isc_result_t eresult, bool async) {
	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));

	if (!async) {
		isc__netievent_sendcb_t ievent = { .sock = sock,
						   .req = uvreq,
						   .result = eresult };
		isc__nm_async_sendcb(NULL, (isc__netievent_t *)&ievent);
		return;
	}

	isc__netievent_sendcb_t *ievent =
		isc__nm_get_netievent_sendcb(sock->mgr, sock, uvreq, eresult);
	isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
			       (isc__netievent_t *)ievent);
}

void
isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_sendcb_t *ievent = (isc__netievent_sendcb_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;
	isc__nm_uvreq_t *uvreq = ievent->req;
	isc_result_t eresult = ievent->result;

	UNUSED(worker);

	REQUIRE(VALID_NMSOCK(sock));
	REQUIRE(VALID_UVREQ(uvreq));
	REQUIRE(VALID_NMHANDLE(uvreq->handle));
	REQUIRE(sock->tid == isc_nm_tid());

	uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg);

	isc__nm_uvreq_put(&uvreq, sock);
}

static void
isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_close_t *ievent = (isc__netievent_close_t *)ev0;
	isc_nmsocket_t *sock = ievent->sock;

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(sock->tid == isc_nm_tid());
	REQUIRE(sock->closehandle_cb != NULL);

	UNUSED(worker);

	ievent->sock->closehandle_cb(sock);
}

void
isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0) {
	isc__netievent_detach_t *ievent = (isc__netievent_detach_t *)ev0;
	FLARG_IEVENT(ievent);

	REQUIRE(VALID_NMSOCK(ievent->sock));
	REQUIRE(VALID_NMHANDLE(ievent->handle));
	REQUIRE(ievent->sock->tid == isc_nm_tid());

	UNUSED(worker);

	nmhandle_detach_cb(&ievent->handle FLARG_PASS);
}

static void
reset_shutdown(uv_handle_t *handle) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);

	isc__nmsocket_shutdown(sock);
	isc__nmsocket_detach(&sock);
}

void
isc__nmsocket_reset(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));

	switch (sock->type) {
	case isc_nm_tcpsocket:
	case isc_nm_tcpdnssocket:
		/*
		 * This can be called from the TCP write timeout.
		 */
		REQUIRE(sock->parent == NULL);
		break;
	default:
		UNREACHABLE();
		break;
	}

	if (!uv_is_closing(&sock->uv_handle.handle) &&
	    uv_is_active(&sock->uv_handle.handle))
	{
		/*
		 * The real shutdown will be handled in the respective
		 * close functions.
		 */
		isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });
		int r = uv_tcp_close_reset(&sock->uv_handle.tcp,
					   reset_shutdown);
		UV_RUNTIME_CHECK(uv_tcp_close_reset, r);
	} else {
		isc__nmsocket_shutdown(sock);
	}
}

void
isc__nmsocket_shutdown(isc_nmsocket_t *sock) {
	REQUIRE(VALID_NMSOCK(sock));
	switch (sock->type) {
	case isc_nm_udpsocket:
		isc__nm_udp_shutdown(sock);
		break;
	case isc_nm_tcpsocket:
		isc__nm_tcp_shutdown(sock);
		break;
	case isc_nm_tcpdnssocket:
		isc__nm_tcpdns_shutdown(sock);
		break;
	case isc_nm_udplistener:
	case isc_nm_tcplistener:
	case isc_nm_tcpdnslistener:
		return;
	default:
		UNREACHABLE();
	}
}

static void
shutdown_walk_cb(uv_handle_t *handle, void *arg) {
	isc_nmsocket_t *sock = uv_handle_get_data(handle);
	UNUSED(arg);

	if (uv_is_closing(handle)) {
		return;
	}

	switch (handle->type) {
	case UV_UDP:
		isc__nmsocket_shutdown(sock);
		return;
	case UV_TCP:
		switch (sock->type) {
		case isc_nm_tcpsocket:
		case isc_nm_tcpdnssocket:
			if (sock->parent == NULL) {
				/* Reset the TCP connections on shutdown */
				isc__nmsocket_reset(sock);
				return;
			}
			FALLTHROUGH;
		default:
			isc__nmsocket_shutdown(sock);
		}

		return;
	default:
		return;
	}
}

void
isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) {
	UNUSED(ev0);

	uv_walk(&worker->loop, shutdown_walk_cb, NULL);
}

bool
isc__nm_acquire_interlocked(isc_nm_t *mgr) {
	if (!isc__nm_in_netthread()) {
		return (false);
	}

	LOCK(&mgr->lock);
	bool success = atomic_compare_exchange_strong(
		&mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED },
		isc_nm_tid());

	UNLOCK(&mgr->lock);
	return (success);
}

void
isc__nm_drop_interlocked(isc_nm_t *mgr) {
	if (!isc__nm_in_netthread()) {
		return;
	}

	LOCK(&mgr->lock);
	int tid = atomic_exchange(&mgr->interlocked,
				  ISC_NETMGR_NON_INTERLOCKED);
	INSIST(tid != ISC_NETMGR_NON_INTERLOCKED);
	BROADCAST(&mgr->wkstatecond);
	UNLOCK(&mgr->lock);
}

void
isc__nm_acquire_interlocked_force(isc_nm_t *mgr) {
	if (!isc__nm_in_netthread()) {
		return;
	}

	LOCK(&mgr->lock);
	while (!atomic_compare_exchange_strong(
		&mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED },
		isc_nm_tid()))
	{
		WAIT(&mgr->wkstatecond, &mgr->lock);
	}
	UNLOCK(&mgr->lock);
}

void
isc_nm_setstats(isc_nm_t *mgr, isc_stats_t *stats) {
	REQUIRE(VALID_NM(mgr));
	REQUIRE(mgr->stats == NULL);
	REQUIRE(isc_stats_ncounters(stats) == isc_sockstatscounter_max);

	isc_stats_attach(stats, &mgr->stats);
}

void
isc__nm_incstats(isc_nm_t *mgr, isc_statscounter_t counterid) {
	REQUIRE(VALID_NM(mgr));
	REQUIRE(counterid != -1);

	if (mgr->stats != NULL) {
		isc_stats_increment(mgr->stats, counterid);
	}
}

void
isc__nm_decstats(isc_nm_t *mgr, isc_statscounter_t counterid) {
	REQUIRE(VALID_NM(mgr));
	REQUIRE(counterid != -1);

	if (mgr->stats != NULL) {
		isc_stats_decrement(mgr->stats, counterid);
	}
}

isc_result_t
isc__nm_socket(int domain, int type, int protocol, uv_os_sock_t *sockp) {
#ifdef WIN32
	SOCKET sock;
	sock = socket(domain, type, protocol);
	if (sock == INVALID_SOCKET) {
		char strbuf[ISC_STRERRORSIZE];
		DWORD socket_errno = WSAGetLastError();
		switch (socket_errno) {
		case WSAEMFILE:
		case WSAENOBUFS:
			return (ISC_R_NORESOURCES);

		case WSAEPROTONOSUPPORT:
		case WSAEPFNOSUPPORT:
		case WSAEAFNOSUPPORT:
			return (ISC_R_FAMILYNOSUPPORT);
		default:
			strerror_r(socket_errno, strbuf, sizeof(strbuf));
			UNEXPECTED_ERROR(
				__FILE__, __LINE__,
				"socket() failed with error code %lu: %s",
				socket_errno, strbuf);
			return (ISC_R_UNEXPECTED);
		}
	}
#else
	int sock = socket(domain, type, protocol);
	if (sock < 0) {
		return (isc_errno_toresult(errno));
	}
#endif
	*sockp = (uv_os_sock_t)sock;
	return (ISC_R_SUCCESS);
}

void
isc__nm_closesocket(uv_os_sock_t sock) {
#ifdef WIN32
	closesocket(sock);
#else
	close(sock);
#endif
}

#define setsockopt_on(socket, level, name) \
	setsockopt(socket, level, name, &(int){ 1 }, sizeof(int))

#define setsockopt_off(socket, level, name) \
	setsockopt(socket, level, name, &(int){ 0 }, sizeof(int))

isc_result_t
isc__nm_socket_freebind(uv_os_sock_t fd, sa_family_t sa_family) {
	/*
	 * Set the IP_FREEBIND (or equivalent option) on the uv_handle.
	 */
#ifdef IP_FREEBIND
	UNUSED(sa_family);
	if (setsockopt_on(fd, IPPROTO_IP, IP_FREEBIND) == -1) {
		return (ISC_R_FAILURE);
	}
	return (ISC_R_SUCCESS);
#elif defined(IP_BINDANY) || defined(IPV6_BINDANY)
	if (sa_family == AF_INET) {
#if defined(IP_BINDANY)
		if (setsockopt_on(fd, IPPROTO_IP, IP_BINDANY) == -1) {
			return (ISC_R_FAILURE);
		}
		return (ISC_R_SUCCESS);
#endif
	} else if (sa_family == AF_INET6) {
#if defined(IPV6_BINDANY)
		if (setsockopt_on(fd, IPPROTO_IPV6, IPV6_BINDANY) == -1) {
			return (ISC_R_FAILURE);
		}
		return (ISC_R_SUCCESS);
#endif
	}
	return (ISC_R_NOTIMPLEMENTED);
#elif defined(SO_BINDANY)
	UNUSED(sa_family);
	if (setsockopt_on(fd, SOL_SOCKET, SO_BINDANY) == -1) {
		return (ISC_R_FAILURE);
	}
	return (ISC_R_SUCCESS);
#else
	UNUSED(fd);
	UNUSED(sa_family);
	return (ISC_R_NOTIMPLEMENTED);
#endif
}

isc_result_t
isc__nm_socket_reuse(uv_os_sock_t fd) {
	/*
	 * Generally, the SO_REUSEADDR socket option allows reuse of
	 * local addresses.
	 *
	 * On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some
	 * additional refinements for programs that use multicast.
	 *
	 * On Linux, SO_REUSEPORT has different semantics: it _shares_ the port
	 * rather than steal it from the current listener, so we don't use it
	 * here, but rather in isc__nm_socket_reuse_lb().
	 *
	 * On Windows, it also allows a socket to forcibly bind to a port in use
	 * by another socket.
	 */

#if defined(SO_REUSEPORT) && !defined(__linux__)
	if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT) == -1) {
		return (ISC_R_FAILURE);
	}
	return (ISC_R_SUCCESS);
#elif defined(SO_REUSEADDR)
	if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEADDR) == -1) {
		return (ISC_R_FAILURE);
	}
	return (ISC_R_SUCCESS);
#else
	UNUSED(fd);
	return (ISC_R_NOTIMPLEMENTED);
#endif
}

isc_result_t
isc__nm_socket_reuse_lb(uv_os_sock_t fd) {
	/*
	 * On FreeBSD 12+, SO_REUSEPORT_LB socket option allows sockets to be
	 * bound to an identical socket address. For UDP sockets, the use of
	 * this option can provide better distribution of incoming datagrams to
	 * multiple processes (or threads) as compared to the traditional
	 * technique of having multiple processes compete to receive datagrams
	 * on the same socket.
	 *
	 * On Linux, the same thing is achieved simply with SO_REUSEPORT.
	 */
#if defined(SO_REUSEPORT_LB)
	if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT_LB) == -1) {
		return (ISC_R_FAILURE);
	} else {
		return (ISC_R_SUCCESS);
	}
#elif defined(SO_REUSEPORT) && defined(__linux__)
	if (setsockopt_on(fd, SOL_SOCKET, SO_REUSEPORT) == -1) {
		return (ISC_R_FAILURE);
	} else {
		return (ISC_R_SUCCESS);
	}
#else
	UNUSED(fd);
	return (ISC_R_NOTIMPLEMENTED);
#endif
}

isc_result_t
isc__nm_socket_incoming_cpu(uv_os_sock_t fd) {
#ifdef SO_INCOMING_CPU
	if (setsockopt_on(fd, SOL_SOCKET, SO_INCOMING_CPU) == -1) {
		return (ISC_R_FAILURE);
	} else {
		return (ISC_R_SUCCESS);
	}
#else
	UNUSED(fd);
#endif
	return (ISC_R_NOTIMPLEMENTED);
}

isc_result_t
isc__nm_socket_disable_pmtud(uv_os_sock_t fd, sa_family_t sa_family) {
	/*
	 * Disable the Path MTU Discovery on IP packets
	 */
	if (sa_family == AF_INET6) {
#if defined(IPV6_DONTFRAG)
		if (setsockopt_off(fd, IPPROTO_IPV6, IPV6_DONTFRAG) == -1) {
			return (ISC_R_FAILURE);
		} else {
			return (ISC_R_SUCCESS);
		}
#elif defined(IPV6_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
		if (setsockopt(fd, IPPROTO_IPV6, IPV6_MTU_DISCOVER,
			       &(int){ IP_PMTUDISC_OMIT }, sizeof(int)) == -1)
		{
			return (ISC_R_FAILURE);
		} else {
			return (ISC_R_SUCCESS);
		}
#else
		UNUSED(fd);
#endif
	} else if (sa_family == AF_INET) {
#if defined(IP_DONTFRAG)
		if (setsockopt_off(fd, IPPROTO_IP, IP_DONTFRAG) == -1) {
			return (ISC_R_FAILURE);
		} else {
			return (ISC_R_SUCCESS);
		}
#elif defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_OMIT)
		if (setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER,
			       &(int){ IP_PMTUDISC_OMIT }, sizeof(int)) == -1)
		{
			return (ISC_R_FAILURE);
		} else {
			return (ISC_R_SUCCESS);
		}
#else
		UNUSED(fd);
#endif
	} else {
		return (ISC_R_FAMILYNOSUPPORT);
	}

	return (ISC_R_NOTIMPLEMENTED);
}

#if defined(_WIN32)
#define TIMEOUT_TYPE	DWORD
#define TIMEOUT_DIV	1000
#define TIMEOUT_OPTNAME TCP_MAXRT
#elif defined(TCP_CONNECTIONTIMEOUT)
#define TIMEOUT_TYPE	int
#define TIMEOUT_DIV	1000
#define TIMEOUT_OPTNAME TCP_CONNECTIONTIMEOUT
#elif defined(TCP_RXT_CONNDROPTIME)
#define TIMEOUT_TYPE	int
#define TIMEOUT_DIV	1000
#define TIMEOUT_OPTNAME TCP_RXT_CONNDROPTIME
#elif defined(TCP_USER_TIMEOUT)
#define TIMEOUT_TYPE	unsigned int
#define TIMEOUT_DIV	1
#define TIMEOUT_OPTNAME TCP_USER_TIMEOUT
#elif defined(TCP_KEEPINIT)
#define TIMEOUT_TYPE	int
#define TIMEOUT_DIV	1000
#define TIMEOUT_OPTNAME TCP_KEEPINIT
#endif

isc_result_t
isc__nm_socket_connectiontimeout(uv_os_sock_t fd, int timeout_ms) {
#if defined(TIMEOUT_OPTNAME)
	TIMEOUT_TYPE timeout = timeout_ms / TIMEOUT_DIV;

	if (timeout == 0) {
		timeout = 1;
	}

	if (setsockopt(fd, IPPROTO_TCP, TIMEOUT_OPTNAME, &timeout,
		       sizeof(timeout)) == -1)
	{
		return (ISC_R_FAILURE);
	}

	return (ISC_R_SUCCESS);
#else
	UNUSED(fd);
	UNUSED(timeout_ms);

	return (ISC_R_SUCCESS);
#endif
}

isc_result_t
isc__nm_socket_tcp_nodelay(uv_os_sock_t fd) {
#ifdef TCP_NODELAY
	if (setsockopt_on(fd, IPPROTO_TCP, TCP_NODELAY) == -1) {
		return (ISC_R_FAILURE);
	} else {
		return (ISC_R_SUCCESS);
	}
#else
	UNUSED(fd);
	return (ISC_R_SUCCESS);
#endif
}

static isc_threadresult_t
isc__nm_work_run(isc_threadarg_t arg) {
	isc__nm_work_t *work = (isc__nm_work_t *)arg;

	work->cb(work->data);

	return ((isc_threadresult_t)0);
}

static void
isc__nm_work_cb(uv_work_t *req) {
	isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);

	if (isc_tid_v == SIZE_MAX) {
		isc__trampoline_t *trampoline_arg =
			isc__trampoline_get(isc__nm_work_run, work);
		(void)isc__trampoline_run(trampoline_arg);
	} else {
		(void)isc__nm_work_run((isc_threadarg_t)work);
	}
}

static void
isc__nm_after_work_cb(uv_work_t *req, int status) {
	isc_result_t result = ISC_R_SUCCESS;
	isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
	isc_nm_t *netmgr = work->netmgr;

	if (status != 0) {
		result = isc__nm_uverr2result(status);
	}

	work->after_cb(work->data, result);

	isc_mem_put(netmgr->mctx, work, sizeof(*work));

	isc_nm_detach(&netmgr);
}

void
isc_nm_work_offload(isc_nm_t *netmgr, isc_nm_workcb_t work_cb,
		    isc_nm_after_workcb_t after_work_cb, void *data) {
	isc__networker_t *worker = NULL;
	isc__nm_work_t *work = NULL;
	int r;

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(VALID_NM(netmgr));

	worker = &netmgr->workers[isc_nm_tid()];

	work = isc_mem_get(netmgr->mctx, sizeof(*work));
	*work = (isc__nm_work_t){
		.cb = work_cb,
		.after_cb = after_work_cb,
		.data = data,
	};

	isc_nm_attach(netmgr, &work->netmgr);

	uv_req_set_data((uv_req_t *)&work->req, work);

	r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb,
			  isc__nm_after_work_cb);
	UV_RUNTIME_CHECK(uv_queue_work, r);
}

void
isc_nm_timer_create(isc_nmhandle_t *handle, isc_nm_timer_cb cb, void *cbarg,
		    isc_nm_timer_t **timerp) {
	isc__networker_t *worker = NULL;
	isc_nmsocket_t *sock = NULL;
	isc_nm_timer_t *timer = NULL;
	int r;

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	sock = handle->sock;
	worker = &sock->mgr->workers[isc_nm_tid()];

	timer = isc_mem_get(sock->mgr->mctx, sizeof(*timer));
	*timer = (isc_nm_timer_t){ .cb = cb, .cbarg = cbarg };
	isc_refcount_init(&timer->references, 1);
	isc_nmhandle_attach(handle, &timer->handle);

	r = uv_timer_init(&worker->loop, &timer->timer);
	UV_RUNTIME_CHECK(uv_timer_init, r);

	uv_handle_set_data((uv_handle_t *)&timer->timer, timer);

	*timerp = timer;
}

void
isc_nm_timer_attach(isc_nm_timer_t *timer, isc_nm_timer_t **timerp) {
	REQUIRE(timer != NULL);
	REQUIRE(timerp != NULL && *timerp == NULL);

	isc_refcount_increment(&timer->references);
	*timerp = timer;
}

static void
timer_destroy(uv_handle_t *uvhandle) {
	isc_nm_timer_t *timer = uv_handle_get_data(uvhandle);
	isc_nmhandle_t *handle = timer->handle;
	isc_mem_t *mctx = timer->handle->sock->mgr->mctx;

	isc_mem_put(mctx, timer, sizeof(*timer));

	isc_nmhandle_detach(&handle);
}

void
isc_nm_timer_detach(isc_nm_timer_t **timerp) {
	isc_nm_timer_t *timer = NULL;
	isc_nmhandle_t *handle = NULL;

	REQUIRE(timerp != NULL && *timerp != NULL);

	timer = *timerp;
	*timerp = NULL;

	handle = timer->handle;

	REQUIRE(isc__nm_in_netthread());
	REQUIRE(VALID_NMHANDLE(handle));
	REQUIRE(VALID_NMSOCK(handle->sock));

	if (isc_refcount_decrement(&timer->references) == 1) {
		int r = uv_timer_stop(&timer->timer);
		UV_RUNTIME_CHECK(uv_timer_stop, r);
		uv_close((uv_handle_t *)&timer->timer, timer_destroy);
	}
}

static void
timer_cb(uv_timer_t *uvtimer) {
	isc_nm_timer_t *timer = uv_handle_get_data((uv_handle_t *)uvtimer);

	REQUIRE(timer->cb != NULL);

	timer->cb(timer->cbarg, ISC_R_TIMEDOUT);
}

void
isc_nm_timer_start(isc_nm_timer_t *timer, uint64_t timeout) {
	int r = uv_timer_start(&timer->timer, timer_cb, timeout, 0);
	UV_RUNTIME_CHECK(uv_timer_start, r);
}

void
isc_nm_timer_stop(isc_nm_timer_t *timer) {
	int r = uv_timer_stop(&timer->timer);
	UV_RUNTIME_CHECK(uv_timer_stop, r);
}

#ifdef NETMGR_TRACE
/*
 * Dump all active sockets in netmgr. We output to stderr
 * as the logger might be already shut down.
 */

static const char *
nmsocket_type_totext(isc_nmsocket_type type) {
	switch (type) {
	case isc_nm_udpsocket:
		return ("isc_nm_udpsocket");
	case isc_nm_udplistener:
		return ("isc_nm_udplistener");
	case isc_nm_tcpsocket:
		return ("isc_nm_tcpsocket");
	case isc_nm_tcplistener:
		return ("isc_nm_tcplistener");
	case isc_nm_tcpdnslistener:
		return ("isc_nm_tcpdnslistener");
	case isc_nm_tcpdnssocket:
		return ("isc_nm_tcpdnssocket");
	default:
		UNREACHABLE();
	}
}

static void
nmhandle_dump(isc_nmhandle_t *handle) {
	fprintf(stderr, "Active handle %p, refs %" PRIuFAST32 "\n", handle,
		isc_refcount_current(&handle->references));
	fprintf(stderr, "Created by:\n");
	backtrace_symbols_fd(handle->backtrace, handle->backtrace_size,
			     STDERR_FILENO);
	fprintf(stderr, "\n\n");
}

static void
nmsocket_dump(isc_nmsocket_t *sock) {
	isc_nmhandle_t *handle = NULL;

	LOCK(&sock->lock);
	fprintf(stderr, "\n=================\n");
	fprintf(stderr, "Active %s socket %p, type %s, refs %" PRIuFAST32 "\n",
		atomic_load(&sock->client) ? "client" : "server", sock,
		nmsocket_type_totext(sock->type),
		isc_refcount_current(&sock->references));
	fprintf(stderr,
		"Parent %p, listener %p, server %p, statichandle = "
		"%p\n",
		sock->parent, sock->listener, sock->server, sock->statichandle);
	fprintf(stderr, "Flags:%s%s%s%s%s\n",
		atomic_load(&sock->active) ? " active" : "",
		atomic_load(&sock->closing) ? " closing" : "",
		atomic_load(&sock->destroying) ? " destroying" : "",
		atomic_load(&sock->connecting) ? " connecting" : "",
		sock->accepting ? " accepting" : "");
	fprintf(stderr, "Created by:\n");
	backtrace_symbols_fd(sock->backtrace, sock->backtrace_size,
			     STDERR_FILENO);
	fprintf(stderr, "\n");

	for (handle = ISC_LIST_HEAD(sock->active_handles); handle != NULL;
	     handle = ISC_LIST_NEXT(handle, active_link))
	{
		static bool first = true;
		if (first) {
			fprintf(stderr, "Active handles:\n");
			first = false;
		}
		nmhandle_dump(handle);
	}

	fprintf(stderr, "\n");
	UNLOCK(&sock->lock);
}

void
isc__nm_dump_active(isc_nm_t *nm) {
	isc_nmsocket_t *sock = NULL;

	REQUIRE(VALID_NM(nm));

	LOCK(&nm->lock);
	for (sock = ISC_LIST_HEAD(nm->active_sockets); sock != NULL;
	     sock = ISC_LIST_NEXT(sock, active_link))
	{
		static bool first = true;
		if (first) {
			fprintf(stderr, "Outstanding sockets\n");
			first = false;
		}
		nmsocket_dump(sock);
	}
	UNLOCK(&nm->lock);
}
#endif
