计算机计时

https://fasttime.sourceforge.net/doc/internal.html

3 种 板载时钟机制

当前的桌面计算机有三种板载时钟机制来确定时间:

  • 1)电池供电的实时时钟即使在断电的情况下也能保持计时,其精确度堪比任何石英手表。获取此类时钟的时间成本相对较高,通常操作系统只会在启动过程中查询它。
  • 2)操作系统会设置一个定时器,以固定的时间间隔中断 CPU。每次中断时,内核都会增加一个计数器。Windows 和大多数 Linux 内核将此间隔设置为 10 毫秒。该时钟的频率漂移和抖动相对较低,但分辨率仅为 10 毫秒(取决于内核)。
  • 3)所有现代 CPU 都包含一个在每个时钟周期递增的寄存器(例如,如果您的处理器主频为 1.0 GHz,则每秒递增 10 亿次)。不同的架构赋予该寄存器不同的名称;在本文档中,我们将其称为 TSC 寄存器。
    • 该时钟具有非常高的分辨率,但由于晶体不稳定、温度和功率波动(可能由系统负载变化引起)以及明确的电源管理(速度限制),频率漂移相对较高。

来自 gettimeofday 的时间戳

gettimeofday 系统调用能够提供精度高达 1 纳秒的时间戳。

该时间由中断定时器的计数(在启动时初始化为实时时钟)获取。

任何低于 10 毫秒的精度均通过对 TSC 寄存器进行插值获得;由于内核没有关于 TSC 频率的准确信息,因此通常在启动时进行简单的校准。

1
2
3
4
5
6
7
8
9
10
// startup calibration
t1 = read_tsc();
t2 = read_interrupt_count();
sleep(1);
t3 = read_tsc();
t4 = read_interrupt_count();
tsc_rate = (t4 - t2) / (t3 - t1);

// gettimeofday evaluating sub 10ms time
time = read_tsc() * tsc_rate;

其结果是,尽管分辨率为 1 纳秒,但系统时间只能准确报告高达 10 毫秒分辨率的时间,并且任何较低的分辨率都基于 TSC 寄存器的启动时间校准。

在许多操作系统中,gettimeofday 被实现为系统调用,需要先切换到内核,然后再切换回来才能读取时间。除了性能明显下降(在奔腾等上下文切换成本高昂的平台上尤其如此)之外,如果内核利用了这种切换,应用程序还可能丢失其时间片。例如,对于需要在发送数据包之前立即获取时间片的网络应用程序来说,这可能会带来问题。

fasttime 的 原则

fasttime 实现基于 Network Time Protocol v4 的算法,以使用 TSC 寄存器提供准确的时间估计。

  • 用于将 TSC 计数转换为时间的校准会不断更新和完善,以解决频率漂移。
  • 这种重复校准不得受到系统负载的影响,并且应尽量减少系统负担。
  • 校准可以由守护进程完成,并且其最新的校准表可供实现 fasttime 库的所有用户进程使用。
  • 校准必须快速适应系统时间的变化;例如由 NTP 守护进程或手动用户干预带来的变化。
  • 该库应该在用户空间中运行,因此上下文切换时性能不会下降。

Derived Time 派生时间

fasttime 将 TSC 值与当前时间之间的关系建模为线性函数。校准过程会保留一个截距(intercept)和梯度(gradient),并将其应用于 TSC 值以得出当前时间。最初,这些值是使用一个非常简单的校准循环计算的,该循环类似于操作系统使用的循环;然后使用锁相环(phase-locked loop)对其进行迭代调整。

锁相环(PLL)算法将当前偏移量作为输入,并返回一个调整值,用于 fasttime 的梯度和截距。该实现大致基于 NTP4 的 PLL:

\begin{align} prediction_k &= offset_{k-1} + mu_k * (gradient_k - gradient_{k-1});\\ correction_k &= offset_k - prediction_k / 2;\\ interceptk_k &= correction_k;\\ gradient_k &= gain * correction_k / mu_k\\ \end{align}

其中 kk 是迭代次数,gaingain 是某个常数,mumu 是 TSC 值 与上一次迭代之间的差值。

两个因素决定了 PLL 的行为:迭代间隔时间(称为环路延迟(loop delay))和增益值(gain)。

