/* $NetBSD: udp.c,v 1.1 2024/02/18 20:57:55 christos Exp $ */ /* * Copyright (C) Internet Systems Consortium, Inc. ("ISC") * * SPDX-License-Identifier: MPL-2.0 * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, you can obtain one at https://mozilla.org/MPL/2.0/. * * See the COPYRIGHT file distributed with this work for additional * information regarding copyright ownership. */ #include <unistd.h> #include <uv.h> #include <isc/atomic.h> #include <isc/barrier.h> #include <isc/buffer.h> #include <isc/condition.h> #include <isc/errno.h> #include <isc/magic.h> #include <isc/mem.h> #include <isc/netmgr.h> #include <isc/random.h> #include <isc/refcount.h> #include <isc/region.h> #include <isc/result.h> #include <isc/sockaddr.h> #include <isc/thread.h> #include <isc/util.h> #include "netmgr-int.h" #include "uv-compat.h" static isc_result_t udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, isc_sockaddr_t *peer); static void udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags); static void udp_send_cb(uv_udp_send_t *req, int status); static void udp_close_cb(uv_handle_t *handle); static void read_timer_close_cb(uv_handle_t *handle); static void udp_close_direct(isc_nmsocket_t *sock); static void stop_udp_parent(isc_nmsocket_t *sock); static void stop_udp_child(isc_nmsocket_t *sock); static uv_os_sock_t isc__nm_udp_lb_socket(isc_nm_t *mgr, sa_family_t sa_family) { isc_result_t result; uv_os_sock_t sock; result = isc__nm_socket(sa_family, SOCK_DGRAM, 0, &sock); RUNTIME_CHECK(result == ISC_R_SUCCESS); (void)isc__nm_socket_incoming_cpu(sock); (void)isc__nm_socket_disable_pmtud(sock, sa_family); result = isc__nm_socket_reuse(sock); RUNTIME_CHECK(result == ISC_R_SUCCESS); #ifndef _WIN32 if (mgr->load_balance_sockets) { result = isc__nm_socket_reuse_lb(sock); RUNTIME_CHECK(result == ISC_R_SUCCESS); } #endif return (sock); } static void start_udp_child(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nmsocket_t *sock, uv_os_sock_t fd, int tid) { isc_nmsocket_t *csock; isc__netievent_udplisten_t *ievent = NULL; csock = &sock->children[tid]; isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface); csock->parent = sock; csock->iface = sock->iface; csock->reading = true; csock->recv_cb = sock->recv_cb; csock->recv_cbarg = sock->recv_cbarg; csock->extrahandlesize = sock->extrahandlesize; csock->tid = tid; #ifdef _WIN32 UNUSED(fd); csock->fd = isc__nm_udp_lb_socket(mgr, iface->type.sa.sa_family); #else if (mgr->load_balance_sockets) { UNUSED(fd); csock->fd = isc__nm_udp_lb_socket(mgr, iface->type.sa.sa_family); } else { csock->fd = dup(fd); } #endif REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_udplisten(mgr, csock); isc__nm_maybe_enqueue_ievent(&mgr->workers[tid], (isc__netievent_t *)ievent); } static void enqueue_stoplistening(isc_nmsocket_t *sock) { isc__netievent_udpstop_t *ievent = isc__nm_get_netievent_udpstop(sock->mgr, sock); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); } isc_result_t isc_nm_listenudp(isc_nm_t *mgr, isc_sockaddr_t *iface, isc_nm_recv_cb_t cb, void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) { isc_result_t result = ISC_R_SUCCESS; isc_nmsocket_t *sock = NULL; size_t children_size = 0; REQUIRE(VALID_NM(mgr)); uv_os_sock_t fd = -1; /* * We are creating mgr->nlisteners duplicated sockets, one * socket for each worker thread. */ sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t)); isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface); atomic_init(&sock->rchildren, 0); #if defined(WIN32) sock->nchildren = 1; #else sock->nchildren = mgr->nlisteners; #endif children_size = sock->nchildren * sizeof(sock->children[0]); sock->children = isc_mem_get(mgr->mctx, children_size); memset(sock->children, 0, children_size); sock->recv_cb = cb; sock->recv_cbarg = cbarg; sock->extrahandlesize = extrahandlesize; sock->result = ISC_R_UNSET; sock->tid = 0; sock->fd = -1; #ifndef _WIN32 if (!mgr->load_balance_sockets) { fd = isc__nm_udp_lb_socket(mgr, iface->type.sa.sa_family); } #endif isc_barrier_init(&sock->startlistening, sock->nchildren); for (size_t i = 0; i < sock->nchildren; i++) { if ((int)i == isc_nm_tid()) { continue; } start_udp_child(mgr, iface, sock, fd, i); } if (isc__nm_in_netthread()) { start_udp_child(mgr, iface, sock, fd, isc_nm_tid()); } #ifndef _WIN32 if (!mgr->load_balance_sockets) { isc__nm_closesocket(fd); } #endif LOCK(&sock->lock); while (atomic_load(&sock->rchildren) != sock->nchildren) { WAIT(&sock->cond, &sock->lock); } result = sock->result; atomic_store(&sock->active, true); UNLOCK(&sock->lock); INSIST(result != ISC_R_UNSET); if (result == ISC_R_SUCCESS) { REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren); *sockp = sock; } else { atomic_store(&sock->active, false); enqueue_stoplistening(sock); isc_nmsocket_close(&sock); } return (result); } /* * Asynchronous 'udplisten' call handler: start listening on a UDP socket. */ void isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udplisten_t *ievent = (isc__netievent_udplisten_t *)ev0; isc_nmsocket_t *sock = NULL; int r, uv_bind_flags = 0; int uv_init_flags = 0; sa_family_t sa_family; isc_result_t result = ISC_R_UNSET; isc_nm_t *mgr = NULL; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); REQUIRE(VALID_NMSOCK(ievent->sock->parent)); sock = ievent->sock; sa_family = sock->iface.type.sa.sa_family; mgr = sock->mgr; REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->parent != NULL); REQUIRE(sock->tid == isc_nm_tid()); #if HAVE_DECL_UV_UDP_RECVMMSG uv_init_flags |= UV_UDP_RECVMMSG; #endif r = uv_udp_init_ex(&worker->loop, &sock->uv_handle.udp, uv_init_flags); UV_RUNTIME_CHECK(uv_udp_init_ex, r); uv_handle_set_data(&sock->uv_handle.handle, sock); /* This keeps the socket alive after everything else is gone */ isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL }); r = uv_timer_init(&worker->loop, &sock->read_timer); UV_RUNTIME_CHECK(uv_timer_init, r); uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock); LOCK(&sock->parent->lock); r = uv_udp_open(&sock->uv_handle.udp, sock->fd); if (r < 0) { isc__nm_closesocket(sock->fd); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); goto done; } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]); if (sa_family == AF_INET6) { uv_bind_flags |= UV_UDP_IPV6ONLY; } #ifdef _WIN32 r = isc_uv_udp_freebind(&sock->uv_handle.udp, &sock->parent->iface.type.sa, uv_bind_flags); if (r < 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); goto done; } #else if (mgr->load_balance_sockets) { r = isc_uv_udp_freebind(&sock->uv_handle.udp, &sock->parent->iface.type.sa, uv_bind_flags); if (r < 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); goto done; } } else { if (sock->parent->fd == -1) { /* This thread is first, bind the socket */ r = isc_uv_udp_freebind(&sock->uv_handle.udp, &sock->parent->iface.type.sa, uv_bind_flags); if (r < 0) { isc__nm_incstats(sock->mgr, STATID_BINDFAIL); goto done; } sock->parent->uv_handle.udp.flags = sock->uv_handle.udp.flags; sock->parent->fd = sock->fd; } else { /* The socket is already bound, just copy the flags */ sock->uv_handle.udp.flags = sock->parent->uv_handle.udp.flags; } } #endif #ifdef ISC_RECV_BUFFER_SIZE uv_recv_buffer_size(&sock->uv_handle.handle, &(int){ ISC_RECV_BUFFER_SIZE }); #endif #ifdef ISC_SEND_BUFFER_SIZE uv_send_buffer_size(&sock->uv_handle.handle, &(int){ ISC_SEND_BUFFER_SIZE }); #endif r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb, udp_recv_cb); if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); goto done; } atomic_store(&sock->listening, true); done: result = isc__nm_uverr2result(r); atomic_fetch_add(&sock->parent->rchildren, 1); if (sock->parent->result == ISC_R_UNSET) { sock->parent->result = result; } SIGNAL(&sock->parent->cond); UNLOCK(&sock->parent->lock); isc_barrier_wait(&sock->parent->startlistening); } void isc__nm_udp_stoplistening(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udplistener); if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, true)) { UNREACHABLE(); } if (!isc__nm_in_netthread()) { enqueue_stoplistening(sock); } else { stop_udp_parent(sock); } } /* * Asynchronous 'udpstop' call handler: stop listening on a UDP socket. */ void isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpstop_t *ievent = (isc__netievent_udpstop_t *)ev0; isc_nmsocket_t *sock = ievent->sock; UNUSED(worker); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); if (sock->parent != NULL) { stop_udp_child(sock); return; } stop_udp_parent(sock); } /* * udp_recv_cb handles incoming UDP packet from uv. The buffer here is * reused for a series of packets, so we need to allocate a new one. * This new one can be reused to send the response then. */ static void udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); isc__nm_uvreq_t *req = NULL; uint32_t maxudp; isc_sockaddr_t sockaddr; isc_result_t result; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->reading); /* * When using recvmmsg(2), if no errors occur, there will be a final * callback with nrecv set to 0, addr set to NULL and the buffer * pointing at the initially allocated data with the UV_UDP_MMSG_CHUNK * flag cleared and the UV_UDP_MMSG_FREE flag set. */ #if HAVE_DECL_UV_UDP_MMSG_FREE if ((flags & UV_UDP_MMSG_FREE) == UV_UDP_MMSG_FREE) { INSIST(nrecv == 0); INSIST(addr == NULL); goto free; } #else UNUSED(flags); #endif /* * - If we're simulating a firewall blocking UDP packets * bigger than 'maxudp' bytes for testing purposes. */ maxudp = atomic_load(&sock->mgr->maxudp); if ((maxudp != 0 && (uint32_t)nrecv > maxudp)) { /* * We need to keep the read_cb intact in case, so the * readtimeout_cb can trigger and not crash because of * missing read_req. */ goto free; } /* * - If addr == NULL, in which case it's the end of stream; * we can free the buffer and bail. */ if (addr == NULL) { isc__nm_failed_read_cb(sock, ISC_R_EOF, false); goto free; } /* * - If the socket is no longer active. */ if (!isc__nmsocket_active(sock)) { isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false); goto free; } if (nrecv < 0) { isc__nm_failed_read_cb(sock, isc__nm_uverr2result(nrecv), false); goto free; } result = isc_sockaddr_fromsockaddr(&sockaddr, addr); RUNTIME_CHECK(result == ISC_R_SUCCESS); req = isc__nm_get_read_req(sock, &sockaddr); /* * The callback will be called synchronously, because result is * ISC_R_SUCCESS, so we are ok of passing the buf directly. */ req->uvbuf.base = buf->base; req->uvbuf.len = nrecv; sock->recv_read = false; REQUIRE(!sock->processing); sock->processing = true; isc__nm_readcb(sock, req, ISC_R_SUCCESS); sock->processing = false; free: #if HAVE_DECL_UV_UDP_MMSG_CHUNK /* * When using recvmmsg(2), chunks will have the UV_UDP_MMSG_CHUNK flag * set, those must not be freed. */ if ((flags & UV_UDP_MMSG_CHUNK) == UV_UDP_MMSG_CHUNK) { return; } #endif /* * When using recvmmsg(2), if a UDP socket error occurs, nrecv will be < * 0. In either scenario, the callee can now safely free the provided * buffer. */ if (nrecv < 0) { /* * The buffer may be a null buffer on error. */ if (buf->base == NULL && buf->len == 0) { return; } } isc__nm_free_uvbuf(sock, buf); } /* * Send the data in 'region' to a peer via a UDP socket. We try to find * a proper sibling/child socket so that we won't have to jump to * another thread. */ void isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb, void *cbarg) { isc_nmsocket_t *sock = handle->sock; isc_nmsocket_t *rsock = NULL; isc_sockaddr_t *peer = &handle->peer; isc__nm_uvreq_t *uvreq = NULL; uint32_t maxudp = atomic_load(&sock->mgr->maxudp); int ntid; INSIST(sock->type == isc_nm_udpsocket); /* * We're simulating a firewall blocking UDP packets bigger than * 'maxudp' bytes, for testing purposes. * * The client would ordinarily have unreferenced the handle * in the callback, but that won't happen in this case, so * we need to do so here. */ if (maxudp != 0 && region->length > maxudp) { isc_nmhandle_detach(&handle); return; } if (atomic_load(&sock->client)) { /* * When we are sending from the client socket, we directly use * the socket provided. */ rsock = sock; goto send; } else { /* * When we are sending from the server socket, we either use the * socket associated with the network thread we are in, or we * use the thread from the socket associated with the handle. */ INSIST(sock->parent != NULL); #if defined(WIN32) /* On Windows, we have only a single listening listener */ rsock = sock; #else if (isc__nm_in_netthread()) { ntid = isc_nm_tid(); } else { ntid = sock->tid; } rsock = &sock->parent->children[ntid]; #endif } send: uvreq = isc__nm_uvreq_get(rsock->mgr, rsock); uvreq->uvbuf.base = (char *)region->base; uvreq->uvbuf.len = region->length; isc_nmhandle_attach(handle, &uvreq->handle); uvreq->cb.send = cb; uvreq->cbarg = cbarg; if (isc_nm_tid() == rsock->tid) { REQUIRE(rsock->tid == isc_nm_tid()); isc__netievent_udpsend_t ievent = { .sock = rsock, .req = uvreq, .peer = *peer }; isc__nm_async_udpsend(NULL, (isc__netievent_t *)&ievent); } else { isc__netievent_udpsend_t *ievent = isc__nm_get_netievent_udpsend(sock->mgr, rsock); ievent->peer = *peer; ievent->req = uvreq; isc__nm_enqueue_ievent(&sock->mgr->workers[rsock->tid], (isc__netievent_t *)ievent); } } /* * Asynchronous 'udpsend' event handler: send a packet on a UDP socket. */ void isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) { isc_result_t result; isc__netievent_udpsend_t *ievent = (isc__netievent_udpsend_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *uvreq = ievent->req; REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->tid == isc_nm_tid()); UNUSED(worker); if (isc__nmsocket_closing(sock)) { isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED); return; } result = udp_send_direct(sock, uvreq, &ievent->peer); if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); isc__nm_failed_send_cb(sock, uvreq, result); } } static void udp_send_cb(uv_udp_send_t *req, int status) { isc_result_t result = ISC_R_SUCCESS; isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req); isc_nmsocket_t *sock = NULL; REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); sock = uvreq->sock; REQUIRE(sock->tid == isc_nm_tid()); if (status < 0) { result = isc__nm_uverr2result(status); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); } isc__nm_sendcb(sock, uvreq, result, false); } /* * udp_send_direct sends buf to a peer on a socket. Sock has to be in * the same thread as the callee. */ static isc_result_t udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, isc_sockaddr_t *peer) { const struct sockaddr *sa = &peer->type.sa; int r; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_UVREQ(req)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_udpsocket); if (isc__nmsocket_closing(sock)) { return (ISC_R_CANCELED); } #if UV_VERSION_HEX >= UV_VERSION(1, 27, 0) /* * If we used uv_udp_connect() (and not the shim version for * older versions of libuv), then the peer address has to be * set to NULL or else uv_udp_send() could fail or assert, * depending on the libuv version. */ if (atomic_load(&sock->connected)) { sa = NULL; } #endif r = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp, &req->uvbuf, 1, sa, udp_send_cb); if (r < 0) { return (isc__nm_uverr2result(r)); } return (ISC_R_SUCCESS); } static isc_result_t udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; int uv_bind_flags = UV_UDP_REUSEADDR; isc_result_t result = ISC_R_UNSET; int tries = 3; int r; REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->tid == isc_nm_tid()); worker = &sock->mgr->workers[isc_nm_tid()]; atomic_store(&sock->connecting, true); r = uv_udp_init(&worker->loop, &sock->uv_handle.udp); UV_RUNTIME_CHECK(uv_udp_init, r); uv_handle_set_data(&sock->uv_handle.handle, sock); r = uv_timer_init(&worker->loop, &sock->read_timer); UV_RUNTIME_CHECK(uv_timer_init, r); uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock); r = uv_udp_open(&sock->uv_handle.udp, sock->fd); if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); goto done; } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]); if (sock->iface.type.sa.sa_family == AF_INET6) { uv_bind_flags |= UV_UDP_IPV6ONLY; } r = uv_udp_bind(&sock->uv_handle.udp, &sock->iface.type.sa, uv_bind_flags); if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); goto done; } #ifdef ISC_RECV_BUFFER_SIZE uv_recv_buffer_size(&sock->uv_handle.handle, &(int){ ISC_RECV_BUFFER_SIZE }); #endif #ifdef ISC_SEND_BUFFER_SIZE uv_send_buffer_size(&sock->uv_handle.handle, &(int){ ISC_SEND_BUFFER_SIZE }); #endif /* * On FreeBSD the UDP connect() call sometimes results in a * spurious transient EADDRINUSE. Try a few more times before * giving up. */ do { r = isc_uv_udp_connect(&sock->uv_handle.udp, &req->peer.type.sa); } while (r == UV_EADDRINUSE && --tries > 0); if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECTFAIL]); goto done; } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); atomic_store(&sock->connecting, false); atomic_store(&sock->connected, true); done: result = isc__nm_uverr2result(r); LOCK(&sock->lock); sock->result = result; SIGNAL(&sock->cond); if (!atomic_load(&sock->active)) { WAIT(&sock->scond, &sock->lock); } INSIST(atomic_load(&sock->active)); UNLOCK(&sock->lock); return (result); } /* * Asynchronous 'udpconnect' call handler: open a new UDP socket and * call the 'open' callback with a handle. */ void isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpconnect_t *ievent = (isc__netievent_udpconnect_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *req = ievent->req; isc_result_t result; UNUSED(worker); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->parent == NULL); REQUIRE(sock->tid == isc_nm_tid()); result = udp_connect_direct(sock, req); if (result != ISC_R_SUCCESS) { atomic_store(&sock->active, false); isc__nm_udp_close(sock); isc__nm_connectcb(sock, req, result, true); } else { /* * The callback has to be called after the socket has been * initialized */ isc__nm_connectcb(sock, req, ISC_R_SUCCESS, true); } /* * The sock is now attached to the handle. */ isc__nmsocket_detach(&sock); } void isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout, size_t extrahandlesize) { isc_result_t result = ISC_R_SUCCESS; isc_nmsocket_t *sock = NULL; isc__netievent_udpconnect_t *event = NULL; isc__nm_uvreq_t *req = NULL; sa_family_t sa_family; REQUIRE(VALID_NM(mgr)); REQUIRE(local != NULL); REQUIRE(peer != NULL); sa_family = peer->type.sa.sa_family; sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t)); isc__nmsocket_init(sock, mgr, isc_nm_udpsocket, local); sock->connect_cb = cb; sock->connect_cbarg = cbarg; sock->read_timeout = timeout; sock->extrahandlesize = extrahandlesize; sock->peer = *peer; sock->result = ISC_R_UNSET; atomic_init(&sock->client, true); req = isc__nm_uvreq_get(mgr, sock); req->cb.connect = cb; req->cbarg = cbarg; req->peer = *peer; req->local = *local; req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface); result = isc__nm_socket(sa_family, SOCK_DGRAM, 0, &sock->fd); if (result != ISC_R_SUCCESS) { if (isc__nm_in_netthread()) { sock->tid = isc_nm_tid(); } isc__nmsocket_clearcb(sock); isc__nm_connectcb(sock, req, result, true); atomic_store(&sock->closed, true); isc__nmsocket_detach(&sock); return; } result = isc__nm_socket_reuse(sock->fd); RUNTIME_CHECK(result == ISC_R_SUCCESS || result == ISC_R_NOTIMPLEMENTED); result = isc__nm_socket_reuse_lb(sock->fd); RUNTIME_CHECK(result == ISC_R_SUCCESS || result == ISC_R_NOTIMPLEMENTED); (void)isc__nm_socket_incoming_cpu(sock->fd); (void)isc__nm_socket_disable_pmtud(sock->fd, sa_family); event = isc__nm_get_netievent_udpconnect(mgr, sock, req); if (isc__nm_in_netthread()) { atomic_store(&sock->active, true); sock->tid = isc_nm_tid(); isc__nm_async_udpconnect(&mgr->workers[sock->tid], (isc__netievent_t *)event); isc__nm_put_netievent_udpconnect(mgr, event); } else { atomic_init(&sock->active, false); sock->tid = isc_random_uniform(mgr->nlisteners); isc__nm_enqueue_ievent(&mgr->workers[sock->tid], (isc__netievent_t *)event); } LOCK(&sock->lock); while (sock->result == ISC_R_UNSET) { WAIT(&sock->cond, &sock->lock); } atomic_store(&sock->active, true); BROADCAST(&sock->scond); UNLOCK(&sock->lock); } void isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); REQUIRE(VALID_NMSOCK(sock)); udp_recv_cb(handle, nrecv, buf, addr, flags); /* * If a caller calls isc_nm_read() on a listening socket, we can * get here, but we MUST NOT stop reading from the listener * socket. The only difference between listener and connected * sockets is that the former has sock->parent set and later * does not. */ if (!sock->parent) { isc__nm_stop_reading(sock); } } void isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(result != ISC_R_SUCCESS); if (atomic_load(&sock->client)) { isc__nmsocket_timer_stop(sock); isc__nm_stop_reading(sock); if (!sock->recv_read) { goto destroy; } sock->recv_read = false; if (sock->recv_cb != NULL) { isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); isc__nmsocket_clearcb(sock); isc__nm_readcb(sock, req, result); } destroy: isc__nmsocket_prep_destroy(sock); return; } /* * For UDP server socket, we don't have child socket via * "accept", so we: * - we continue to read * - we don't clear the callbacks * - we don't destroy it (only stoplistening could do that) */ if (!sock->recv_read) { return; } sock->recv_read = false; if (sock->recv_cb != NULL) { isc__nm_uvreq_t *req = isc__nm_get_read_req(sock, NULL); isc__nm_readcb(sock, req, result); } } /* * Asynchronous 'udpread' call handler: start or resume reading on a * socket; pause reading and call the 'recv' callback after each * datagram. */ void isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc_result_t result; UNUSED(worker); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); if (isc__nmsocket_closing(sock)) { result = ISC_R_CANCELED; } else { result = isc__nm_start_reading(sock); } if (result != ISC_R_SUCCESS) { sock->reading = true; isc__nm_failed_read_cb(sock, result, false); return; } isc__nmsocket_timer_start(sock); } void isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); isc_nmsocket_t *sock = handle->sock; REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->statichandle == handle); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(!sock->recv_read); sock->recv_cb = cb; sock->recv_cbarg = cbarg; sock->recv_read = true; if (!sock->reading && sock->tid == isc_nm_tid()) { isc__netievent_udpread_t ievent = { .sock = sock }; isc__nm_async_udpread(NULL, (isc__netievent_t *)&ievent); } else { isc__netievent_udpread_t *ievent = isc__nm_get_netievent_udpread(sock->mgr, sock); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); } } static void udp_stop_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data(handle); uv_handle_set_data(handle, NULL); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(atomic_load(&sock->closing)); if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false }, true)) { UNREACHABLE(); } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); atomic_store(&sock->listening, false); isc__nmsocket_detach(&sock); } static void udp_close_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data(handle); uv_handle_set_data(handle, NULL); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(atomic_load(&sock->closing)); if (!atomic_compare_exchange_strong(&sock->closed, &(bool){ false }, true)) { UNREACHABLE(); } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); if (sock->server != NULL) { isc__nmsocket_detach(&sock->server); } atomic_store(&sock->connected, false); atomic_store(&sock->listening, false); isc__nmsocket_prep_destroy(sock); } static void read_timer_close_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data(handle); uv_handle_set_data(handle, NULL); if (sock->parent) { uv_close(&sock->uv_handle.handle, udp_stop_cb); } else { uv_close(&sock->uv_handle.handle, udp_close_cb); } } static void stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->tid == isc_nm_tid()); if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, true)) { return; } udp_close_direct(sock); atomic_fetch_sub(&sock->parent->rchildren, 1); isc_barrier_wait(&sock->parent->stoplistening); } static void stop_udp_parent(isc_nmsocket_t *sock) { isc_nmsocket_t *csock = NULL; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_udplistener); isc_barrier_init(&sock->stoplistening, sock->nchildren); for (size_t i = 0; i < sock->nchildren; i++) { csock = &sock->children[i]; REQUIRE(VALID_NMSOCK(csock)); if ((int)i == isc_nm_tid()) { /* * We need to schedule closing the other sockets first */ continue; } atomic_store(&csock->active, false); enqueue_stoplistening(csock); } csock = &sock->children[isc_nm_tid()]; atomic_store(&csock->active, false); stop_udp_child(csock); atomic_store(&sock->closed, true); isc__nmsocket_prep_destroy(sock); } static void udp_close_direct(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock); uv_close((uv_handle_t *)&sock->read_timer, read_timer_close_cb); } void isc__nm_async_udpclose(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpclose_t *ievent = (isc__netievent_udpclose_t *)ev0; isc_nmsocket_t *sock = ievent->sock; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); UNUSED(worker); udp_close_direct(sock); } void isc__nm_udp_close(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(!isc__nmsocket_active(sock)); if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, true)) { return; } if (sock->tid == isc_nm_tid()) { udp_close_direct(sock); } else { isc__netievent_udpclose_t *ievent = isc__nm_get_netievent_udpclose(sock->mgr, sock); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); } } void isc__nm_udp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_udpsocket); /* * If the socket is active, mark it inactive and * continue. If it isn't active, stop now. */ if (!isc__nmsocket_deactivate(sock)) { return; } /* * If the socket is connecting, the cancel will happen in the * async_udpconnect() due socket being inactive now. */ if (atomic_load(&sock->connecting)) { return; } /* * When the client detaches the last handle, the * sock->statichandle would be NULL, in that case, nobody is * interested in the callback. */ if (sock->statichandle != NULL) { isc__nm_failed_read_cb(sock, ISC_R_CANCELED, false); return; } /* * Otherwise, we just send the socket to abyss... */ if (sock->parent == NULL) { isc__nmsocket_prep_destroy(sock); } } void isc__nm_udp_cancelread(isc_nmhandle_t *handle) { isc_nmsocket_t *sock = NULL; isc__netievent_udpcancel_t *ievent = NULL; REQUIRE(VALID_NMHANDLE(handle)); sock = handle->sock; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udpsocket); ievent = isc__nm_get_netievent_udpcancel(sock->mgr, sock, handle); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); } void isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpcancel_t *ievent = (isc__netievent_udpcancel_t *)ev0; isc_nmsocket_t *sock = NULL; UNUSED(worker); REQUIRE(VALID_NMSOCK(ievent->sock)); sock = ievent->sock; REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(atomic_load(&sock->client)); isc__nm_failed_read_cb(sock, ISC_R_EOF, false); }