sd-future: make src/basic blocking helpers fiber-aware

Some helpers in src/basic — ppoll_usec_full() (used by fd_wait_for_event()),
loop_read(), loop_read_exact(), loop_write_full() and
pidref_wait_for_terminate_full() — block the calling thread. That's the
right behaviour outside a fiber but not inside one, where blocking the
thread also stalls every other fiber running on the same event loop.
Rewriting every caller to pick a fiber or non-fiber variant explicitly
would be a lot of churn and would split otherwise-shared code paths in
two.

Instead, the helpers detect at runtime whether they're running on a fiber
and dispatch to a suspending variant when they are. FiberOps in
fiber-ops.h holds five function pointers (ppoll, read, write, timeout,
cancel_wait_unref); a fiber_ops global constant is populated whenever we 
enter a fiber with functions that delegate to suspending variants of common
syscalls. With this approach, the variants themselves stay in libsystemd
which is required because they make use of sd-event.

- loop_read()/loop_read_exact() take the fiber read hook on a fiber
  unless the caller asked for a non-blocking attempt (do_poll=false) and
  the fd is already non-blocking — in that case we fall through to read()
  to preserve the existing return-EAGAIN-immediately semantic. The hook
  itself suspends on EAGAIN until data is available, so neither the
  do_poll knob nor the explicit fd_wait_for_event() retry loop are
  needed on the fiber path.

- loop_write_full() likewise takes the fiber write hook on a fiber,
  except when timeout=0 with an already-non-blocking fd (preserving the
  fast-return-EAGAIN semantic). The fiber path runs inside a
  FIBER_OPS_TIMEOUT() scope so the caller's timeout is honoured via a
  deadline future, mirroring SD_FIBER_TIMEOUT() but reachable from
  src/basic without pulling in sd-future.h.

- pidref_wait_for_terminate_full() polls the pidfd via fd_wait_for_event()
  before each waitid() when either a finite timeout is set or we're on a
  fiber, and requires pidref->fd >= 0 in those cases (returning
  -ENOMEDIUM otherwise — extending the rule that already applied to
  finite timeouts). The poll suspends the fiber via the ppoll hook above;
  the subsequent waitid() doesn't block because the pidfd is already
  signalled.
This commit is contained in:
Daan De Meyer
2026-04-25 22:06:54 +02:00
parent cf4c65afa8
commit 7bc793e21f
9 changed files with 763 additions and 23 deletions

View File

@@ -112,6 +112,7 @@ typedef enum UnitType UnitType;
typedef enum WaitFlags WaitFlags;
typedef struct Fiber Fiber;
typedef struct FiberOps FiberOps;
typedef struct Hashmap Hashmap;
typedef struct HashmapBase HashmapBase;
typedef struct IteratedCache IteratedCache;

51
src/basic/fiber-ops.c Normal file
View File

@@ -0,0 +1,51 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include <poll.h>
#include <threads.h>
#include <unistd.h>
#include "errno-util.h"
#include "fiber-ops.h"
static thread_local const FiberOps *fiber_ops = NULL;
bool fiber_ops_is_set(void) {
return fiber_ops != NULL;
}
void fiber_ops_set(const FiberOps *ops) {
fiber_ops = ops;
}
int fiber_ops_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask) {
if (fiber_ops)
return fiber_ops->ppoll(fds, n_fds, timeout, sigmask);
return RET_NERRNO(ppoll(fds, n_fds, timeout, sigmask));
}
ssize_t fiber_ops_read(int fd, void *buf, size_t count) {
if (fiber_ops)
return fiber_ops->read(fd, buf, count);
ssize_t n = read(fd, buf, count);
return n < 0 ? -errno : n;
}
ssize_t fiber_ops_write(int fd, const void *buf, size_t count) {
if (fiber_ops)
return fiber_ops->write(fd, buf, count);
return RET_NERRNO(write(fd, buf, count));
}
sd_future* fiber_ops_timeout(uint64_t timeout) {
assert(fiber_ops);
return fiber_ops->timeout(timeout);
}
sd_future* fiber_ops_cancel_wait_unref(sd_future *f) {
assert(fiber_ops);
return fiber_ops->cancel_wait_unref(f);
}

34
src/basic/fiber-ops.h Normal file
View File