较短的环路延迟允许派生时钟(derived clock)更快地收敛(converge)到系统时间,但更容易出现振荡(oscillation)和偏移测量误差。
较长的环路延迟通常可以提供更稳定的性能,但派生时钟中的任何误差都需要更长时间才能纠正。

PLL 增益也起着类似的作用:较高的增益会导致振荡,而较低的增益则需要更长的时间来稳定。

在 fasttime 中,当时钟稳定时,环路延迟会周期性地延长;当时钟不稳定时,环路延迟会缩短。

控制延迟加大或减小多少以及 PLL 增益的可以通过 fasttimed 的命令行参数指定。

Clock Sampling 时钟采样

从 fasttime 调用 gettimeofday 到 计算出时间(when the time is evaluated)之间存在延迟,并且在调用返回之前再次存在延迟。估算 派生时钟 与 系统时间 之间偏移量的一种简单方法是 对系统调用中的派生时间进行平均:

1
2
3
4
t1 = get_derived_time();  
t2 = gettimeofday();
t3 = get_derived_time();
offset = t2 - (t1 + t3) / 2;

如果 (t2 - t1) == (t3 - t2),即延迟对称,则 offset 为 0 ,gettimeofday 结果准确。

但有很多原因可能导致结果不准确,例如在上下文切换期间丢失了时间片(这种情况已在 Pentium 4 处理器上运行 Linux 时进行过测量,并确认至少会发生)。

[Shalunov 2000]描述了一种更好的方法,该方法只需假设延迟是随机对称分布的,即可进行多次采样并组合。该方法既在 fasttime 中实现,用于采样系统时间;也在演示应用程序中实现,用于测试 fasttime 的准确性。

Rate change amortisation 速率变动摊销

获取当前时间的主要应用之一是测量某个进程的运行时间。对于这类应用来说,时间的绝对准确性并不那么重要,只要速率稳定且正确即可。fasttime 通过尽可能避免引入剧烈的速率变化来适应这些程序。

当速率变化相对较小时(由于正常的 PLL 程序),变化会在几秒钟内逐渐完成。较大的误差仍会立即得到纠正(例如,系统时间的变化)。

Clock filtering 时钟滤波

由于操作系统对系统时钟的校准不佳,其返回值偶尔会出现 30-100 微秒左右的抖动(glitches)。这种抖动(jitter)通常不会被大多数应用程序察觉,但对于 fasttime 来说却很显著,因为这导致引入时钟振荡(oscillation),而振荡需要一些时间来校正。

为了解决这个问题以及预期的硬件抖动,在将偏移传递给 PLL 之前对偏移施加滤波器。该偏移量会与最近 10 个样本的中位数进行比较;如果偏移量超过该中位数一定量(例如 5 倍),则不会调整时钟。实际上,该样本会被丢弃,尽管它确实会对后续的滤波有所贡献,因此,当系统时间发生真正的变化时,它会在短时间内通过滤波器。

Shared memory protection 共享内存保护

FastTime 在客户端/服务器模型中运行,其中校准表格(calibration table)在单独的进程或线程中不断更新到客户端应用程序。在 fasttime 处于单独进程(fasttimed 守护进程)的情况下,POSIX 共享内存用于允许客户端访问校准。

保护共享内存的标准方法是使用互斥体、信号量或消息传递。在这种情况下,这些都不能应用,因为它需要客户端应用程序进行系统调用,这是不使用 gettimeofday 的主要动机之一。

相反,fasttime 维护一个校准表格的循环数组,其中只有一个在任何时候处于活动状态(这意味着它用于计算客户端的派生时间)。守护进程或校准线程更新未使用的校准表,然后以原子方式更新指向活动表格(active table)的索引。这种原子更新是通过 CPU 指令而不是系统调用完成的。

Terminology  术语

  • System time  系统时间
    • The current time, as returned by gettimeofday 当前时间,由 gettimeofday 返回
  • Derived time  派生时间
    • Current time, calculated by fasttime 当前时间,由 fasttime 计算
  • Offset  偏移
    • The difference between system and derived time 系统时间与派生时间的差值
  • Rate  率
    • The frequency, or speed of a clock. For system time this is ideally 1 sec/sec, but may vary due to wander or NTP adjustment. 时钟的频率或速度。对于系统时间,理想情况下为 1 秒/秒,但可能会因漂移或 NTP 调整而变化
  • Loop delay  循环延迟
    • Time between iterations of the PLL PLL 迭代之间的时间

