计算机计时

https://fasttime.sourceforge.net/doc/internal.html

3 种 板载时钟机制

当前的桌面计算机有三种板载时钟机制来确定时间:

  • 1)电池供电的实时时钟即使在断电的情况下也能保持计时,其精确度堪比任何石英手表。获取此类时钟的时间成本相对较高,通常操作系统只会在启动过程中查询它。
  • 2)操作系统会设置一个定时器,以固定的时间间隔中断 CPU。每次中断时,内核都会增加一个计数器。Windows 和大多数 Linux 内核将此间隔设置为 10 毫秒。该时钟的频率漂移和抖动相对较低,但分辨率仅为 10 毫秒(取决于内核)。
  • 3)所有现代 CPU 都包含一个在每个时钟周期递增的寄存器(例如,如果您的处理器主频为 1.0 GHz,则每秒递增 10 亿次)。不同的架构赋予该寄存器不同的名称;在本文档中,我们将其称为 TSC 寄存器。
    • 该时钟具有非常高的分辨率,但由于晶体不稳定、温度和功率波动(可能由系统负载变化引起)以及明确的电源管理(速度限制),频率漂移相对较高。

来自 gettimeofday 的时间戳

gettimeofday 系统调用能够提供精度高达 1 纳秒的时间戳。

该时间由中断定时器的计数(在启动时初始化为实时时钟)获取。

任何低于 10 毫秒的精度均通过对 TSC 寄存器进行插值获得;由于内核没有关于 TSC 频率的准确信息,因此通常在启动时进行简单的校准。

1
2
3
4
5
6
7
8
9
10
// startup calibration
t1 = read_tsc();
t2 = read_interrupt_count();
sleep(1);
t3 = read_tsc();
t4 = read_interrupt_count();
tsc_rate = (t4 - t2) / (t3 - t1);

// gettimeofday evaluating sub 10ms time
time = read_tsc() * tsc_rate;

其结果是,尽管分辨率为 1 纳秒,但系统时间只能准确报告高达 10 毫秒分辨率的时间,并且任何较低的分辨率都基于 TSC 寄存器的启动时间校准。

在许多操作系统中,gettimeofday 被实现为系统调用,需要先切换到内核,然后再切换回来才能读取时间。除了性能明显下降(在奔腾等上下文切换成本高昂的平台上尤其如此)之外,如果内核利用了这种切换,应用程序还可能丢失其时间片。例如,对于需要在发送数据包之前立即获取时间片的网络应用程序来说,这可能会带来问题。

fasttime 的 原则

fasttime 实现基于 Network Time Protocol v4 的算法,以使用 TSC 寄存器提供准确的时间估计。

  • 用于将 TSC 计数转换为时间的校准会不断更新和完善,以解决频率漂移。
  • 这种重复校准不得受到系统负载的影响,并且应尽量减少系统负担。
  • 校准可以由守护进程完成,并且其最新的校准表可供实现 fasttime 库的所有用户进程使用。
  • 校准必须快速适应系统时间的变化;例如由 NTP 守护进程或手动用户干预带来的变化。
  • 该库应该在用户空间中运行,因此上下文切换时性能不会下降。

Derived Time 派生时间

fasttime 将 TSC 值与当前时间之间的关系建模为线性函数。校准过程会保留一个截距(intercept)和梯度(gradient),并将其应用于 TSC 值以得出当前时间。最初,这些值是使用一个非常简单的校准循环计算的,该循环类似于操作系统使用的循环;然后使用锁相环(phase-locked loop)对其进行迭代调整。

锁相环(PLL)算法将当前偏移量作为输入,并返回一个调整值,用于 fasttime 的梯度和截距。该实现大致基于 NTP4 的 PLL:

\begin{align} prediction_k &= offset_{k-1} + mu_k * (gradient_k - gradient_{k-1});\\ correction_k &= offset_k - prediction_k / 2;\\ interceptk_k &= correction_k;\\ gradient_k &= gain * correction_k / mu_k\\ \end{align}

其中 kk 是迭代次数,gaingain 是某个常数,mumu 是 TSC 值 与上一次迭代之间的差值。

两个因素决定了 PLL 的行为:迭代间隔时间(称为环路延迟(loop delay))和增益值(gain)。

较短的环路延迟允许派生时钟(derived clock)更快地收敛(converge)到系统时间,但更容易出现振荡(oscillation)和偏移测量误差。
较长的环路延迟通常可以提供更稳定的性能,但派生时钟中的任何误差都需要更长时间才能纠正。

PLL 增益也起着类似的作用:较高的增益会导致振荡,而较低的增益则需要更长的时间来稳定。

在 fasttime 中,当时钟稳定时,环路延迟会周期性地延长;当时钟不稳定时,环路延迟会缩短。

控制延迟加大或减小多少以及 PLL 增益的可以通过 fasttimed 的命令行参数指定。

Clock Sampling 时钟采样

从 fasttime 调用 gettimeofday 到 计算出时间(when the time is evaluated)之间存在延迟,并且在调用返回之前再次存在延迟。估算 派生时钟 与 系统时间 之间偏移量的一种简单方法是 对系统调用中的派生时间进行平均:

1
2
3
4
t1 = get_derived_time();  
t2 = gettimeofday();
t3 = get_derived_time();
offset = t2 - (t1 + t3) / 2;

如果 (t2 - t1) == (t3 - t2),即延迟对称,则 offset 为 0 ,gettimeofday 结果准确。

但有很多原因可能导致结果不准确,例如在上下文切换期间丢失了时间片(这种情况已在 Pentium 4 处理器上运行 Linux 时进行过测量,并确认至少会发生)。

[Shalunov 2000]描述了一种更好的方法,该方法只需假设延迟是随机对称分布的,即可进行多次采样并组合。该方法既在 fasttime 中实现,用于采样系统时间;也在演示应用程序中实现,用于测试 fasttime 的准确性。

Rate change amortisation 速率变动摊销

获取当前时间的主要应用之一是测量某个进程的运行时间。对于这类应用来说,时间的绝对准确性并不那么重要,只要速率稳定且正确即可。fasttime 通过尽可能避免引入剧烈的速率变化来适应这些程序。

当速率变化相对较小时(由于正常的 PLL 程序),变化会在几秒钟内逐渐完成。较大的误差仍会立即得到纠正(例如,系统时间的变化)。

Clock filtering 时钟滤波

由于操作系统对系统时钟的校准不佳,其返回值偶尔会出现 30-100 微秒左右的抖动(glitches)。这种抖动(jitter)通常不会被大多数应用程序察觉,但对于 fasttime 来说却很显著,因为这导致引入时钟振荡(oscillation),而振荡需要一些时间来校正。

为了解决这个问题以及预期的硬件抖动,在将偏移传递给 PLL 之前对偏移施加滤波器。该偏移量会与最近 10 个样本的中位数进行比较;如果偏移量超过该中位数一定量(例如 5 倍),则不会调整时钟。实际上,该样本会被丢弃,尽管它确实会对后续的滤波有所贡献,因此,当系统时间发生真正的变化时,它会在短时间内通过滤波器。