@@ -0,0 +1,34 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#pragma once
#include "basic-forward.h"
typedef struct sd_future sd_future;
/* Hooks installed on a fiber so that functions in src/basic can transparently defer to the suspending
* variants in sd-future when invoked from a running fiber. Populated by sd_fiber_new() with pointers to the
* implementations in fiber-ops.c. */
typedef struct FiberOps {
int (*ppoll)(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask);
ssize_t (*read)(int fd, void *buf, size_t count);
ssize_t (*write)(int fd, const void *buf, size_t count);
sd_future* (*timeout)(uint64_t timeout);
sd_future* (*cancel_wait_unref)(sd_future *f);
} FiberOps;
bool fiber_ops_is_set(void);
void fiber_ops_set(const FiberOps *fiber_ops);
int fiber_ops_ppoll(struct pollfd *fds, size_t n_fds, const struct timespec *timeout, const sigset_t *sigmask);
ssize_t fiber_ops_read(int fd, void *buf, size_t count);
ssize_t fiber_ops_write(int fd, const void *buf, size_t count);
/* Mirror of SD_FIBER_TIMEOUT() for code under src/basic that doesn't include sd-future.h: dispatches
* through FiberOps so the actual sd_fiber_timeout() implementation lives in libsystemd. */
sd_future* fiber_ops_timeout(uint64_t timeout);
sd_future* fiber_ops_cancel_wait_unref(sd_future *f);
DEFINE_TRIVIAL_CLEANUP_FUNC(sd_future*, fiber_ops_cancel_wait_unref);
#define FIBER_OPS_TIMEOUT(timeout) _FIBER_OPS_TIMEOUT(UNIQ, (timeout))
#define _FIBER_OPS_TIMEOUT(uniq, timeout) \
_unused_ _cleanup_(fiber_ops_cancel_wait_unrefp) sd_future *UNIQ_T(_fot_, uniq) = fiber_ops_timeout(timeout)

View File