References  引用

Clock Discipline Algorithms for the Network Time Protocol Version 4, D. Mills 1997
网络时间协议第 4 版的时钟规则算法 ,D. Mills 1997

Adaptive Hybrid Clock Discipline Algorithm for the Network Time Protocol, D. Mills 1998
网络时间协议的自适应混合时钟规则算法 ,D. Mills 1998

PC Based Precision Timing Without GPS, A. Pasztor & D. Veitch 2002
基于 PC 的无 GPS 精确计时 ,A. Pasztor 和 D. Veitch 2002

NTP Implementation and Assumptions about the Network, S. Shalunov 2000
NTP 实施和关于网络的假设 ,S. Shalunov 2000

Source code for NTP4 is also an excellent reference as it differs from the descriptions above.
NTP4 的源代码也是一个很好的参考,因为它与上面的描述不同。

benchmark

fast差距

标准库和

获取纳秒的方式一览

通用、够快:在现代 Linux 上,glibc 的 clock_gettime 通常通过 vDSO 在用户态完成,不走系统调用,已经非常快;std::chrono 常常就是对它的封装。std::chrono::steady_clock,等价于clock_gettime(CLOCK_MONOTONIC)。若想完全不受 NTP 微调影响,则用CLOCK_MONOTONIC_RAW

C++

1
2
3
4
5
6
7
8
#include <chrono>
#include <cstdint>

uint64_t now_ns() {
using namespace std::chrono;
auto t = steady_clock::now(); // 一般映射到 CLOCK_MONOTONIC
return duration_cast<nanoseconds>(t.time_since_epoch()).count();
}

POSIX(精细控制时钟源)

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <time.h>
#include <stdint.h>

static inline uint64_t clock_gettime_ns(clockid_t clk) {
struct timespec ts;
clock_gettime(clk, &ts); // glibc 多数情况走 vDSO
return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
}

// 例子:稳定单调时钟: 一般业务/性能测试 CLOCK_MONOTONIC 就足够且更通用。
uint64_t t1 = clock_gettime_ns(CLOCK_MONOTONIC);
// 例子:完全不受 NTP 微调影响: 做高精度基准、需要避免 NTP 频率微调时的轻微漂移。
uint64_t t2 = clock_gettime_ns(CLOCK_MONOTONIC_RAW);

极限、低开销、最快:读 TSC(RDTSC / RDTSCP)。前提:x86_64invariant TSC,确认 CPU 支持 constant_tsc/nonstop_tsc/proc/cpuinfo),尽量 绑核,对虚拟化/跨核迁移要谨慎。需要自己把“周期 到 纳秒”做一次标定。只用于同机短区间相对时间。

读周期计数(RDTSCP,带序列化)

1
2
3
4
5
6
7
8
9
#include <cstdint>
#ifdef __x86_64__
static inline uint64_t rdtsc() {
unsigned int lo, hi;
unsigned int aux;
asm volatile ("rdtscp" : "=a"(lo), "=d"(hi), "=c"(aux) ::);
return ((uint64_t)hi << 32) | lo;
}
#endif

把周期换算成纳秒(简单标定一次 TSC 频率)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <time.h>
#include <unistd.h>

double calibrate_tsc_ghz() {
// 用 MONOTONIC_RAW 做 100ms 的简易标定
struct timespec a,b;
uint64_t c0 = rdtsc();
clock_gettime(CLOCK_MONOTONIC_RAW, &a);
usleep(100000); // 100ms
uint64_t c1 = rdtsc();
clock_gettime(CLOCK_MONOTONIC_RAW, &b);
double dt_ns = (b.tv_sec - a.tv_sec)*1e9 + (b.tv_nsec - a.tv_nsec);
double cycles = double(c1 - c0);
return cycles / dt_ns; // GHz = cycles per ns
}

uint64_t tsc_cycles_to_ns(uint64_t cycles, double tsc_ghz) {
// ns = cycles / (GHz)
return (uint64_t)(cycles / tsc_ghz);
}