Shared memory protection 共享内存保护

FastTime 在客户端/服务器模型中运行,其中校准表格(calibration table)在单独的进程或线程中不断更新到客户端应用程序。在 fasttime 处于单独进程(fasttimed 守护进程)的情况下,POSIX 共享内存用于允许客户端访问校准。

保护共享内存的标准方法是使用互斥体、信号量或消息传递。在这种情况下,这些都不能应用,因为它需要客户端应用程序进行系统调用,这是不使用 gettimeofday 的主要动机之一。

相反,fasttime 维护一个校准表格的循环数组,其中只有一个在任何时候处于活动状态(这意味着它用于计算客户端的派生时间)。守护进程或校准线程更新未使用的校准表,然后以原子方式更新指向活动表格(active table)的索引。这种原子更新是通过 CPU 指令而不是系统调用完成的。

Terminology  术语

  • System time  系统时间
    • The current time, as returned by gettimeofday 当前时间,由 gettimeofday 返回
  • Derived time  派生时间
    • Current time, calculated by fasttime 当前时间,由 fasttime 计算
  • Offset  偏移
    • The difference between system and derived time 系统时间与派生时间的差值
  • Rate  率
    • The frequency, or speed of a clock. For system time this is ideally 1 sec/sec, but may vary due to wander or NTP adjustment. 时钟的频率或速度。对于系统时间,理想情况下为 1 秒/秒,但可能会因漂移或 NTP 调整而变化
  • Loop delay  循环延迟
    • Time between iterations of the PLL PLL 迭代之间的时间

References  引用

Clock Discipline Algorithms for the Network Time Protocol Version 4, D. Mills 1997
网络时间协议第 4 版的时钟规则算法 ,D. Mills 1997

Adaptive Hybrid Clock Discipline Algorithm for the Network Time Protocol, D. Mills 1998
网络时间协议的自适应混合时钟规则算法 ,D. Mills 1998

PC Based Precision Timing Without GPS, A. Pasztor & D. Veitch 2002
基于 PC 的无 GPS 精确计时 ,A. Pasztor 和 D. Veitch 2002

NTP Implementation and Assumptions about the Network, S. Shalunov 2000
NTP 实施和关于网络的假设 ,S. Shalunov 2000

Source code for NTP4 is also an excellent reference as it differs from the descriptions above.
NTP4 的源代码也是一个很好的参考,因为它与上面的描述不同。

benchmark

fast差距

标准库和

获取纳秒的方式一览

通用、够快:在现代 Linux 上,glibc 的 clock_gettime 通常通过 vDSO 在用户态完成,不走系统调用,已经非常快;std::chrono 常常就是对它的封装。std::chrono::steady_clock,等价于clock_gettime(CLOCK_MONOTONIC)。若想完全不受 NTP 微调影响,则用CLOCK_MONOTONIC_RAW

C++

1
2
3
4
5
6
7
8
#include <chrono>
#include <cstdint>

uint64_t now_ns() {
using namespace std::chrono;
auto t = steady_clock::now(); // 一般映射到 CLOCK_MONOTONIC
return duration_cast<nanoseconds>(t.time_since_epoch()).count();
}

POSIX(精细控制时钟源)

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <time.h>
#include <stdint.h>

static inline uint64_t clock_gettime_ns(clockid_t clk) {
struct timespec ts;
clock_gettime(clk, &ts); // glibc 多数情况走 vDSO
return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
}

// 例子:稳定单调时钟: 一般业务/性能测试 CLOCK_MONOTONIC 就足够且更通用。
uint64_t t1 = clock_gettime_ns(CLOCK_MONOTONIC);
// 例子:完全不受 NTP 微调影响: 做高精度基准、需要避免 NTP 频率微调时的轻微漂移。
uint64_t t2 = clock_gettime_ns(CLOCK_MONOTONIC_RAW);

极限、低开销、最快:读 TSC(RDTSC / RDTSCP)。前提:x86_64invariant TSC,确认 CPU 支持 constant_tsc/nonstop_tsc/proc/cpuinfo),尽量 绑核,对虚拟化/跨核迁移要谨慎。需要自己把“周期 到 纳秒”做一次标定。只用于同机短区间相对时间。

读周期计数(RDTSCP,带序列化)

1
2
3
4
5
6
7
8
9
#include <cstdint>
#ifdef __x86_64__
static inline uint64_t rdtsc() {
unsigned int lo, hi;
unsigned int aux;
asm volatile ("rdtscp" : "=a"(lo), "=d"(hi), "=c"(aux) ::);
return ((uint64_t)hi << 32) | lo;
}
#endif

把周期换算成纳秒(简单标定一次 TSC 频率)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <time.h>
#include <unistd.h>

double calibrate_tsc_ghz() {
// 用 MONOTONIC_RAW 做 100ms 的简易标定
struct timespec a,b;
uint64_t c0 = rdtsc();
clock_gettime(CLOCK_MONOTONIC_RAW, &a);
usleep(100000); // 100ms
uint64_t c1 = rdtsc();
clock_gettime(CLOCK_MONOTONIC_RAW, &b);
double dt_ns = (b.tv_sec - a.tv_sec)*1e9 + (b.tv_nsec - a.tv_nsec);
double cycles = double(c1 - c0);
return cycles / dt_ns; // GHz = cycles per ns
}

uint64_t tsc_cycles_to_ns(uint64_t cycles, double tsc_ghz) {
// ns = cycles / (GHz)
return (uint64_t)(cycles / tsc_ghz);
}

注意要点

  • 读区间时,常用模式:start=rdtsc(); … ; end=rdtsc();
    若对序列化严格:lfence; rdtsc;……rdtscp; lfence;
  • 线程绑核sched_setaffinity)可避免跨核不同步导致的抖动。
  • 仅做相对时间,不要当作“系统时间”。

要“对点墙钟”的真实时间戳(日志、审计) :用 CLOCK_REALTIME(受 NTP/chrony 调整); 若机器跑了 PTP 且网卡有 PHC:读 /dev/ptpX 对应的 clock(最准的对时)。

系统墙钟(受 NTP/chrony 校时):

1
uint64_t realtime_ns = clock_gettime_ns(CLOCK_REALTIME);

PTP 硬件时钟(更准,对时最强)

1
2
3
4
5
6
7
8
9
#include <sys/timex.h>
#include <sys/ioctl.h>
#include <fcntl.h>
#include <linux/ptp_clock.h>

int fd = open("/dev/ptp0", O_RDONLY);
clockid_t clk = FD_TO_CLOCKID(fd);
uint64_t phc_ns = clock_gettime_ns(clk); // 读取网卡 PHC
close(fd);

若系统跑了 linuxptpptp4l/phc2sys),PHC 与系统时钟会保持纳秒级一致。

对点墙钟是什么