@@ -1,5 +1,6 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include <fcntl.h>
#include <poll.h>
#include <stdio.h>
#include <string.h>
@@ -8,6 +9,7 @@
#include <unistd.h>
#include "errno-util.h"
#include "fiber-ops.h"
#include "io-util.h"
#include "time-util.h"
@@ -69,25 +71,44 @@ ssize_t loop_read(int fd, void *buf, size_t nbytes, bool do_poll) {
if (nbytes > (size_t) SSIZE_MAX)
return -EINVAL;
/* do_poll == false means "don't wait, return what we have if EAGAIN". If the fd is already
* non-blocking, read() can't block the thread, so the non-fiber path satisfies that semantic
* correctly even from a fiber. Only use the fiber path when the fd is blocking (where read()
* would otherwise block the entire event loop). */
int flags = 0;
if (fiber_ops_is_set() && !do_poll) {
flags = fcntl(fd, F_GETFL);
if (flags < 0)
return -errno;
}
do {
ssize_t k;
k = read(fd, p, nbytes);
if (k < 0) {
if (errno == EINTR)
continue;
if (fiber_ops_is_set() && (do_poll || !FLAGS_SET(flags, O_NONBLOCK))) {
/* On a fiber the read op suspends on EAGAIN until data is available, so we don't
* need a separate poll step or the do_poll knob. */
k = fiber_ops_read(fd, p, nbytes);
if (k < 0)
return n > 0 ? n : k;
} else {
k = read(fd, p, nbytes);
if (k < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN && do_poll) {
if (errno == EAGAIN && do_poll) {
/* We knowingly ignore any return value here,
* and expect that any error/EOF is reported
* via read() */
/* We knowingly ignore any return value here,
* and expect that any error/EOF is reported
* via read() */
(void) fd_wait_for_event(fd, POLLIN, USEC_INFINITY);
continue;
(void) fd_wait_for_event(fd, POLLIN, USEC_INFINITY);
continue;
}
return n > 0 ? n : -errno;
}
return n > 0 ? n : -errno;
}
if (k == 0)
@@ -137,6 +158,37 @@ int loop_write_full(int fd, const void *buf, size_t nbytes, usec_t timeout) {
p = buf;
}
/* timeout == 0 means "don't wait, return -EAGAIN if not ready". If the fd is already
* non-blocking, write() can't block the thread, so the non-fiber path satisfies that
* semantic correctly even from a fiber. Only use the fiber path when the fd is blocking
* (where write() would otherwise block the entire event loop). */
int flags = 0;
if (fiber_ops_is_set() && timeout == 0) {
flags = fcntl(fd, F_GETFL);
if (flags < 0)
return -errno;
}
if (fiber_ops_is_set() && !FLAGS_SET(flags, O_NONBLOCK)) {
/* On a fiber the write op suspends on EAGAIN until the fd is writable; honor the
* caller's timeout via a deadline scope. */
FIBER_OPS_TIMEOUT(timestamp_is_set(timeout) ? timeout : USEC_INFINITY);
while (nbytes > 0) {
ssize_t k = fiber_ops_write(fd, p, nbytes);
if (k < 0)
return (int) k;
if (_unlikely_(nbytes > 0 && k == 0)) /* Can't really happen */
return -EIO;
assert((size_t) k <= nbytes);
p += k;
nbytes -= k;
}
return 0;
}
/* When timeout is 0 or USEC_INFINITY this is not used. But we initialize it to a sensible value. */
end = timestamp_is_set(timeout) ? usec_add(now(CLOCK_MONOTONIC), timeout) : USEC_INFINITY;
@@ -220,11 +272,9 @@ int ppoll_usec_full(struct pollfd *fds, size_t n_fds, usec_t timeout, const sigs
if (n_fds == 0 && timeout == 0)
return 0;
r = ppoll(fds, n_fds, timeout == USEC_INFINITY ? NULL : TIMESPEC_STORE(timeout), ss);
if (r < 0)
return -errno;
if (r == 0)
return 0;
r = fiber_ops_ppoll(fds, n_fds, timeout == USEC_INFINITY ? NULL : TIMESPEC_STORE(timeout), ss);
if (r <= 0)
return r;
for (size_t i = 0, n = r; i < n_fds && n > 0; i++) {
if (fds[i].revents == 0)

View File

@@ -36,6 +36,7 @@ basic_sources = files(
'ether-addr-util.c',
'extract-word.c',
'fd-util.c',
'fiber-ops.c',
'fileio.c',
'filesystems.c',
'format-ifname.c',

View File

@@ -7,6 +7,7 @@
#include "alloc-util.h"
#include "errno-util.h"
#include "fd-util.h"
#include "fiber-ops.h"
#include "format-util.h"
#include "hash-funcs.h"
#include "io-util.h"
@@ -466,16 +467,28 @@ int pidref_wait_for_terminate_full(PidRef *pidref, usec_t timeout, siginfo_t *re
if (pidref->pid == 1 || pidref_is_self(pidref))
return -ECHILD;
if (timeout != USEC_INFINITY && pidref->fd < 0)
if (pidref->fd < 0 && (timeout != USEC_INFINITY || fiber_ops_is_set()))
return -ENOMEDIUM;
usec_t ts = timeout == USEC_INFINITY ? USEC_INFINITY : usec_add(now(CLOCK_MONOTONIC), timeout);
/* Poll the pidfd before waitid() if either there's a finite timeout (so we can honor it) or
* we're on a fiber (so fd_wait_for_event() can suspend us instead of blocking the event loop
* inside waitid()). Otherwise let waitid() block directly. The precondition above guarantees
* pidref->fd >= 0 in both cases. */
bool poll_first = ts != USEC_INFINITY || fiber_ops_is_set();
for (;;) {
if (ts != USEC_INFINITY) {
usec_t left = usec_sub_unsigned(ts, now(CLOCK_MONOTONIC));
if (left == 0)
return -ETIMEDOUT;
if (poll_first) {
usec_t left;
if (ts == USEC_INFINITY)
left = USEC_INFINITY;
else {
left = usec_sub_unsigned(ts, now(CLOCK_MONOTONIC));
if (left == 0)
return -ETIMEDOUT;
}
r = fd_wait_for_event(pidref->fd, POLLIN, left);
if (r == 0)

View File

@@ -192,6 +192,7 @@ simple_tests += files(
'sd-device/test-sd-device-monitor.c',
'sd-future/test-fiber.c',
'sd-future/test-fiber-io.c',
'sd-future/test-fiber-ops.c',
'sd-hwdb/test-sd-hwdb.c',
'sd-id128/test-id128.c',
'sd-journal/test-audit-type.c',

View File

@@ -20,6 +20,7 @@
#include "architecture.h"
#include "errno-util.h"
#include "event-future.h"
#include "fiber-ops.h"
#include "log-context.h"
#include "log.h"
#include "memory-util.h"
@@ -270,8 +271,10 @@ static void reset_current_fiber(void) {
* current_fiber. Without this, the child of a fork() that happened mid-fiber would inherit the
* fiber's log prefix / context list in its thread-locals even though no fiber is running. */
Fiber *f = fiber_get_current();
if (f)
if (f) {
fiber_swap_log_state(f);
fiber_ops_set(NULL);
}
fiber_set_current(NULL);
}
@@ -307,9 +310,19 @@ static void fiber_resolve(sd_future *f) {
sd_future_resolve(f, fiber->result);
}
static const FiberOps fiber_ops = {
.ppoll = sd_fiber_ppoll,
.read = sd_fiber_read,
.write = sd_fiber_write,
.timeout = sd_fiber_timeout,
.cancel_wait_unref = sd_future_cancel_wait_unref,
};
static void fiber_enter(Fiber *fiber, Fiber *prev, void **fake_stack_save) {
fiber_set_current(fiber);
fiber_swap_log_state(fiber);
if (!prev)
fiber_ops_set(&fiber_ops);
struct iovec fiber_stack = fiber_stack_usable(&fiber->stack);
start_switch_stack(fake_stack_save, &fiber_stack);
@@ -318,6 +331,8 @@ static void fiber_enter(Fiber *fiber, Fiber *prev, void **fake_stack_save) {
static void fiber_leave(Fiber *fiber, Fiber *prev, void *fake_stack_save) {
finish_switch_stack(fake_stack_save);
if (!prev)
fiber_ops_set(NULL);
fiber_swap_log_state(fiber);
fiber_set_current(prev);
}

View File

@@ -0,0 +1,574 @@
/* SPDX-License-Identifier: LGPL-2.1-or-later */
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include "sd-event.h"
#include "sd-future.h"
#include "alloc-util.h"
#include "cleanup-util.h"
#include "fd-util.h"
#include "io-util.h"
#include "pidref.h"
#include "process-util.h"
#include "tests.h"
#include "time-util.h"
/* Test: wait_for_terminate basic functionality */
static int wait_simple_fiber(void *userdata) {
_cleanup_(pidref_done_sigkill_wait) PidRef pidref = PIDREF_NULL;
siginfo_t si;
int r;
/* Fork a child that exits immediately */
r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidref);
if (r < 0)
return r;
if (r == 0)
_exit(42);
/* Parent - wait for child */
r = pidref_wait_for_terminate(&pidref, &si);
if (r < 0)
return r;
pidref_done(&pidref);
/* Verify child exited with status 42 */
if (si.si_code != CLD_EXITED || si.si_status != 42)
return -EIO;
return 0;
}
TEST(wait_for_terminate_fiber_basic) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_(sd_future_unrefp) sd_future *f = NULL;
ASSERT_OK(sd_fiber_new(e, "wait-simple", wait_simple_fiber, NULL, /* destroy= */ NULL, &f));
ASSERT_OK(sd_event_loop(e));
ASSERT_OK(sd_future_result(f));
}
/* Test: wait_for_terminate with multiple children */
static int wait_multiple_fiber(void *userdata) {
PidRef pidrefs[3] = { PIDREF_NULL, PIDREF_NULL, PIDREF_NULL };
siginfo_t si;
int r;
/* Fork three children with different exit codes */
for (size_t i = 0; i < 3; i++) {
r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidrefs[i]);
if (r < 0)
goto cleanup;
if (r == 0)
/* Child process */
_exit(10 + i);
}
/* Wait for all three in order */
for (size_t i = 0; i < 3; i++) {
r = pidref_wait_for_terminate(&pidrefs[i], &si);
if (r < 0)
goto cleanup;
pidref_done(&pidrefs[i]);
if (si.si_code != CLD_EXITED || si.si_status != (int) (10 + i)) {
r = -EIO;
goto cleanup;
}
}
return 0;
cleanup:
for (size_t i = 0; i < 3; i++)
pidref_done(&pidrefs[i]);
return r;
}
TEST(wait_for_terminate_fiber_multiple) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_(sd_future_unrefp) sd_future *f = NULL;
ASSERT_OK(sd_fiber_new(e, "wait-multiple", wait_multiple_fiber, NULL, /* destroy= */ NULL, &f));
ASSERT_OK(sd_event_loop(e));
ASSERT_OK(sd_future_result(f));
}
static int concurrent_wait_fiber(void *userdata) {
_cleanup_(pidref_done_sigkill_wait) PidRef pidref = PIDREF_NULL;
siginfo_t si;
int r;
r = pidref_safe_fork("(test-child)", FORK_RESET_SIGNALS|FORK_DEATHSIG_SIGTERM|FORK_LOG, &pidref);
if (r < 0)
return r;
if (r == 0)
/* Child exits with specified status */
_exit(PTR_TO_INT(userdata));
r = pidref_wait_for_terminate(&pidref, &si);
if (r < 0)
return r;
pidref_done(&pidref);
if (si.si_code != CLD_EXITED || si.si_status != PTR_TO_INT(userdata))
return -EIO;
return 0;
}
TEST(wait_for_terminate_fiber_concurrent) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
sd_future *fibers[3] = {};
CLEANUP_ELEMENTS(fibers, sd_future_unref_array_clear);
/* Create 3 fibers, each waiting for a different child */
for (size_t i = 0; i < ELEMENTSOF(fibers); i++)
ASSERT_OK(sd_fiber_new(e, "concurrent-wait", concurrent_wait_fiber, INT_TO_PTR(20 + i), /* destroy= */ NULL, &fibers[i]));
ASSERT_OK(sd_event_loop(e));
/* All fibers should complete successfully */
for (size_t i = 0; i < ELEMENTSOF(fibers); i++)
ASSERT_OK(sd_future_result(fibers[i]));
}
typedef struct LoopIOContext {
int *pipefd;
const char *data;
size_t len;
int order;
} LoopIOContext;
static int loop_read_suspend_fiber(void *userdata) {
LoopIOContext *ctx = ASSERT_PTR(userdata);
char buf[64];
ASSERT_EQ(ctx->order, 0);
ctx->order = 1;
ssize_t n = loop_read(ctx->pipefd[0], buf, sizeof(buf), /* do_poll= */ true);
/* While we were suspended, the writer fiber should have run. */
ASSERT_EQ(ctx->order, 2);
if (n < 0)
return (int) n;
if ((size_t) n != ctx->len || memcmp(buf, ctx->data, ctx->len) != 0)
return -EIO;
return (int) n;
}
static int loop_write_suspend_fiber(void *userdata) {
LoopIOContext *ctx = ASSERT_PTR(userdata);
ASSERT_EQ(ctx->order, 1);
ctx->order = 2;
int r = loop_write(ctx->pipefd[1], ctx->data, ctx->len);
if (r < 0)
return r;
/* Close the write end so the reader sees EOF after reading the data. */
ctx->pipefd[1] = safe_close(ctx->pipefd[1]);
return 0;
}
/* Test: two fibers cooperatively pass a small payload through a blocking pipe using the suspending
* loop helpers. Exercises the non-blocking flip, event-loop yielding, and the blocking-mode restore. */
TEST(loop_read_write_suspend) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
static const char payload[] = "loop-suspend";
LoopIOContext ctx = {
.pipefd = pipefd,
.data = payload,
.len = sizeof(payload) - 1,
};
_cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL;
ASSERT_OK(sd_fiber_new(e, "loop-read", loop_read_suspend_fiber, &ctx, /* destroy= */ NULL, &fr));
ASSERT_OK(sd_future_set_priority(fr, 0));
ASSERT_OK(sd_fiber_new(e, "loop-write", loop_write_suspend_fiber, &ctx, /* destroy= */ NULL, &fw));
ASSERT_OK(sd_future_set_priority(fw, 1));
ASSERT_OK(sd_event_loop(e));
ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.len);
ASSERT_OK_ZERO(sd_future_result(fw));
/* The read fd started out blocking and loop_read() must have restored it before returning. */
ASSERT_OK_ZERO(fcntl(pipefd[0], F_GETFL) & O_NONBLOCK);
}
static int loop_read_exact_short_fiber(void *userdata) {
int fd = PTR_TO_INT(userdata);
char buf[16];
/* Requesting more bytes than the peer writes should return -EIO once EOF is hit. */
return loop_read_exact(fd, buf, sizeof(buf), /* do_poll= */ true);
}
/* Test: loop_read_exact() returns -EIO when the peer closes early. */
TEST(loop_read_exact_short) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
_cleanup_(sd_future_unrefp) sd_future *f = NULL;
ASSERT_OK(sd_fiber_new(e, "loop-read-exact", loop_read_exact_short_fiber,
INT_TO_PTR(pipefd[0]), /* destroy= */ NULL, &f));
/* Write a few bytes and close the write end — less than the fiber asked for. */
ASSERT_OK_EQ_ERRNO(write(pipefd[1], "abc", 3), (ssize_t) 3);
pipefd[1] = safe_close(pipefd[1]);
ASSERT_OK(sd_event_loop(e));
ASSERT_ERROR(sd_future_result(f), EIO);
}
typedef struct LoopWriteTimeoutContext {
int fd;
int result;
} LoopWriteTimeoutContext;
static int loop_write_timeout_fiber(void *userdata) {
LoopWriteTimeoutContext *ctx = ASSERT_PTR(userdata);
/* Try to write much more than the pipe buffer can hold with a short timeout. The write will
* succeed partially and then hit -ETIME after exhausting the timeout while blocked. */
static const char big_buf[128 * 1024] = { 0 };
ctx->result = loop_write_full(ctx->fd, big_buf, sizeof(big_buf), 100 * USEC_PER_MSEC);
return 0;
}
/* Test: loop_write_full() returns -ETIME when the peer never drains. */
TEST(loop_write_full_timeout) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
/* Shrink the pipe buffer to its minimum (one page) so the 128K write below is guaranteed to block
* regardless of the architecture's page size. The default pipe buffer is 16 pages, which on
* 64K-page architectures (e.g. ppc64le) is 1 MiB — enough to absorb the entire write without ever
* blocking, defeating the purpose of the timeout. */
ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1));
LoopWriteTimeoutContext ctx = { .fd = pipefd[1], .result = 0 };
_cleanup_(sd_future_unrefp) sd_future *f = NULL;
ASSERT_OK(sd_fiber_new(e, "loop-write-timeout", loop_write_timeout_fiber, &ctx, /* destroy= */ NULL, &f));
ASSERT_OK(sd_event_loop(e));
ASSERT_OK_ZERO(sd_future_result(f));
ASSERT_ERROR(ctx.result, ETIME);
}
typedef struct PpollDispatchContext {
int *pipefd;
int order;
} PpollDispatchContext;
static int ppoll_dispatch_read_fiber(void *userdata) {
PpollDispatchContext *ctx = ASSERT_PTR(userdata);
struct pollfd pfd = {
.fd = ctx->pipefd[0],
.events = POLLIN,
};
ASSERT_EQ(ctx->order, 0);
ctx->order = 1;
/* Direct ppoll_usec() call from a fiber must dispatch through sd_fiber_poll(), suspending the
* fiber instead of blocking the entire thread. If dispatch fails, the writer fiber never gets a
* chance to run and the test deadlocks. */
int r = ppoll_usec(&pfd, 1, USEC_INFINITY);
if (r < 0)
return r;
ASSERT_EQ(ctx->order, 2);
if (r != 1 || !FLAGS_SET(pfd.revents, POLLIN))
return -EIO;
return 0;
}
static int ppoll_dispatch_write_fiber(void *userdata) {
PpollDispatchContext *ctx = ASSERT_PTR(userdata);
ASSERT_EQ(ctx->order, 1);
ctx->order = 2;
if (write(ctx->pipefd[1], "x", 1) < 0)
return -errno;
return 0;
}
/* Test: ppoll_usec() called from a fiber dispatches through the FiberOps hook to sd_fiber_poll(),
* yielding to the event loop instead of blocking. */
TEST(ppoll_usec_dispatch) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
PpollDispatchContext ctx = { .pipefd = pipefd };
_cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL;
ASSERT_OK(sd_fiber_new(e, "ppoll-read", ppoll_dispatch_read_fiber, &ctx, /* destroy= */ NULL, &fr));
ASSERT_OK(sd_future_set_priority(fr, 0));
ASSERT_OK(sd_fiber_new(e, "ppoll-write", ppoll_dispatch_write_fiber, &ctx, /* destroy= */ NULL, &fw));
ASSERT_OK(sd_future_set_priority(fw, 1));
ASSERT_OK(sd_event_loop(e));
ASSERT_OK(sd_future_result(fr));
ASSERT_OK(sd_future_result(fw));
}
static int loop_write_zero_timeout_nonblock_fiber(void *userdata) {
int fd = PTR_TO_INT(userdata);
/* Fill the pipe so the next write would block. The fd is non-blocking, so on a fiber
* loop_write_full(timeout=0) must take the non-fiber path and return -EAGAIN immediately
* rather than suspending. */
static const char big_buf[128 * 1024] = { 0 };
return loop_write_full(fd, big_buf, sizeof(big_buf), /* timeout= */ 0);
}
/* Test: timeout == 0 on a non-blocking fd from a fiber preserves the "don't wait" semantic and
* returns -EAGAIN when the pipe buffer is full, instead of suspending the fiber. */
TEST(loop_write_zero_timeout_nonblock) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1));
_cleanup_(sd_future_unrefp) sd_future *f = NULL;
ASSERT_OK(sd_fiber_new(e, "loop-write-zt-nb", loop_write_zero_timeout_nonblock_fiber,
INT_TO_PTR(pipefd[1]), /* destroy= */ NULL, &f));
ASSERT_OK(sd_event_loop(e));
ASSERT_ERROR(sd_future_result(f), EAGAIN);
}
typedef struct LoopWriteZeroBlockingContext {
int *pipefd;
size_t total;
int order;
} LoopWriteZeroBlockingContext;
static int loop_write_zero_blocking_writer_fiber(void *userdata) {
LoopWriteZeroBlockingContext *ctx = ASSERT_PTR(userdata);
ASSERT_EQ(ctx->order, 0);
ctx->order = 1;
/* timeout == 0 on a *blocking* fd from a fiber: the fast EAGAIN return isn't possible, so
* loop_write_full() takes the fiber path. The reader fiber drains the pipe, letting our
* write complete via fiber suspension/resume. */
_cleanup_free_ char *big_buf = malloc0(ctx->total);
ASSERT_NOT_NULL(big_buf);
int r = loop_write_full(ctx->pipefd[1], big_buf, ctx->total, /* timeout= */ 0);
ASSERT_EQ(ctx->order, 2);
return r;
}
static int loop_write_zero_blocking_reader_fiber(void *userdata) {
LoopWriteZeroBlockingContext *ctx = ASSERT_PTR(userdata);
ASSERT_EQ(ctx->order, 1);
ctx->order = 2;
_cleanup_free_ char *buf = malloc(ctx->total);
ASSERT_NOT_NULL(buf);
ssize_t n = loop_read(ctx->pipefd[0], buf, ctx->total, /* do_poll= */ true);
if (n < 0)
return (int) n;
return (int) n;
}
/* Test: timeout == 0 on a blocking fd from a fiber takes the fiber path (suspends until the peer
* drains) instead of blocking the entire thread. */
TEST(loop_write_zero_timeout_blocking) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
ASSERT_OK_ERRNO(fcntl(pipefd[1], F_SETPIPE_SZ, 1));
/* F_SETPIPE_SZ rounds up to the kernel's pipe minimum (typically a page); query the actual
* size and write more than that, so the write must wait on the reader regardless of page size. */
int pipe_sz = fcntl(pipefd[1], F_GETPIPE_SZ);
ASSERT_OK_ERRNO(pipe_sz);
LoopWriteZeroBlockingContext ctx = { .pipefd = pipefd, .total = (size_t) pipe_sz * 2 };
_cleanup_(sd_future_unrefp) sd_future *fw = NULL, *fr = NULL;
ASSERT_OK(sd_fiber_new(e, "loop-write-zt-blk", loop_write_zero_blocking_writer_fiber,
&ctx, /* destroy= */ NULL, &fw));
ASSERT_OK(sd_future_set_priority(fw, 0));
ASSERT_OK(sd_fiber_new(e, "loop-read-zt-blk", loop_write_zero_blocking_reader_fiber,
&ctx, /* destroy= */ NULL, &fr));
ASSERT_OK(sd_future_set_priority(fr, 1));
ASSERT_OK(sd_event_loop(e));
ASSERT_OK(sd_future_result(fw));
ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.total);
}
static int loop_read_no_poll_nonblock_fiber(void *userdata) {
int fd = PTR_TO_INT(userdata);
char buf[64];
/* Empty non-blocking pipe + do_poll=false: on a fiber loop_read() must take the non-fiber
* path and return -EAGAIN immediately rather than suspending. */
return (int) loop_read(fd, buf, sizeof(buf), /* do_poll= */ false);
}
/* Test: do_poll == false on a non-blocking fd from a fiber preserves the "don't wait" semantic
* and returns -EAGAIN when no data is available, instead of suspending the fiber. */
TEST(loop_read_no_poll_nonblock) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC | O_NONBLOCK));
_cleanup_(sd_future_unrefp) sd_future *f = NULL;
ASSERT_OK(sd_fiber_new(e, "loop-read-np-nb", loop_read_no_poll_nonblock_fiber,
INT_TO_PTR(pipefd[0]), /* destroy= */ NULL, &f));
ASSERT_OK(sd_event_loop(e));
ASSERT_ERROR(sd_future_result(f), EAGAIN);
}
typedef struct LoopReadNoPollBlockingContext {
int *pipefd;
const char *data;
size_t len;
int order;
} LoopReadNoPollBlockingContext;
static int loop_read_no_poll_blocking_reader_fiber(void *userdata) {
LoopReadNoPollBlockingContext *ctx = ASSERT_PTR(userdata);
char buf[64];
ASSERT_EQ(ctx->order, 0);
ctx->order = 1;
/* do_poll == false on a *blocking* fd from a fiber: the fast EAGAIN return isn't possible,
* so loop_read() takes the fiber path and suspends until the writer fiber feeds data. */
ssize_t n = loop_read(ctx->pipefd[0], buf, sizeof(buf), /* do_poll= */ false);
ASSERT_EQ(ctx->order, 2);
if (n < 0)
return (int) n;
if ((size_t) n != ctx->len || memcmp(buf, ctx->data, ctx->len) != 0)
return -EIO;
return (int) n;
}
static int loop_read_no_poll_blocking_writer_fiber(void *userdata) {
LoopReadNoPollBlockingContext *ctx = ASSERT_PTR(userdata);
ASSERT_EQ(ctx->order, 1);
ctx->order = 2;
int r = loop_write(ctx->pipefd[1], ctx->data, ctx->len);
if (r < 0)
return r;
ctx->pipefd[1] = safe_close(ctx->pipefd[1]);
return 0;
}
/* Test: do_poll == false on a blocking fd from a fiber takes the fiber path (suspends until the
* peer feeds data) instead of blocking the entire thread. */
TEST(loop_read_no_poll_blocking) {
_cleanup_(sd_event_unrefp) sd_event *e = NULL;
ASSERT_OK(sd_event_new(&e));
ASSERT_OK(sd_event_set_exit_on_idle(e, true));
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
static const char payload[] = "no-poll";
LoopReadNoPollBlockingContext ctx = {
.pipefd = pipefd,
.data = payload,
.len = sizeof(payload) - 1,
};
_cleanup_(sd_future_unrefp) sd_future *fr = NULL, *fw = NULL;
ASSERT_OK(sd_fiber_new(e, "loop-read-np-blk", loop_read_no_poll_blocking_reader_fiber,
&ctx, /* destroy= */ NULL, &fr));
ASSERT_OK(sd_future_set_priority(fr, 0));
ASSERT_OK(sd_fiber_new(e, "loop-write-np-blk", loop_read_no_poll_blocking_writer_fiber,
&ctx, /* destroy= */ NULL, &fw));
ASSERT_OK(sd_future_set_priority(fw, 1));
ASSERT_OK(sd_event_loop(e));
ASSERT_OK_EQ(sd_future_result(fr), (int) ctx.len);
ASSERT_OK_ZERO(sd_future_result(fw));
}
/* Test: loop_*() helpers transparently fall back to blocking I/O when called outside any
* fiber context. */
TEST(loop_read_write_fallback) {
_cleanup_close_pair_ int pipefd[2] = EBADF_PAIR;
ASSERT_OK_ERRNO(pipe2(pipefd, O_CLOEXEC));
ASSERT_OK(loop_write(pipefd[1], "fallback", STRLEN("fallback")));
char buf[16];
ssize_t n = loop_read(pipefd[0], buf, STRLEN("fallback"), /* do_poll= */ true);
ASSERT_OK_EQ(n, (ssize_t) STRLEN("fallback"));
ASSERT_EQ(memcmp(buf, "fallback", STRLEN("fallback")), 0);
}
DEFINE_TEST_MAIN(LOG_DEBUG);