注意要点

  • 读区间时,常用模式:start=rdtsc(); … ; end=rdtsc();
    若对序列化严格:lfence; rdtsc;……rdtscp; lfence;
  • 线程绑核sched_setaffinity)可避免跨核不同步导致的抖动。
  • 仅做相对时间,不要当作“系统时间”。

要“对点墙钟”的真实时间戳(日志、审计) :用 CLOCK_REALTIME(受 NTP/chrony 调整); 若机器跑了 PTP 且网卡有 PHC:读 /dev/ptpX 对应的 clock(最准的对时)。

系统墙钟(受 NTP/chrony 校时):

1
uint64_t realtime_ns = clock_gettime_ns(CLOCK_REALTIME);

PTP 硬件时钟(更准,对时最强)

1
2
3
4
5
6
7
8
9
#include <sys/timex.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <linux/ptp_clock.h>

int fd = open("/dev/ptp0", O_RDONLY);
clockid_t clk = FD_TO_CLOCKID(fd);
uint64_t phc_ns = clock_gettime_ns(clk); // 读取网卡 PHC
close(fd);

若系统跑了 linuxptpptp4l/phc2sys),PHC 与系统时钟会保持纳秒级一致。

对点墙钟是什么

对比项 传统挂钟 智能对点墙钟
​核心功能​ 自行走时,依赖内部机芯(石英或机械) ​自动接收信号校准时间​​,消除累积误差
​对时方式​ 手动调节 自动(如GPS、NTP、电波)
​时间精度​ 有机芯本身存在的误差,会累积 ​极高​​,与标准时间源保持同步
​多钟同步​ 难以实现,各钟显示时间可能存在差异 ​轻松实现​​,所有时钟显示完全一致的时间
​典型应用​ 家居、普通办公室 ​学校、医院、车站、工厂、办公楼​​等需要统一时间的公共场所

纳秒时间戳怎么实现

我们通过记录 TSC,最后批量换算 ns 即可。

单次真实“纳秒时间戳”调用 <1 ns 不现实

  • clock_gettime(vDSO)通常十几到数十 ns;
  • RDTSC/RDTSCP 也要 ~5–15 ns(视 CPU/栅栏而定)。

解决:在热路径记录TSC(CPU 周期计数),把换算成 ns 的工作放到批量/后台。只要你的业务需要“本地单调时序”而不是“墙钟对时”,这就完美契合。

读 TSC(建议 RDTSCP + 绑核)

1
2
3
4
5
6
7
8
9
10
static inline __attribute__((always_inline)) uint64_t rdtsc_ordered() {
#if defined(__x86_64__)
unsigned aux, lo, hi;
asm volatile("rdtscp" : "=a"(lo), "=d"(hi), "=c"(aux) :: "memory");
// RDTSCP 自带乱序屏障,读后不会被重排到指令前
return (uint64_t(hi) << 32) | lo;
#else
# error "Use aarch64 cntvct_el0 or platform-specific counter"
#endif
}

运行前检查 CPU 有 invariant/constant_tsc;线程用 sched_setaffinity 绑核,避免跨核 TSC 偏差

固定点比例换算(高效、无浮点除法)

思路:ns = base_ns + ((tsc - base_tsc) * mult) >> shift
在启动/定时校准时,用 CLOCK_MONOTONIC_RAW 标定 mult/shift

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <time.h>
#include <atomic>

struct TscCalib {
uint64_t base_tsc;
uint64_t base_ns;
uint32_t mult; // cycles -> ns 的乘子(放大后右移)
uint32_t shift; // 右移位数
};