对比项 传统挂钟 智能对点墙钟
​核心功能​ 自行走时,依赖内部机芯(石英或机械) ​自动接收信号校准时间​​,消除累积误差
​对时方式​ 手动调节 自动(如GPS、NTP、电波)
​时间精度​ 有机芯本身存在的误差,会累积 ​极高​​,与标准时间源保持同步
​多钟同步​ 难以实现,各钟显示时间可能存在差异 ​轻松实现​​,所有时钟显示完全一致的时间
​典型应用​ 家居、普通办公室 ​学校、医院、车站、工厂、办公楼​​等需要统一时间的公共场所

纳秒时间戳怎么实现

我们通过记录 TSC,最后批量换算 ns 即可。

单次真实“纳秒时间戳”调用 <1 ns 不现实

  • clock_gettime(vDSO)通常十几到数十 ns;
  • RDTSC/RDTSCP 也要 ~5–15 ns(视 CPU/栅栏而定)。

解决:在热路径记录TSC(CPU 周期计数),把换算成 ns 的工作放到批量/后台。只要你的业务需要“本地单调时序”而不是“墙钟对时”,这就完美契合。

读 TSC(建议 RDTSCP + 绑核)

1
2
3
4
5
6
7
8
9
10
static inline __attribute__((always_inline)) uint64_t rdtsc_ordered() {
#if defined(__x86_64__)
unsigned aux, lo, hi;
asm volatile("rdtscp" : "=a"(lo), "=d"(hi), "=c"(aux) :: "memory");
// RDTSCP 自带乱序屏障,读后不会被重排到指令前
return (uint64_t(hi) << 32) | lo;
#else
# error "Use aarch64 cntvct_el0 or platform-specific counter"
#endif
}

运行前检查 CPU 有 invariant/constant_tsc;线程用 sched_setaffinity 绑核,避免跨核 TSC 偏差

固定点比例换算(高效、无浮点除法)

思路:ns = base_ns + ((tsc - base_tsc) * mult) >> shift
在启动/定时校准时,用 CLOCK_MONOTONIC_RAW 标定 mult/shift

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <time.h>
#include <atomic>

struct TscCalib {
uint64_t base_tsc;
uint64_t base_ns;
uint32_t mult; // cycles -> ns 的乘子(放大后右移)
uint32_t shift; // 右移位数
};

// 简易标定:100ms 窗口;生产里可做多次取中位数
inline TscCalib calibrate_tsc_fixedpoint() {
timespec a{}, b{};
// 读起始
clock_gettime(CLOCK_MONOTONIC_RAW, &a);
uint64_t t0 = rdtsc_ordered();
// 睡 100ms(或忙等更稳定)
struct timespec req{0, 100000000};
nanosleep(&req, nullptr);
uint64_t t1 = rdtsc_ordered();
clock_gettime(CLOCK_MONOTONIC_RAW, &b);

uint64_t dt_ns = uint64_t(b.tv_sec - a.tv_sec) * 1000000000ull
+ uint64_t(b.tv_nsec - a.tv_nsec);
uint64_t dt_cy = t1 - t0;

// 计算 (ns/cycle) 的定点表示:找个 shift 使 mult 不溢出
// 我们想要:ns ≈ (cycles * mult) >> shift
uint32_t shift = 24; // 经验值;可根据 dt_cy 调整
uint64_t mult64 = ((dt_ns << shift) + (dt_cy/2)) / dt_cy;

TscCalib c{};
c.base_tsc = t1;
c.base_ns = uint64_t(b.tv_sec) * 1000000000ull + b.tv_nsec;
c.mult = (uint32_t)mult64;
c.shift = shift;
return c;
}

inline uint64_t tsc_to_ns(uint64_t tsc, const TscCalib& c) {
uint64_t d = tsc - c.base_tsc;
// 128bit 乘法更稳;GCC/Clang 下内建 __int128
__uint128_t prod = (__uint128_t)d * c.mult;
return c.base_ns + (uint64_t)(prod >> c.shift);
}

热路径怎么存储:
只做tsc = rdtsc_ordered();tsc 写入环形队列 / 结构体。

大概 5 - 15 ns。

可以不在每次操作时都真的读取 TSC,而是每 N 次 / 每一批读一次,写同一个批次的时间共用一个时间戳(或首尾时间戳 + 序号插值)。这样平均到每次调用就会 小于 1 ns 了。

批量阶段:
取出tsc批量tsc_to_ns(),一次循环里用固定点乘法右移,吞吐很高(无 syscalls)。

若必须每次都带 ns 值:就只能接受~10 ns 级开销(TSC + 换算)或~20-60 ns (vDSO clock_gettime),做不到 <1 ns。

如果需要“对时”怎么办(墙钟/与行情时间对齐)

  • 本机“墙钟”用 CLOCK_REALTIME;但开销 > TSC。
  • 有 PTP 的交易网卡(X710/E810 等)可读 PHC(/dev/ptpX)做对时,再把 PHC 与 TSC 做一次线性拟合,得到TSC→UTC 的映射(同样固定点转换)。这样热路径仍旧只记 TSC,离线批量转 UTC 纳秒

工程化清单

  • 绑核 + 关闭频率波动(performance governor)、确认 invariant TSC
  • 位图常驻:把位图和它的读写者放同核,避免跨核伪共享;读多写少时用双缓冲或 RCU,避免读到撕裂的 64 位。
  • 禁止异常、RTTI,开 LTO;必要时 -fno-plt -fno-asynchronous-unwind-tables
  • 事件日志:用无锁环形队列,记录 {idx, result, tsc};批量 flush 时统一换算。
  • 监控:定期重标定 TSC(如每几秒做一次短窗口),防极端漂移;记录校准参数版本号到事件中,确保可逆。

迷你示例:批量查位 + 批量打时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <cstdint>
#include <cstddef>

struct alignas(64) Bitset20000 {
// 20000 bits -> 313 x u64 (199+1? 实际需要 (20000+63)/64 = 313)
static constexpr size_t kWords = (20000 + 63) / 64;
uint64_t w[kWords];
};

// __restrict__ + always_inline + noalias,帮助编译器矢量化/消掉别名顾虑
static inline __attribute__((always_inline))
bool bit_test(const uint64_t* __restrict bs64, size_t i) {
// 无分支:一次加载 + 位移 + 与
return (bs64[i >> 6] >> (i & 63)) & 1u;
}

//--------------------------------------------------------------------
struct Event { uint32_t idx; uint8_t val; uint64_t tsc; };

void process_batch(const Bitset20000& bs, const uint32_t* idxs, size_t n,
Event* out, const TscCalib& calib) {
// 只在批首读一次 TSC(也可每K次读一次)
uint64_t tsc0 = rdtsc_ordered();
for (size_t i = 0; i < n; ++i) {
bool v = bit_test(bs.w, idxs[i]);
out[i] = { idxs[i], (uint8_t)v, tsc0 };
}
// 批处理末尾再读一次,必要时你也能给范围时间
}

// 批量转 ns(离线/低频线程)
void convert_to_ns(Event* evts, size_t n, const TscCalib& c) {
for (size_t i = 0; i < n; ++i) {
uint64_t ns = tsc_to_ns(evts[i].tsc, c);
// 写回或落盘…
(void)ns;
}
}