// 简易标定:100ms 窗口;生产里可做多次取中位数
inline TscCalib calibrate_tsc_fixedpoint() {
timespec a{}, b{};
// 读起始
clock_gettime(CLOCK_MONOTONIC_RAW, &a);
uint64_t t0 = rdtsc_ordered();
// 睡 100ms(或忙等更稳定)
struct timespec req{0, 100000000};
nanosleep(&req, nullptr);
uint64_t t1 = rdtsc_ordered();
clock_gettime(CLOCK_MONOTONIC_RAW, &b);

uint64_t dt_ns = uint64_t(b.tv_sec - a.tv_sec) * 1000000000ull
+ uint64_t(b.tv_nsec - a.tv_nsec);
uint64_t dt_cy = t1 - t0;

// 计算 (ns/cycle) 的定点表示:找个 shift 使 mult 不溢出
// 我们想要:ns ≈ (cycles * mult) >> shift
uint32_t shift = 24; // 经验值;可根据 dt_cy 调整
uint64_t mult64 = ((dt_ns << shift) + (dt_cy/2)) / dt_cy;

TscCalib c{};
c.base_tsc = t1;
c.base_ns = uint64_t(b.tv_sec) * 1000000000ull + b.tv_nsec;
c.mult = (uint32_t)mult64;
c.shift = shift;
return c;
}

inline uint64_t tsc_to_ns(uint64_t tsc, const TscCalib& c) {
uint64_t d = tsc - c.base_tsc;
// 128bit 乘法更稳;GCC/Clang 下内建 __int128
__uint128_t prod = (__uint128_t)d * c.mult;
return c.base_ns + (uint64_t)(prod >> c.shift);
}

热路径怎么存储:
只做tsc = rdtsc_ordered();tsc 写入环形队列 / 结构体。

大概 5 - 15 ns。

可以不在每次操作时都真的读取 TSC,而是每 N 次 / 每一批读一次,写同一个批次的时间共用一个时间戳(或首尾时间戳 + 序号插值)。这样平均到每次调用就会 小于 1 ns 了。

批量阶段:
取出tsc批量tsc_to_ns(),一次循环里用固定点乘法右移,吞吐很高(无 syscalls)。

若必须每次都带 ns 值:就只能接受~10 ns 级开销(TSC + 换算)或~20-60 ns (vDSO clock_gettime),做不到 <1 ns。

如果需要“对时”怎么办(墙钟/与行情时间对齐)

  • 本机“墙钟”用 CLOCK_REALTIME;但开销 > TSC。
  • 有 PTP 的交易网卡(X710/E810 等)可读 PHC(/dev/ptpX)做对时,再把 PHC 与 TSC 做一次线性拟合,得到TSC→UTC 的映射(同样固定点转换)。这样热路径仍旧只记 TSC,离线批量转 UTC 纳秒

工程化清单

  • 绑核 + 关闭频率波动(performance governor)、确认 invariant TSC
  • 位图常驻:把位图和它的读写者放同核,避免跨核伪共享;读多写少时用双缓冲或 RCU,避免读到撕裂的 64 位。
  • 禁止异常、RTTI,开 LTO;必要时 -fno-plt -fno-asynchronous-unwind-tables
  • 事件日志:用无锁环形队列,记录 {idx, result, tsc};批量 flush 时统一换算。
  • 监控:定期重标定 TSC(如每几秒做一次短窗口),防极端漂移;记录校准参数版本号到事件中,确保可逆。

迷你示例:批量查位 + 批量打时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <cstdint>
#include <cstddef>

struct alignas(64) Bitset20000 {
// 20000 bits -> 313 x u64 (199+1? 实际需要 (20000+63)/64 = 313)
static constexpr size_t kWords = (20000 + 63) / 64;
uint64_t w[kWords];
};

// __restrict__ + always_inline + noalias,帮助编译器矢量化/消掉别名顾虑
static inline __attribute__((always_inline))
bool bit_test(const uint64_t* __restrict bs64, size_t i) {
// 无分支:一次加载 + 位移 + 与
return (bs64[i >> 6] >> (i & 63)) & 1u;
}

//--------------------------------------------------------------------
struct Event { uint32_t idx; uint8_t val; uint64_t tsc; };

void process_batch(const Bitset20000& bs, const uint32_t* idxs, size_t n,
Event* out, const TscCalib& calib) {
// 只在批首读一次 TSC(也可每K次读一次)
uint64_t tsc0 = rdtsc_ordered();
for (size_t i = 0; i < n; ++i) {
bool v = bit_test(bs.w, idxs[i]);
out[i] = { idxs[i], (uint8_t)v, tsc0 };
}
// 批处理末尾再读一次,必要时你也能给范围时间
}