小结

  • 查位:上面那段无分支 bit-test + L1 热数据,单次 ~1 ns 级
  • 时间戳:想要纳秒值但平均 <1 ns,只能热路径记 TSC,批量换算 ns;若必须每次拿 ns,就接受 10–60 ns 的现实(TSC+换算 或 vDSO)。
  • 如需与交易所/撮合时间对齐,用 PHC/PTP 做对时,再把 TSC→UTC 的映射用于批量转换。

Quill

  • 高效“持续不断”打印纳秒时间戳的核心在于:前端线程用 TSC 直接采集 rdtsc 值并入队,后端线程运行 RdtscClock 周期校准,采用无锁转换算法将 rdtsc 快速转换为自纪元以来的纳秒;再由格式化器按 %Qns 输出纳秒。
  • 队列采用单生产者单消费者(SPSC)环形缓冲,生产与消费均为 wait-free(无锁)。根据配置有“有界/无界 + 阻塞/丢弃”策略,但底层 SPSC 操作是无锁的;例如“UnboundedBlocking”只在触达上限时阻塞策略层面,而非用锁。

前端使用 TSC,把原始 rdtsc 推到队列;后端用 RdtscClock 周期与墙钟同步并用无锁算法转换为纳秒:

任何线程可获取与后端 TSC 时钟同步的“纳秒 since epoch”;内部通过 BackendManager::convert_rdtsc_to_epoch_time
BackendTscClock.h

1
2
3
4
5
6
7
8
9
10
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT static time_point now() noexcept
{
uint64_t const ts = detail::BackendManager::instance().convert_rdtsc_to_epoch_time(detail::rdtsc());

return ts ? time_point{std::chrono::nanoseconds{ts}}
: time_point{std::chrono::nanoseconds{
std::chrono::time_point_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count()}};
}

后端在首次遇到 TSC 源时懒初始化 RdtscClock,然后把前端塞入的 rdtsc 转成纳秒:
BackendWorker.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (transit_event->logger_base->_clock_source == ClockSourceType::Tsc)
{
// If using the rdtsc clock, convert the value to nanoseconds since epoch.
// This conversion ensures that every transit inserted in the buffer below has a timestamp in
// nanoseconds since epoch, allowing compatibility with Logger objects using different clocks.
if (QUILL_UNLIKELY(!_rdtsc_clock.load(std::memory_order_relaxed)))
{
// Lazy initialization of rdtsc clock on the backend thread only if the user decides to use
// it. The clock requires a few seconds to init as it is taking samples first.
_rdtsc_clock.store(new RdtscClock{_options.rdtsc_resync_interval}, std::memory_order_release);
_last_rdtsc_resync_time = std::chrono::steady_clock::now();
}

// Convert the rdtsc value to nanoseconds since epoch.
transit_event->timestamp =
_rdtsc_clock.load(std::memory_order_relaxed)->time_since_epoch(transit_event->timestamp);
}

RdtscClock 的核心转换:以校准基准 base_time 与 base_tsc,按 ns_per_tick 做线性换算;当 rdtsc 差值超过阈值时触发重同步:

RdtscClock.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
uint64_t time_since_epoch(uint64_t rdtsc_value) const noexcept
{
auto const index = _version.load(std::memory_order_relaxed) & (_base.size() - 1);

auto diff = static_cast<int64_t>(rdtsc_value - _base[index].base_tsc);

if (diff > _resync_interval_ticks)
{
resync(resync_lag_cycles);
diff = static_cast<int64_t>(rdtsc_value - _base[index].base_tsc);
}

return static_cast<uint64_t>(_base[index].base_time +
static_cast<int64_t>(static_cast<double>(diff) * _ns_per_tick));
}

后端格式化时按纳秒传给时间戳格式化器;%Qns 专门支持纳秒输出:

PatternFormatter.h

1
2
3
4
if (_is_set_in_pattern[Attribute::Time])
{
_set_arg_val<Attribute::Time>(_timestamp_formatter.format_timestamp(std::chrono::nanoseconds{timestamp}));
}

TimestampFormatter.h

1
2
3
4
* same format specifiers as strftime() but with the following additional specifiers :
* 1) %Qms - Milliseconds
* 2) %Qus - Microseconds
* 3) %Qns - Nanoseconds
1
2
3
4
5
6
7
8
9
10
11
12
13
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::string_view format_timestamp(std::chrono::nanoseconds time_since_epoch)
{
int64_t const timestamp_ns = time_since_epoch.count();
...
if (_additional_format_specifier == AdditionalSpecifier::Qns)
{
static constexpr std::string_view zeros{"000000000"};
_formatted_date.append(zeros);
_write_fractional_seconds(extracted_ns);
}
...
return std::string_view{_formatted_date.data(), _formatted_date.size()};
}

队列模型:线程局部的 SPSC 队列把前端日志传给后端;默认“无界 + 阻塞”策略,但本质是 SPSC:
overview.rst

1
2
3
4
5
6
7
8
9
10
11
12
13
Reliable Logging Mechanism

--------------------------

Quill utilizes a thread-local single-producer-single-consumer queue to relay logs to the backend thread, ensuring that log messages are never dropped.

Initially, an unbounded queue with a small size is used to optimise performance.

However, if the queue reaches its capacity, a new queue will be allocated, which may cause a slight performance penalty for the frontend.

The default unbounded queue can expand up to a size of `FrontendOptions::unbounded_queue_max_capacity`. If this limit is reached, the caller thread will block.

It's possible to change the queue type within the :cpp:class:`FrontendOptions`.

无界 SPSC 的注释直接说明“生产/消费均为 wait-free(无锁)”,满时切换到新节点继续产出:
UnboundedSPSCQueue.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* A single-producer single-consumer FIFO circular buffer
*
* The buffer allows producing and consuming objects
*
* Production is wait free.
*
* When the internal circular buffer becomes full a new one will be created and the production
* will continue in the new buffer.
*
* Consumption is wait free. If not data is available a special value is returned. If a new
* buffer is created from the producer the consumer first consumes everything in the old
* buffer and then moves to the new buffer.
*/

有界 SPSC 明确是环形缓冲实现,作为无界队列的基础块:
BoundedSPSCQueue.h

1
2
3
/**
* A bounded single producer single consumer ring buffer.
*/

前端写队列的调用点(prepare_write/commit),无锁热路径:

Logger.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
QUILL_NODISCARD QUILL_ATTRIBUTE_HOT std::byte* _reserve_queue_space(size_t total_size,
MacroMetadata const* macro_metadata)
{
std::byte* write_buffer =
_thread_context->get_spsc_queue<frontend_options_t::queue_type>().prepare_write(total_size);
...
else if constexpr ((frontend_options_t::queue_type == QueueType::BoundedBlocking) ||
(frontend_options_t::queue_type == QueueType::UnboundedBlocking))
{
if (QUILL_UNLIKELY(write_buffer == nullptr))
{
...
do
{
...
write_buffer =
_thread_context->get_spsc_queue<frontend_options_t::queue_type>().prepare_write(total_size);
} while (write_buffer == nullptr);
}
}

return write_buffer;
}

队列策略(有界/无界 + 阻塞/丢弃)的官方说明:

frontend_options.rst

1
2
3
4
- **UnboundedBlocking**: Starts with a small initial capacity. The queue reallocates up to `FrontendOptions::unbounded_queue_max_capacity` and then blocks the calling thread until space becomes available.
- **UnboundedDropping**: Starts with a small initial capacity. The queue reallocates up to `FrontendOptions::unbounded_queue_max_capacity` and then discards log messages.
- **BoundedBlocking**: Has a fixed capacity and never reallocates. It blocks the calling thread when the limit is reached until space becomes available.
- **BoundedDropping**: Has a fixed capacity and never reallocates. It discards log messages when the limit is reached.

简短总结

  • 前端用 rdtsc 极快取样,后端 RdtscClock 周期校准,以无锁算法转换为纳秒;格式化器 %Qns 输出纳秒。

  • 队列为线程局部 SPSC 环形缓冲,生产/消费均 wait-free(无锁);策略层面可选“阻塞/丢弃/有界/无界”。因此:是无锁队列。

Benchmark_bitset

场景

数据规模20000个,但有效数据很稀疏,50个。存的只是bool真假值。

需要做 benchmark 调研:std::vector<bool>
boost::dynamic_bitsetstd::bitsetflat_set<short>flat_unordered_set<short>的性能差异调研。

std::bitset是固定大小的位集。

std::vector<bool>是可扩展的位集。1 个 bool 数据占 1 位。

官方宣称,如果在编译时已知位集的大小,可以使用std::bitset,它提供了更丰富的成员函数集。

boost::dynamic_bitset可以作为std::vector<bool>的替代方案。

https://www.boost.org/doc/libs/latest/libs/dynamic_bitset/dynamic_bitset.html

还需要注意:

std::vector<bool>表示形式可能经过优化,std::vector<bool>不一定满足所有 Container 或 SequenceContainer 的要求。例如,由于 std::vector<bool> 是实现定义的,因此它可能不满足 LegacyForwardIterator 的要求。使用诸如 std::search 之类需要 LegacyForwardIterator 的算法可能会导致编译时或运行时错误 。

Boost.Container 版本的 vector 并不专门针对 bool 。

总结

对稀疏真值位集(N=20,000,K≈50)在不同容器实现上的构建、查询与顺序扫描性能进行严谨对比:

  • std::bitset<N>
  • boost::dynamic_bitset<>
  • std::vector<bool>
  • boost::container::flat_set<short>
  • boost::unordered_flat_set<short>

骨架

先创建一个结构化的基准测试工程骨架(CMake + Google Benchmark + 示例基准项)。
先创建目录:
benchmark/bitset_bench/

  • 基准框架:Google Benchmark(统计稳健性、自动热身、汇总)。

benchmark根目录的CMake文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
cmake_minimum_required(VERSION 3.16)
project(bitset_bench LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)

option(BENCH_ENABLE_LTO "Enable link time optimization" ON)
if(BENCH_ENABLE_LTO)
include(CheckIPOSupported)
check_ipo_supported(RESULT ipo_supported OUTPUT ipo_error)
if(ipo_supported)
set(CMAKE_INTERPROCEDURAL_OPTIMIZATION ON)
endif()
endif()

include(FetchContent)

# Google Benchmark
FetchContent_Declare(
benchmark
GIT_REPOSITORY https://github.com/google/benchmark.git
GIT_TAG v1.8.3
)
FetchContent_MakeAvailable(benchmark)

add_subdirectory(bitset_bench)


子目录bitset_bench的CMake文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
set(BENCH_N 20000 CACHE STRING "Domain size (bitset length)")
set(BENCH_TRUE_COUNT 50 CACHE STRING "Number of true bits (sparsity)")
set(BENCH_QUERY_COUNT 100000 CACHE STRING "Random queries per iteration")

add_executable(bitset_bench main.cpp)
target_compile_features(bitset_bench PRIVATE cxx_std_20)

find_package(Threads REQUIRED)
option(ENABLE_BOOST_BENCH "Enable Boost-based benchmarks (dynamic_bitset / unordered_flat_set)" ON)

target_link_libraries(bitset_bench PRIVATE benchmark::benchmark Threads::Threads)

target_compile_definitions(bitset_bench PRIVATE
BENCH_N=${BENCH_N}
BENCH_TRUE_COUNT=${BENCH_TRUE_COUNT}
BENCH_QUERY_COUNT=${BENCH_QUERY_COUNT}
)

if (ENABLE_BOOST_BENCH)
find_package(Boost QUIET)
if (Boost_FOUND)
target_include_directories(bitset_bench PRIVATE ${Boost_INCLUDE_DIRS})
target_compile_definitions(bitset_bench PRIVATE HAS_BOOST=1)
else()
message(WARNING "Boost not found; disabling Boost-based benchmarks. Install Boost or set -DENABLE_BOOST_BENCH=OFF.")
endif()
endif()

if (CMAKE_CXX_COMPILER_ID MATCHES "Clang|AppleClang|GNU")
target_compile_options(bitset_bench PRIVATE -O3 -DNDEBUG -Wall -Wextra -Wpedantic)
endif()