// 批量转 ns(离线/低频线程)
void convert_to_ns(Event* evts, size_t n, const TscCalib& c) {
for (size_t i = 0; i < n; ++i) {
uint64_t ns = tsc_to_ns(evts[i].tsc, c);
// 写回或落盘…
(void)ns;
}
}

小结

  • 查位:上面那段无分支 bit-test + L1 热数据,单次 ~1 ns 级
  • 时间戳:想要纳秒值但平均 <1 ns,只能热路径记 TSC,批量换算 ns;若必须每次拿 ns,就接受 10–60 ns 的现实(TSC+换算 或 vDSO)。
  • 如需与交易所/撮合时间对齐,用 PHC/PTP 做对时,再把 TSC→UTC 的映射用于批量转换。

Quill

  • 高效“持续不断”打印纳秒时间戳的核心在于:前端线程用 TSC 直接采集 rdtsc 值并入队,后端线程运行 RdtscClock 周期校准,采用无锁转换算法将 rdtsc 快速转换为自纪元以来的纳秒;再由格式化器按 %Qns 输出纳秒。
  • 队列采用单生产者单消费者(SPSC)环形缓冲,生产与消费均为 wait-free(无锁)。根据配置有“有界/无界 + 阻塞/丢弃”策略,但底层 SPSC 操作是无锁的;例如“UnboundedBlocking”只在触达上限时阻塞策略层面,而非用锁。

前端使用 TSC,把原始 rdtsc 推到队列;后端用 RdtscClock 周期与墙钟同步并用无锁算法转换为纳秒:

任何线程可获取与后端 TSC 时钟同步的“纳秒 since epoch”;内部通过 BackendManager::convert_rdtsc_to_epoch_time
BackendTscClock.h

1
2
3
4
5
6
7
8
9
10
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT static time_point now() noexcept
{
uint64_t const ts = detail::BackendManager::instance().convert_rdtsc_to_epoch_time(detail::rdtsc());

return ts ? time_point{std::chrono::nanoseconds{ts}}
: time_point{std::chrono::nanoseconds{
std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count()}};
}

后端在首次遇到 TSC 源时懒初始化 RdtscClock,然后把前端塞入的 rdtsc 转成纳秒:
BackendWorker.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (transit_event->logger_base->_clock_source == ClockSourceType::Tsc)
{
// If using the rdtsc clock, convert the value to nanoseconds since epoch.
// This conversion ensures that every transit inserted in the buffer below has a timestamp in
// nanoseconds since epoch, allowing compatibility with Logger objects using different clocks.
if (QUILL_UNLIKELY(!_rdtsc_clock.load(std::memory_order_relaxed)))
{
// Lazy initialization of rdtsc clock on the backend thread only if the user decides to use
// it. The clock requires a few seconds to init as it is taking samples first.
_rdtsc_clock.store(new RdtscClock{_options.rdtsc_resync_interval}, std::memory_order_release);
_last_rdtsc_resync_time = std::chrono::steady_clock::now();
}

// Convert the rdtsc value to nanoseconds since epoch.
transit_event->timestamp =
_rdtsc_clock.load(std::memory_order_relaxed)->time_since_epoch(transit_event->timestamp);
}

RdtscClock 的核心转换:以校准基准 base_time 与 base_tsc,按 ns_per_tick 做线性换算;当 rdtsc 差值超过阈值时触发重同步:

RdtscClock.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
uint64_t time_since_epoch(uint64_t rdtsc_value) const noexcept
{
auto const index = _version.load(std::memory_order_relaxed) & (_base.size() - 1);

auto diff = static_cast<int64_t>(rdtsc_value - _base[index].base_tsc);

if (diff > _resync_interval_ticks)
{
resync(resync_lag_cycles);
diff = static_cast<int64_t>(rdtsc_value - _base[index].base_tsc);
}

return static_cast<uint64_t>(_base[index].base_time +
static_cast<int64_t>(static_cast<double>(diff) * _ns_per_tick));
}

后端格式化时按纳秒传给时间戳格式化器;%Qns 专门支持纳秒输出:

PatternFormatter.h

1
2
3
4
if (_is_set_in_pattern[Attribute::Time])
{
_set_arg_val<Attribute::Time>(_timestamp_formatter.format_timestamp(std::chrono::nanoseconds{timestamp}));
}

TimestampFormatter.h

1
2
3
4
* same format specifiers as strftime() but with the following additional specifiers :
* 1) %Qms - Milliseconds
* 2) %Qus - Microseconds
* 3) %Qns - Nanoseconds
1
2
3
4
5
6
7
8
9
10
11
12
13
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::string_view format_timestamp(std::chrono::nanoseconds time_since_epoch)
{
int64_t const timestamp_ns = time_since_epoch.count();
...
if (_additional_format_specifier == AdditionalSpecifier::Qns)
{
static constexpr std::string_view zeros{"000000000"};
_formatted_date.append(zeros);
_write_fractional_seconds(extracted_ns);
}
...
return std::string_view{_formatted_date.data(), _formatted_date.size()};
}

队列模型:线程局部的 SPSC 队列把前端日志传给后端;默认“无界 + 阻塞”策略,但本质是 SPSC:
overview.rst

1
2
3
4
5
6
7
8
9
10
11
12
13
Reliable Logging Mechanism

--------------------------

Quill utilizes a thread-local single-producer-single-consumer queue to relay logs to the backend thread, ensuring that log messages are never dropped.

Initially, an unbounded queue with a small size is used to optimise performance.

However, if the queue reaches its capacity, a new queue will be allocated, which may cause a slight performance penalty for the frontend.

The default unbounded queue can expand up to a size of `FrontendOptions::unbounded_queue_max_capacity`. If this limit is reached, the caller thread will block.

It's possible to change the queue type within the :cpp:class:`FrontendOptions`.

无界 SPSC 的注释直接说明“生产/消费均为 wait-free(无锁)”,满时切换到新节点继续产出:
UnboundedSPSCQueue.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* A single-producer single-consumer FIFO circular buffer
*
* The buffer allows producing and consuming objects
*
* Production is wait free.
*
* When the internal circular buffer becomes full a new one will be created and the production
* will continue in the new buffer.
*
* Consumption is wait free. If not data is available a special value is returned. If a new
* buffer is created from the producer the consumer first consumes everything in the old
* buffer and then moves to the new buffer.
*/

有界 SPSC 明确是环形缓冲实现,作为无界队列的基础块:
BoundedSPSCQueue.h

1
2
3
/**
* A bounded single producer single consumer ring buffer.
*/

前端写队列的调用点(prepare_write/commit),无锁热路径:

Logger.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* _reserve_queue_space(size_t total_size,
MacroMetadata const* macro_metadata)
{
std::byte* write_buffer =
_thread_context->get_spsc_queue<frontend_options_t::queue_type>().prepare_write(total_size);
...
else if constexpr ((frontend_options_t::queue_type == QueueType::BoundedBlocking) ||
(frontend_options_t::queue_type == QueueType::UnboundedBlocking))
{
if (QUILL_UNLIKELY(write_buffer == nullptr))
{
...
do
{
...
write_buffer =
_thread_context->get_spsc_queue<frontend_options_t::queue_type>().prepare_write(total_size);
} while (write_buffer == nullptr);
}
}

return write_buffer;
}

队列策略(有界/无界 + 阻塞/丢弃)的官方说明:

frontend_options.rst

1
2
3
4
- **UnboundedBlocking**: Starts with a small initial capacity. The queue reallocates up to `FrontendOptions::unbounded_queue_max_capacity` and then blocks the calling thread until space becomes available.
- **UnboundedDropping**: Starts with a small initial capacity. The queue reallocates up to `FrontendOptions::unbounded_queue_max_capacity` and then discards log messages.
- **BoundedBlocking**: Has a fixed capacity and never reallocates. It blocks the calling thread when the limit is reached until space becomes available.
- **BoundedDropping**: Has a fixed capacity and never reallocates. It discards log messages when the limit is reached.

简短总结

  • 前端用 rdtsc 极快取样,后端 RdtscClock 周期校准,以无锁算法转换为纳秒;格式化器 %Qns 输出纳秒。

  • 队列为线程局部 SPSC 环形缓冲,生产/消费均 wait-free(无锁);策略层面可选“阻塞/丢弃/有界/无界”。因此:是无锁队列。