依赖

  • CMake >= 3.16
  • C++23 编译器(AppleClang/Clang/GCC)
    • 因为C++23才支持flat_set
  • Google Benchmark(自动 FetchContent)
  • Boost(头文件即可,用于 dynamic_bitset / unordered_flat_set

编译选项

  • 固定工作负载:默认 N=20,000,K=50,随机查询 Q=100,000(50%命中)。
    • 均可通过编译期宏覆盖:-DBENCH_N-DBENCH_TRUE_COUNT-DBENCH_QUERY_COUNT
  • 编译设置:Release(-O3 -DNDEBUG),可选 LTO(默认启用)。

Cpp程序

编译期选择 Index 类型

1
2
3
4
5
// Use the narrowest index type that can hold [0, BENCH_N)
using IndexType = std::conditional_t<
(kDomainSize - 1) <= static_cast<std::size_t>(std::numeric_limits<std::uint16_t>::max()),
std::uint16_t,
std::uint32_t>;

这是在“编译期”选择索引类型,尽量用更窄的无符号整数来表示区间 [0, BENCH_N) 的索引,以节省内存并提升性能。
逻辑:如果 N−1 不超过 uint16_t 的上限(65535),则 IndexType 取 uint16_t;否则取 uint32_t。这完全在编译期决定,没有运行时开销。
影响:用于存放真值位置、查询序列、以及基于集合的容器键类型。N=20000 时用 uint16_tN=200000 时用 uint32_t
假设与边界:默认假设 N≥1;若未来 N 可能超过 2^32−1,可把分支再扩展到 uint64_t

make_dataset 生成可复现的 K 个唯一索引(真值位置)的数据集

  • 先构造 0..N-1 的索引数组(reserve(domain) 是为避免反复扩容)。(参数 domain 控制)
  • 用固定种子 mt19937 打乱顺序(保证每次运行一致)。
  • 截取前 K 个(得到 K 个索引)。(参数 take 控制)
  • 再 sort + unique 确保“有序且唯一”。

这样返回的 ones_sorted (std::vector<IndexType>)就是升序、不重复、可复现实验的真值位置集合。随后还会用它:

  • complementIndices 求出未命中的索引集合(全域减去已选)。
  • 从命中/未命中集合各采样,合成 50% 命中率的随机查询集,并再打乱顺序。

shuffle

std::shuffle(all.begin(), all.end(), rng)是 C++ 中用来​​随机打乱​​一个序列(比如数组或容器)元素顺序的标准方法。

代码部分 含义 说明
std::shuffle 标准库中的随机重排函数 位于 <algorithm>头文件中
all.begin() 指向序列​​开始​​位置的迭代器 与 all.end()一起定义了需要被打乱的范围 [begin, end)
all.end() 指向序列​​结尾​​(最后一个元素之后)的迭代器
rng ​随机数生成器​ 一个提供了随机性的对象,是 std::shuffle随机性的来源
如果希望每次程序运行时打乱的顺序都相同(例如,为了复现某个测试结果),可以为随机数生成器提供一个​​固定种子​​:
1
2
std::mt19937 rng(42); // 使用固定种子 42
std::shuffle(all.begin(), all.end(), rng); // 每次结果都相同
  • ​区别于已弃用的函数​​:在 C11 之前,人们使用 std::random_shuffle,但它默认依赖 rand()函数,随机性较差且非线程安全。自 C14 起它被弃用,C17 中已被移除。​**​std::shuffle是现代 C 推荐的方式​**。​
  • ​容器要求​​:std::shuffle要求容器支持​​随机访问迭代器​​,因此它适用于 std::vectorstd::arraystd::deque和普通数组等。但不适用于 std::list这类不支持随机访问的容器。

技巧:排序-压缩-删除

C++中一个非常经典的STL操作,用于​​从容器中删除所有重复元素,只保留唯一值​​。它通常被称为“排序-去重”或“unique-erase”惯用法。

下面这个表格可以帮你快速理解整个操作过程:

步骤 函数 作用 说明
​1. 排序​ std::sort(all.begin(), all.end()) 使重复元素相邻 ​(关键前提)​​ 这行代码本身不包含排序,但你必须先做这一步,否则去重无效
​2. 压缩​ std::unique(all.begin(), all.end()) 将相邻重复元素移到末尾 返回一个指向​​新逻辑末尾​​的迭代器
​3. 删除​ all.erase(new_end, all.end()) 物理删除末尾的重复元素 容器大小真正变小

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::vector<IndexType> generateUniqueIndices(std::size_t domain,
std::size_t take,
std::uint32_t seed) {
std::vector<IndexType> all;
all.reserve(domain);
for (std::size_t i = 0; i < domain; ++i)
all.push_back(static_cast<IndexType>(i));
std::mt19937 rng(seed);
std::shuffle(all.begin(), all.end(), rng);
if (take < all.size())
all.resize(take);
std::sort(all.begin(), all.end());
all.erase(std::unique(all.begin(), all.end()), all.end());
return all;
}

Benchmark项目

  1. 从0到20000构建
  2. random query,一半命中,一半未命中。测试contains性能
  3. 顺序扫描

总代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
#include <benchmark/benchmark.h>

#include <algorithm>
#include <array>
#include <cstdint>
#include <limits>
#include <numeric>
#include <random>
#include <type_traits>
#include <utility>
#include <vector>

#include <bitset>
#ifdef HAS_BOOST
# include <boost/dynamic_bitset.hpp>
# include <boost/container/flat_set.hpp>
# include <boost/unordered/unordered_flat_set.hpp>
#endif

#ifndef BENCH_N
# define BENCH_N 20000
#endif
#ifndef BENCH_TRUE_COUNT
# define BENCH_TRUE_COUNT 50
#endif
#ifndef BENCH_QUERY_COUNT
# define BENCH_QUERY_COUNT 100000
#endif

static constexpr std::size_t kDomainSize = static_cast<std::size_t>(BENCH_N);
static constexpr std::size_t kTrueCount = static_cast<std::size_t>(BENCH_TRUE_COUNT);
static constexpr std::size_t kQueryCount = static_cast<std::size_t>(BENCH_QUERY_COUNT);

// Use the narrowest index type that can hold [0, BENCH_N)
using IndexType = std::conditional_t<
(kDomainSize - 1) <= static_cast<std::size_t>(std::numeric_limits<std::uint16_t>::max()),
std::uint16_t,
std::uint32_t>;

static_assert(kTrueCount <= kDomainSize, "True count cannot exceed domain size");

namespace util {

std::vector<IndexType> generateUniqueIndices(std::size_t domain, std::size_t take, std::uint32_t seed) {
std::vector<IndexType> all;
all.reserve(domain);
for (std::size_t i = 0; i < domain; ++i) all.push_back(static_cast<IndexType>(i));
std::mt19937 rng(seed);
std::shuffle(all.begin(), all.end(), rng);
if (take < all.size()) all.resize(take);
std::sort(all.begin(), all.end());
all.erase(std::unique(all.begin(), all.end()), all.end());
return all;
}

std::vector<IndexType> generateRandomIndicesFromPool(const std::vector<IndexType>& pool, std::size_t count, std::uint32_t seed) {
if (pool.empty()) return {};
std::vector<IndexType> out;
out.reserve(count);
std::mt19937 rng(seed);
std::uniform_int_distribution<std::size_t> dist(0, pool.size() - 1);
for (std::size_t i = 0; i < count; ++i) {
out.push_back(pool[dist(rng)]);
}
return out;
}

std::vector<IndexType> complementIndices(const std::vector<IndexType>& present, std::size_t domain) {
std::vector<IndexType> missing;
missing.reserve(domain - present.size());
std::size_t p = 0;
for (std::size_t i = 0; i < domain; ++i) {
IndexType ii = static_cast<IndexType>(i);
if (p < present.size() && present[p] == ii) {
++p;
} else {
missing.push_back(ii);
}
}
return missing;
}

} // namespace util

// Containers under test
using BitsetFixed = std::bitset<BENCH_N>;
#ifdef HAS_BOOST
using BitsetDyn = boost::dynamic_bitset<>;
using FlatSet = boost::container::flat_set<IndexType>;
using UnorderedFlatSet = boost::unordered_flat_set<IndexType>;
#endif

// Build helpers
BitsetFixed build_bitset_fixed(const std::vector<IndexType>& ones) {
BitsetFixed b; // zero-initialized
for (IndexType idx : ones) b.set(static_cast<std::size_t>(idx));
return b;
}

#ifdef HAS_BOOST
BitsetDyn build_bitset_dynamic(const std::vector<IndexType>& ones) {
BitsetDyn b(kDomainSize);
for (IndexType idx : ones) b.set(static_cast<std::size_t>(idx));
return b;
}
#endif

std::vector<bool> build_vector_bool(const std::vector<IndexType>& ones) {
std::vector<bool> v(kDomainSize);
for (IndexType idx : ones) v[static_cast<std::size_t>(idx)] = true;
return v;
}

#ifdef HAS_BOOST
FlatSet build_flat_set(const std::vector<IndexType>& ones) {
FlatSet s;
s.reserve(ones.size());
for (IndexType idx : ones) s.insert(idx);
return s;
}
#endif

#ifdef HAS_BOOST
UnorderedFlatSet build_unordered_flat_set(const std::vector<IndexType>& ones) {
UnorderedFlatSet s;
s.reserve(ones.size());
for (IndexType idx : ones) s.insert(idx);
return s;
}
#endif

// Contains helpers
template <typename T>
inline bool contains_index(const T& container, std::size_t i);

template <> inline bool contains_index<BitsetFixed>(const BitsetFixed& b, std::size_t i) { return b.test(i); }
#ifdef HAS_BOOST
template <> inline bool contains_index<BitsetDyn>(const BitsetDyn& b, std::size_t i) { return b.test(i); }
#endif
template <> inline bool contains_index<std::vector<bool>>(const std::vector<bool>& v, std::size_t i) { return v[i]; }
#ifdef HAS_BOOST
template <> inline bool contains_index<FlatSet>(const FlatSet& s, std::size_t i) { return s.find(static_cast<IndexType>(i)) != s.end(); }
template <> inline bool contains_index<UnorderedFlatSet>(const UnorderedFlatSet& s, std::size_t i) { return s.find(static_cast<IndexType>(i)) != s.end(); }
#endif

// Precomputed data
struct Dataset {
std::vector<IndexType> ones_sorted;
std::vector<IndexType> misses_pool;
std::vector<IndexType> rand_queries_50_50;
};

Dataset make_dataset() {
Dataset d;
d.ones_sorted = util::generateUniqueIndices(kDomainSize, kTrueCount, /*seed=*/0xC0FFEEu);
auto misses = util::complementIndices(d.ones_sorted, kDomainSize);
// 50-50 hits/misses random queries
auto hits = util::generateRandomIndicesFromPool(d.ones_sorted, kQueryCount / 2, /*seed=*/0xBEEF1234u);
auto missq = util::generateRandomIndicesFromPool(misses, kQueryCount - hits.size(), /*seed=*/0xDEADBEEFu);
d.rand_queries_50_50.reserve(hits.size() + missq.size());
d.rand_queries_50_50.insert(d.rand_queries_50_50.end(), hits.begin(), hits.end());
d.rand_queries_50_50.insert(d.rand_queries_50_50.end(), missq.begin(), missq.end());
// Shuffle queries to avoid locality artifacts
std::mt19937 rng(0x1234567u);
std::shuffle(d.rand_queries_50_50.begin(), d.rand_queries_50_50.end(), rng);
d.misses_pool = std::move(misses);
return d;
}

static Dataset g_dataset = make_dataset();

// 1) Build benchmarks
static void BM_build_vector_bool(benchmark::State& state) {
for (auto _ : state) {
benchmark::DoNotOptimize(_);
auto v = build_vector_bool(g_dataset.ones_sorted);
benchmark::DoNotOptimize(v);
benchmark::ClobberMemory();
}
state.counters["N"] = static_cast<double>(kDomainSize);
state.counters["K"] = static_cast<double>(kTrueCount);
}

static void BM_build_bitset_fixed(benchmark::State& state) {
for (auto _ : state) {
benchmark::DoNotOptimize(_);
auto b = build_bitset_fixed(g_dataset.ones_sorted);
benchmark::DoNotOptimize(b);
benchmark::ClobberMemory();
}
state.counters["N"] = static_cast<double>(kDomainSize);
state.counters["K"] = static_cast<double>(kTrueCount);
}

#ifdef HAS_BOOST
static void BM_build_bitset_dynamic(benchmark::State& state) {
for (auto _ : state) {
benchmark::DoNotOptimize(_);
auto b = build_bitset_dynamic(g_dataset.ones_sorted);
benchmark::DoNotOptimize(b);
benchmark::ClobberMemory();
}
state.counters["N"] = static_cast<double>(kDomainSize);
state.counters["K"] = static_cast<double>(kTrueCount);
}
#endif

#ifdef HAS_BOOST
static void BM_build_flat_set(benchmark::State& state) {
for (auto _ : state) {
benchmark::DoNotOptimize(_);
auto s = build_flat_set(g_dataset.ones_sorted);
benchmark::DoNotOptimize(s);
benchmark::ClobberMemory();
}
state.counters["N"] = static_cast<double>(kDomainSize);
state.counters["K"] = static_cast<double>(kTrueCount);
}
#endif

#ifdef HAS_BOOST
static void BM_build_unordered_flat_set(benchmark::State& state) {
for (auto _ : state) {
benchmark::DoNotOptimize(_);
auto s = build_unordered_flat_set(g_dataset.ones_sorted);
benchmark::DoNotOptimize(s);
benchmark::ClobberMemory();
}
state.counters["N"] = static_cast<double>(kDomainSize);
state.counters["K"] = static_cast<double>(kTrueCount);
}
#endif

// 2) Random contains() queries (50% hits, 50% misses), container built outside timed loop
template <typename Container>
static void BM_contains_random(benchmark::State& state, const char* name, const Container& container) {
(void)name; // name is useful if we later add custom labels
const auto& queries = g_dataset.rand_queries_50_50;
for (auto _ : state) {
std::size_t sum = 0;
for (std::size_t i = 0; i < queries.size(); ++i) {
sum += contains_index<Container>(container, static_cast<std::size_t>(queries[i]));
}
benchmark::DoNotOptimize(sum);
}
state.counters["Q"] = static_cast<double>(queries.size());
state.counters["N"] = static_cast<double>(kDomainSize);
state.counters["K"] = static_cast<double>(kTrueCount);
}

static void BM_contains_random_vector_bool(benchmark::State& state) {
auto container = build_vector_bool(g_dataset.ones_sorted);
BM_contains_random(state, "vector_bool", container);
}

static void BM_contains_random_bitset_fixed(benchmark::State& state) {
auto container = build_bitset_fixed(g_dataset.ones_sorted);
BM_contains_random(state, "bitset_fixed", container);
}

#ifdef HAS_BOOST
static void BM_contains_random_bitset_dynamic(benchmark::State& state) {
auto container = build_bitset_dynamic(g_dataset.ones_sorted);
BM_contains_random(state, "bitset_dynamic", container);
}
#endif

#ifdef HAS_BOOST
static void BM_contains_random_flat_set(benchmark::State& state) {
auto container = build_flat_set(g_dataset.ones_sorted);
BM_contains_random(state, "flat_set", container);
}
#endif

#ifdef HAS_BOOST
static void BM_contains_random_unordered_flat_set(benchmark::State& state) {
auto container = build_unordered_flat_set(g_dataset.ones_sorted);
BM_contains_random(state, "unordered_flat_set", container);
}
#endif

// 3) Sequential scan across full domain: count number of true bits
template <typename Container>
static void BM_sequential_scan_count_true(benchmark::State& state, const char* name, const Container& container) {
(void)name;
for (auto _ : state) {
std::size_t cnt = 0;
for (std::size_t i = 0; i < kDomainSize; ++i) {
cnt += contains_index<Container>(container, i);
}
benchmark::DoNotOptimize(cnt);
}
state.counters["N"] = static_cast<double>(kDomainSize);
state.counters["K"] = static_cast<double>(kTrueCount);
}

static void BM_sequential_scan_vector_bool(benchmark::State& state) {
auto container = build_vector_bool(g_dataset.ones_sorted);
BM_sequential_scan_count_true(state, "vector_bool", container);
}

static void BM_sequential_scan_bitset_fixed(benchmark::State& state) {
auto container = build_bitset_fixed(g_dataset.ones_sorted);
BM_sequential_scan_count_true(state, "bitset_fixed", container);
}

#ifdef HAS_BOOST
static void BM_sequential_scan_bitset_dynamic(benchmark::State& state) {
auto container = build_bitset_dynamic(g_dataset.ones_sorted);
BM_sequential_scan_count_true(state, "bitset_dynamic", container);
}
#endif

#ifdef HAS_BOOST
static void BM_sequential_scan_flat_set(benchmark::State& state) {
auto container = build_flat_set(g_dataset.ones_sorted);
BM_sequential_scan_count_true(state, "flat_set", container);
}
#endif

#ifdef HAS_BOOST
static void BM_sequential_scan_unordered_flat_set(benchmark::State& state) {
auto container = build_unordered_flat_set(g_dataset.ones_sorted);
BM_sequential_scan_count_true(state, "unordered_flat_set", container);
}
#endif

BENCHMARK(BM_build_vector_bool);
BENCHMARK(BM_build_bitset_fixed);
// Boost-enabled benchmarks
#ifdef HAS_BOOST
BENCHMARK(BM_build_bitset_dynamic);
BENCHMARK(BM_build_flat_set);
BENCHMARK(BM_build_unordered_flat_set);
#endif

BENCHMARK(BM_contains_random_vector_bool);
BENCHMARK(BM_contains_random_bitset_fixed);
#ifdef HAS_BOOST
BENCHMARK(BM_contains_random_bitset_dynamic);
BENCHMARK(BM_contains_random_flat_set);
BENCHMARK(BM_contains_random_unordered_flat_set);
#endif

BENCHMARK(BM_sequential_scan_vector_bool);
BENCHMARK(BM_sequential_scan_bitset_fixed);
#ifdef HAS_BOOST
BENCHMARK(BM_sequential_scan_bitset_dynamic);
BENCHMARK(BM_sequential_scan_flat_set);
BENCHMARK(BM_sequential_scan_unordered_flat_set);
#endif

BENCHMARK_MAIN();



矩阵模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
impl,N,K,Q,benchmark,min(ns),median(ns),mean(ns),stddev(ns),notes
std::bitset,20000,50,100000,build,,,,,
std::bitset,20000,50,100000,contains_random,,,,,
std::bitset,20000,50,100000,sequential_scan,,,,,
boost::dynamic_bitset,20000,50,100000,build,,,,,
boost::dynamic_bitset,20000,50,100000,contains_random,,,,,
boost::dynamic_bitset,20000,50,100000,sequential_scan,,,,,
std::vector<bool>,20000,50,100000,build,,,,,
std::vector<bool>,20000,50,100000,contains_random,,,,,
std::vector<bool>,20000,50,100000,sequential_scan,,,,,
boost::container::flat_set<short>,20000,50,100000,build,,,,,
boost::container::flat_set<short>,20000,50,100000,contains_random,,,,,
boost::container::flat_set<short>,20000,50,100000,sequential_scan,,,,,
boost::unordered_flat_set<short>,20000,50,100000,build,,,,,
boost::unordered_flat_set<short>,20000,50,100000,contains_random,,,,,
boost::unordered_flat_set<short>,20000,50,100000,sequential_scan,,,,,

测试执行

1
2
3
4
5
6
rm -rf ~/Work/benchmark/build
cmake -S ~/Work/benchmark -B ~/Work/benchmark/build -DCMAKE_BUILD_TYPE=Release -DENABLE_BOOST_BENCH=ON

cmake --build ~/Work/benchmark/build -j

~/Work/benchmark/build/bitset_bench/bitset_bench --benchmark_min_time=0.5 --benchmark_repetitions=15 --benchmark_report_aggregates_only=true

​​-S​​:用于指定​​源代码目录​​,即顶级 CMakeLists.txt文件所在的目录。例如,-S .表示使用当前目录作为源码目录

​​-B​​:用于指定​​构建目录​​,即所有中间构建文件(如 Makefile、CMakeCache.txt)和最终编译输出文件存放的目录。如果指定的目录不存在,CMake 会自动创建它。

最常见的用法是 cmake -S . -B build。这个命令的含义是:使用当前目录(.)的源码,并在当前目录下创建一个名为 build的文件夹来存放所有构建文件

除了 -S和 -B,CMake 还提供了许多其他实用选项。下表进行了归纳:

选项 说明 示例
-D <var>=<value> 设置或覆盖 CMake 缓存变量。这是最常用的选项之一,用于传递配置参数。 -DCMAKE_BUILD_TYPE=Release
-G <generator-name> 指定生成器,即选择使用哪种底层构建系统(如 Makefile 或 Ninja)。 -G "Ninja"
-j [<jobs>] 在运行 cmake --build时使用,指定并行编译的作业数,以加快构建速度。 cmake --build build -j 8
--target <tgt> 在运行 cmake --build时使用,指定要构建的具体目标(如某个可执行文件或库)。 cmake --build build --target m

生成compile_commands.json并在工作区创建符号链接

确保 IDE 能正确解析头文件与跳转。

在 benchmark/CMakeLists.txt中启用CMAKE_EXPORT_COMPILE_COMMANDS

然后,重新运行CMake。

1
cmake -S . -B ./build -DCMAKE_BUILD_TYPE=Release -DCMAKE_EXPORT_COMPILE_COMMANDS=ON

之后,在Work根目录创建符号链接:

1
ln -sf /Users/mrcan/Work/benchmark/build/compile_commands.json /Users/mrcan/Work/compile_commands.json

作用:

  • 在 benchmark/CMakeLists.txt 增加了:
    • set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
  • 重新生成了编译数据库:
    • 生成位置:/Users/mrcan/Work/benchmark/build/compile_commands.json
    • 在工作区根建立符号链接:/Users/mrcan/Work/compile_commands.json
  • C++ 标准与依赖:
    • 已设置 -std=c++20,并通过 benchmark::benchmark 目标自动注入 -I.../benchmark-src/include 搜索路径。