Alex Kladov (matklad) · @tigerbeetledb · Lisbon · https://matklad.github.io/

Computers, democracy, and nervous disorder.

intellij-rust/intellij-rust 4379

Rust plugin for the IntelliJ Platform

fitzgen/id-arena 90

A simple, id-based arena

cuviper/autocfg 80

Automatic cfg for Rust compiler features

async-rs/a-chat 40

A simple chat implemented on top of async-std

graydon/exhaustigen-rs 37

exhaustive testing library

CAD97/sorbus 24

An experimental reimplementation of rowan, focused on size efficiency

eugenyk/hpcourse 19

Repository to store students' practical works for a high performance computing course

delete branch tigerbeetledb/tigerbeetle

delete branch: matklad/headchk

delete time in 7 hours

push event tigerbeetledb/tigerbeetle

Alex Kladov

commit sha 5dc98dcbe574af0f255070c7be4673043ecd6e82

vopr: catch replica-local bugs earlier

The state checker catches bugs once they break cluster-wide linearizability. However, not every correctness bug immediately leads to cluster divergence. For example, if a replica forgets its prepare_ok, this might lead to erroneous truncation during a view change, but it is more likely to be just repaired over.

In this PR, we enhance the state checker to take a closer look at the individual state of each replica, and prevent replicas from rolling back their commitments. Note that we can't track this purely with asserts inside the replica, as the invariant might be broken across restarts.

I've verified that it makes discovering the problem in https://github.com/tigerbeetledb/tigerbeetle/pull/828 more likely.

Alex Kladov

commit sha 11350bf2270b33d12aefb612ae4ab62060a92715

Merge pull request #843 from tigerbeetledb/matklad/headchk

vopr: catch replica-local bugs earlier

push time in 7 hours

PR merged tigerbeetledb/tigerbeetle

vopr: catch replica-local bugs earlier

The state checker catches bugs once they break cluster-wide linearizability. However, not every correctness bug immediately leads to cluster divergence.

For example, if a replica forgets its prepare_ok, this might lead to erroneous truncation during a view change, but it is more likely to be just repaired over.

In this PR, we enhance the state checker to take a closer look at the individual state of each replica, and prevent replicas from rolling back their commitments.

Note that we can't track this purely with asserts inside the replica, as the invariant might be broken across restarts.

I've verified that it makes discovering the problem in https://github.com/tigerbeetledb/tigerbeetle/pull/828 more likely.
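The shape of such a simulator-side check might be roughly as follows (a minimal sketch with hypothetical names; the actual VOPR state checker tracks much more than this):

```zig
const std = @import("std");
const assert = std.debug.assert;

const replicas_max = 6;

/// Sketch: the per-replica invariant described above. Because the checker
/// lives in the simulator, its memory survives simulated replica restarts,
/// which an assert inside the replica itself could not.
const ReplicaCommitChecker = struct {
    /// Highest commit each replica has ever acknowledged.
    commit_max: [replicas_max]u64 = [_]u64{0} ** replicas_max,

    fn check(checker: *ReplicaCommitChecker, replica: u8, commit_min: u64) void {
        // A replica must never roll back its commitments, even across restarts.
        assert(commit_min >= checker.commit_max[replica]);
        checker.commit_max[replica] = commit_min;
    }
};
```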

+46 -0

0 comments

3 changed files

matklad

pr closed time in 7 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```diff
 pub fn SuperBlockType(comptime Storage: type) type {
                     std.mem.zeroes(vsr.Header),
                 );
             } else {
-                assert(context.caller == .checkpoint);
+                assert(!context.caller.updates_vsr_headers());
             }

-            if (context.caller != .view_change) {
+            if (context.caller.updates_trailers()) {
+                superblock.staging.parent_checkpoint_id = superblock.staging.checkpoint_id();
```

I am a bit confused here. `parent_checkpoint_id` should point to the parent's `checkpoint_id`, so I would expect:

```zig
superblock.staging.parent_checkpoint_id = superblock.working.checkpoint_id();
```

sentientwaffle

comment created time in 9 hours

PullRequestReviewEvent

push event rust-analyzer/metrics

Bot

commit sha f81aa8232f31488d0c8aaf87a5a034d1b2eeeb48

📈

push time in 10 hours

push event tigerbeetledb/tigerbeetle

Alex Kladov

commit sha 1372260cc2fbe4a40cddbf091e9ac89d42332980

vsr: head can be recovered in an idle cluster

When we are in `.recovering_head`, the `op == message.header.op` case isn't necessarily outdated: it tells us that our provisional op is in fact ok. This is especially important in an idle cluster, where we might not get a fresher op. We _could_ return here if we are not in `.recovering_head`, but let's rather hit the rest of the assertions.

Alex Kladov

commit sha e73fdd22ed95a8a3f4e116850ee0a782af788156

replica-test: fix latent bug

When we stop a0, we can still have some messages in the network, so we should also cut the network to prevent op=30 from being repaired.

Alex Kladov

commit sha ee1c742c0200ae61ddc0c4107cd4d7b79a57dc04

Merge pull request #840 from tigerbeetledb/matklad/recover-last-head

vsr: head can be recovered in an idle cluster

push time in 11 hours

delete branch tigerbeetledb/tigerbeetle

delete branch: matklad/recover-last-head

delete time in 11 hours

PR merged tigerbeetledb/tigerbeetle

vsr: head can be recovered in an idle cluster [vopr]

When we are in `.recovering_head`, the `op == message.header.op` case isn't necessarily outdated: it tells us that our provisional op is in fact ok. This is especially important in an idle cluster, where we might not get a fresher op.

We could return here if we are not in `.recovering_head`, but let's rather hit the rest of the assertions.
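In code, the shape of that reasoning might look roughly like this (a self-contained sketch with hypothetical types and names, not the actual Replica logic):

```zig
const Status = enum { normal, recovering_head };
const Header = struct { op: u64 };

const Replica = struct {
    op: u64,
    status: Status,

    // Sketch only: while recovering the head, a message whose op *equals*
    // ours is evidence that our provisional head is ok, not a stale duplicate.
    fn on_message_op(replica: *Replica, header: Header) void {
        if (header.op < replica.op) return; // Definitely stale.
        if (header.op == replica.op and replica.status == .recovering_head) {
            // In an idle cluster, this may be the only confirmation we get.
            replica.status = .normal;
            return;
        }
        // Otherwise fall through to the remaining assertions.
    }
};
```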

+27 -2

0 comments

2 changed files

matklad

pr closed time in 11 hours

delete branch tigerbeetledb/tigerbeetle

delete branch: matklad/faster-tests

delete time in 11 hours

PR merged tigerbeetledb/tigerbeetle

test: speed up unit tests

`mem.set` is quite slow; removing it cuts replica_tests time from 30s to 15s.

My first thought here was to just remove the memset and rely on mmap to lazily initialize with zeros, but it turns out that Zig intentionally doesn't provide this API in the allocator:

https://github.com/ziglang/zig/issues/2090#issuecomment-475641804

That's surprising, but it also makes sense. In particular, in this case it pushed me to a better solution --- just randomly initializing the sectors on the first access. This has the same perf bonus, while also increasing coverage (as we now return random data, rather than just zeros).

Technically, this is not initialization, as subsequent reads of the same uninitialized sector can return different data, but I think we shouldn't rely on uninitialized data being frozen. This is certainly not true for memory (MADV_FREE), and I think it could reasonably be true for disks as well.
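A minimal sketch of that approach, assuming a hypothetical `FakeStorage` test double (TigerBeetle's actual test storage differs):

```zig
const std = @import("std");

const sector_size = 512;
const sector_count = 1024;

/// Sketch: a fake disk that returns fresh random bytes for unwritten
/// sectors, instead of memset-ing the whole buffer with zeros up front.
const FakeStorage = struct {
    sectors: [sector_count][sector_size]u8 = undefined,
    written: std.StaticBitSet(sector_count) = std.StaticBitSet(sector_count).initEmpty(),
    prng: std.rand.DefaultPrng = std.rand.DefaultPrng.init(92),

    fn write_sector(storage: *FakeStorage, index: usize, data: *const [sector_size]u8) void {
        storage.sectors[index] = data.*;
        storage.written.set(index);
    }

    fn read_sector(storage: *FakeStorage, index: usize, out: *[sector_size]u8) void {
        if (storage.written.isSet(index)) {
            out.* = storage.sectors[index];
        } else {
            // Unwritten sector: fresh random bytes on every read. Uninitialized
            // data is deliberately not frozen, mirroring MADV_FREE semantics.
            storage.prng.random().bytes(out);
        }
    }
};
```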

+10 -12

0 comments

1 changed file

matklad

pr closed time in 11 hours

push event tigerbeetledb/tigerbeetle

Alex Kladov

commit sha daf9bf70e0aca9c05427e94eed8c306fd7f8ba86

test: speed up unit tests

`mem.set` is quite slow; removing it cuts replica_tests time from 30s to 15s.

My first thought here was to just remove the memset and rely on mmap to lazily initialize with zeros, _but_ it turns out that Zig intentionally doesn't provide this API in the allocator: <https://github.com/ziglang/zig/issues/2090#issuecomment-475641804>

That's surprising, but it also makes sense. In particular, in this case it pushed me to a better solution --- just randomly initializing the sectors on the first access. This has the same perf bonus, while also increasing coverage (as we now return random data, rather than just zeros).

Technically, this is not initialization, as subsequent reads of the same uninitialized sector can return different data, but I think we shouldn't rely on uninitialized data being frozen. This is certainly not true for memory (MADV_FREE), and I think it could reasonably be true for disks as well.

Alex Kladov

commit sha c5855baceb83f1ef4842a3ca5d31b63911208e1d

Merge pull request #842 from tigerbeetledb/matklad/faster-tests

test: speed up unit tests

push time in 11 hours

push event tigerbeetledb/tigerbeetle

sentientwaffle

commit sha f0a78fdc30292b9ca4ac8bbb7aa9f302c17afd91

Build: Add '-Dconfig-log-level' to build options

Alex Kladov

commit sha 525a0e7ff7e17a5f988f6c4672ac78db7baceb36

replica-test: improve readability

This is command-query separation. In these tests, some statements are asserts over the state, and some are commands to change the state. `try expectEqual(replicas.open(), .ok)` is both, and is too easy to confuse with a pure assertion.

Taras Tsugrii

commit sha e3011f08f7cb688bf3587746e7c5e5a0c2fbe486

[nit][eytzinger] Fix example of eytzinger layout.

Looks like a typo because `18` is missing from the Eytzinger layout.

Joran Dirk Greef

commit sha b5fb287e2cdf82cb7d12ad372e40b77395237f10

Merge pull request #844 from ttsugriy/patch-1

[nit][eytzinger] Fix example of eytzinger layout.

djg

commit sha 9580b92075c1c037062c9cfc428d89b4b93b0377

Merge pull request #836 from tigerbeetledb/dj-build-log-level

Build: Add `-Dconfig-log-level` to build options

Alex Kladov

commit sha 693f836c2dd8d9d300addaf95d2760212ef3b19d

Merge pull request #839 from tigerbeetledb/matklad/arrange-act-assert

replica-test: improve readability

Alex Kladov

commit sha 1372260cc2fbe4a40cddbf091e9ac89d42332980

vsr: head can be recovered in an idle cluster

When we are in `.recovering_head`, the `op == message.header.op` case isn't necessarily outdated: it tells us that our provisional op is in fact ok. This is especially important in an idle cluster, where we might not get a fresher op. We _could_ return here if we are not in `.recovering_head`, but let's rather hit the rest of the assertions.

Alex Kladov

commit sha e73fdd22ed95a8a3f4e116850ee0a782af788156

replica-test: fix latent bug

When we stop a0, we can still have some messages in the network, so we should also cut the network to prevent op=30 from being repaired.

push time in 13 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```zig
pub const SuperBlockHeader = extern struct {
        pub fn monotonic(old: VSRState, new: VSRState) bool {
```

Could we assert (or `maybe`) something about statuses here?

sentientwaffle

comment created time in 14 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```zig
const std = @import("std");
const assert = std.debug.assert;
const maybe = stdx.maybe;

const vsr = @import("../vsr.zig");
const constants = @import("../constants.zig");
const stdx = @import("../stdx.zig");

/// Initial stage: .none
///
/// Transitions:
///
///   .none                  → .cancel_commit | .cancel_grid | .request_target
///   .cancel_commit         → .cancel_grid
///   .cancel_grid           → .request_target
///   .request_target        → .write_sync_start
///   .write_sync_start      → .write_sync_start | .request_trailers
///   .request_trailers      → .write_sync_start | .cancel_grid | .request_manifest_logs
///   .request_manifest_logs → .write_sync_start | .cancel_grid | .write_sync_done
///   .write_sync_done       → .write_sync_start | .done
///   .done                  → .write_sync_start | .none
///
pub const SyncStage = union(enum) {
    /// Not syncing.
    none,

    /// Waiting for a uninterruptible step of the commit chain.
    cancel_commit,

    /// Waiting for `Grid.cancel()`.
    cancel_grid,

    /// We need to sync, but are waiting for a usable `sync_target_max`.
    request_target,

    /// superblock.sync_start()
    write_sync_start: struct { target: SyncTargetCanonical },

    request_trailers: struct {
        target: SyncTargetCanonical,
        manifest: SyncTrailer = .{},
        free_set: SyncTrailer = .{},
        client_sessions: SyncTrailer = .{},

        pub fn done(self: *const @This()) bool {
            return self.manifest.done and self.free_set.done and self.client_sessions.done;
        }
    },

    request_manifest_logs: struct { target: SyncTargetCanonical },

    /// superblock.sync_done()
    write_sync_done: struct { target: SyncTargetCanonical },

    done: struct { target: SyncTargetCanonical },

    pub fn target(stage: *const SyncStage) ?SyncTargetCanonical {
        return switch (stage.*) {
            .none,
            .cancel_commit,
            .cancel_grid,
            .request_target,
            => null,
            .write_sync_start => |s| s.target,
            .request_trailers => |s| s.target,
            .request_manifest_logs => |s| s.target,
            .write_sync_done => |s| s.target,
            .done => |s| s.target,
        };
    }
};

pub const SyncTargetCandidate = struct {
    /// The target's checkpoint identifier.
    checkpoint_id: u128,
    /// The op_checkpoint() that corresponds to the checkpoint id.
    checkpoint_op: u64,
    /// The checksum of the prepare corresponding to checkpoint_op.
    checkpoint_op_checksum: u128,

    pub fn canonical(target: SyncTargetCandidate) SyncTargetCanonical {
        return .{
            .checkpoint_id = target.checkpoint_id,
            .checkpoint_op = target.checkpoint_op,
            .checkpoint_op_checksum = target.checkpoint_op_checksum,
        };
    }
};

pub const SyncTargetCanonical = struct {
```

> Do you mean I should rename `SyncTargetCanonical` to `SyncTarget`?

Yes

sentientwaffle

comment created time in 13 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```diff
 pub const SuperBlockHeader = extern struct {
         [_]u8{0} ** vsr_headers_reserved_size,

     pub const VSRState = extern struct {
+        pub const Status = enum(u8) {
```

:thinking: don't like how this collides with `replica.status`. Maybe it's better to think about this not as a status (enum), but as a flag (so, a single bit)?

sentientwaffle

comment created time in 14 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```zig
//! Verification related to state sync.
//!
//! Check:
//! - the number of simultaneously state-syncing replicas.
//! - that every sync target is a canonical checkpoint.
//! - that every *potential* sync target is a canonical checkpoint.

pub fn SyncCheckerType(comptime Replica: type) type {
    return struct {
        const SyncChecker = @This();

        /// A list of canonical checkpoint ids.
        /// Indexed by checkpoint_index(checkpoint_op).
        canonical: std.ArrayList(u128),

        replicas_syncing: std.StaticBitSet(constants.nodes_max) =
            std.StaticBitSet(constants.nodes_max).initEmpty(),

        // ...

        /// Verify that the number of simultaneous syncing (non-standby) replicas does not
        /// exceed the safe limit.
        pub fn replica_sync_start(
            checker: *SyncChecker,
            replica_index: u8,
            replica: *const Replica,
        ) void {
            if (replica.standby()) return;
            checker.replicas_syncing.set(replica_index);

            // This implicitly checks that R=1 and R=2 clusters never state sync.
            // Don't count standbys since they aren't part of the replication quorum.
            const quorums = vsr.quorums(replica.replica_count);
            const replicas_syncing_max = max: {
                if (replica.replica_count == 2) {
                    // See Replica.jump_sync_target() and the test "Cluster: sync: R=2".
                    break :max 1;
                } else {
                    break :max replica.replica_count - quorums.replication;
                }
            };

            assert(checker.replicas_syncing.count() <= replicas_syncing_max);
        }

        pub fn replica_sync_done(
            checker: *SyncChecker,
            replica_index: u8,
            replica: *const Replica,
```

Unclear at this point, tbh :-) I think `*const Replica` would be the safest thing to use, as you can look at its u128 id, or compute the index as an offset into a `[]Replica` slice.

sentientwaffle

comment created time in 14 hours

delete branch tigerbeetledb/tigerbeetle

delete branch: matklad/arrange-act-assert

delete time in 14 hours

PR merged tigerbeetledb/tigerbeetle

replica-test: improve readability

This is command-query separation. In these tests, some statements are asserts over the state, and some are commands to change the state.

```zig
try expectEqual(replicas.open(), .ok)
```

is both, and is too easy to confuse with a pure assertion.
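For illustration, the separation might look like this (a sketch: only `replicas.open()` comes from the PR; the harness type is hypothetical):

```zig
const std = @import("std");
const assert = std.debug.assert;

// Hypothetical stand-in for the test harness; only `open` matters here.
const TestReplicas = struct {
    fn open(_: *TestReplicas) enum { ok, no_superblock } {
        return .ok;
    }
};

test "command-query separation (sketch)" {
    var replicas = TestReplicas{};

    // Before: `try expectEqual(replicas.open(), .ok)` both mutates state and
    // asserts on the result, so it is easy to misread as a pure assertion.

    // After: the command that changes state stands alone...
    const opened = replicas.open();
    // ...and the assertion over the new state is separate and unmistakable.
    assert(opened == .ok);
}
```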

Pre-merge checklist

Performance:

  • [X] I am very sure this PR could not affect performance.
+28 -25

1 comment

1 changed file

matklad

pr closed time in 14 hours

push event tigerbeetledb/tigerbeetle

Alex Kladov

commit sha daf9bf70e0aca9c05427e94eed8c306fd7f8ba86

test: speed up unit tests

`mem.set` is quite slow; removing it cuts replica_tests time from 30s to 15s.

My first thought here was to just remove the memset and rely on mmap to lazily initialize with zeros, _but_ it turns out that Zig intentionally doesn't provide this API in the allocator: <https://github.com/ziglang/zig/issues/2090#issuecomment-475641804>

That's surprising, but it also makes sense. In particular, in this case it pushed me to a better solution --- just randomly initializing the sectors on the first access. This has the same perf bonus, while also increasing coverage (as we now return random data, rather than just zeros).

Technically, this is not initialization, as subsequent reads of the same uninitialized sector can return different data, but I think we shouldn't rely on uninitialized data being frozen. This is certainly not true for memory (MADV_FREE), and I think it could reasonably be true for disks as well.

push time in 14 hours

push event tigerbeetledb/tigerbeetle

Alex Kladov

commit sha ed02e1cabfdd9ccdf094db49b5d0c1416c31908e

test: speed up unit tests

`mem.set` is quite slow; removing it cuts replica_tests time from 30s to 15s.

My first thought here was to just remove the memset and rely on mmap to lazily initialize with zeros, _but_ it turns out that Zig intentionally doesn't provide this API in the allocator: <https://github.com/ziglang/zig/issues/2090#issuecomment-475641804>

That's surprising, but it also makes sense. In particular, in this case it pushed me to a better solution --- just randomly initializing the sectors on the first access. This has the same perf bonus, while also increasing coverage (as we now return random data, rather than just zeros).

Technically, this is not initialization, as subsequent reads of the same uninitialized sector can return different data, but I think we shouldn't rely on uninitialized data being frozen. This is certainly not true for memory (MADV_FREE), and I think it could reasonably be true for disks as well.

push time in 14 hours

issue comment ziglang/zig

terminology update: use the phrase "detectable illegal behavior" rather than "safety-checked undefined behavior"

> the docs would say "detectable illegal behavior".

This is splitting hairs, and I am not feeling strongly at all, but maybe just "checked illegal behavior"?

  • It is simpler.
  • It is more precise: "detectable" leaves open the possibility that the behavior isn't actually detected, while "checked" implies that it is always defined to trap (in ReleaseSafe).
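For a concrete instance of the behavior class being named (standard Zig semantics, not code from the thread), integer overflow carries a guaranteed check in the safe build modes:

```zig
test "checked illegal behavior: integer overflow" {
    var x: u8 = 255;
    // Debug/ReleaseSafe: always trapped -- panics with "integer overflow".
    // ReleaseFast/ReleaseSmall: undefined behavior, nothing is detected.
    x += 1;
}
```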
andrewrk

comment created time in 14 hours

push event matklad/config

Alex Kladov

commit sha 0884f884400c99ead7868c875892dfa34bd2ece1

.

push time in 14 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```zig
pub const SyncTargetQuorum = struct {
    /// The latest known checkpoint identifier from every *other* replica.
    /// Unlike sync_target_max, these SyncTargets are *not* known to be canonical.
    candidates: [constants.replicas_max]?SyncTargetCandidate =
        [_]?SyncTargetCandidate{null} ** constants.replicas_max,

    pub fn replace(
        quorum: *SyncTargetQuorum,
        replica: u8,
        candidate: *const SyncTargetCandidate,
    ) bool {
        if (quorum.candidates[replica]) |candidate_existing| {
            // Ignore old candidate.
            if (candidate.checkpoint_op < candidate_existing.checkpoint_op) {
                return false;
            }

            maybe(candidate.checkpoint_op == candidate_existing.checkpoint_op);
        }
        quorum.candidates[replica] = candidate.*;
        return true;
    }

    pub fn count(quorum: *const SyncTargetQuorum, candidate: *const SyncTargetCandidate) usize {
        var candidates_matching: usize = 0;
        for (quorum.candidates) |candidate_target| {
            if (candidate_target != null and
                candidate_target.?.checkpoint_op == candidate.checkpoint_op and
                candidate_target.?.checkpoint_id == candidate.checkpoint_id)
            {
                assert(std.meta.eql(candidate_target, candidate.*));
                candidates_matching += 1;
            }
        }
        return candidates_matching;
```

`assert(candidates_matching > 0)`

sentientwaffle

comment created time in 16 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```zig
        return candidates_matching;
    }
};

pub const SyncTrailer = struct {
```

Maybe a short comment here? I wasn't able to guess the purpose of the type just from the name. Or maybe add `Progress` somewhere in the name?

sentientwaffle

comment created time in 16 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```zig
        pub fn replica_sync_done(
            checker: *SyncChecker,
            replica_index: u8,
            replica: *const Replica,
        ) void {
            _ = replica;
            checker.replicas_syncing.unset(replica_index);
```

assert that the bit was set.

sentientwaffle

comment created time in 16 hours

Pull request review comment tigerbeetledb/tigerbeetle

VSR: State Sync (Part 1/?)

```zig
pub const SyncTrailer = struct {
    pub const chunk_size_max = constants.message_body_size_max;

    /// The next offset to fetch.
    offset: u32 = 0,
    done: bool = false,
    final: ?struct { size: u32, checksum: u128 } = null,

    pub fn write(
        trailer: *SyncTrailer,
        data: struct {
            total_buffer: []align(@alignOf(u128)) u8,
            total_size: u32,
            total_checksum: u128,
            chunk: []const u8,
            chunk_offset: u32,
        },
    ) ?[]align(@alignOf(u128)) u8 {
        assert(data.chunk.len <= chunk_size_max);
        assert(data.total_size <= data.total_buffer.len);

        if (trailer.final) |*final| {
            assert(final.checksum == data.total_checksum);
            assert(final.size == data.total_size);
        } else {
            assert(trailer.offset == 0);
            assert(!trailer.done);

            trailer.final = .{
                .checksum = data.total_checksum,
                .size = data.total_size,
            };
        }

        if (trailer.done) return null;

        const total_buffer = data.total_buffer[data.chunk_offset..][0..data.chunk.len];
        if (trailer.offset == data.chunk_offset) {
            stdx.copy_disjoint(.exact, u8, total_buffer, data.chunk);
            trailer.offset += @intCast(u32, data.chunk.len);
            assert(trailer.offset <= data.total_size);

            if (trailer.offset == data.total_size) {
                assert(vsr.checksum(data.total_buffer[0..data.total_size]) == data.total_checksum);

                trailer.done = true;
                return data.total_buffer[0..data.total_size];
            }
        } else {
            if (trailer.offset < data.chunk_offset) {
                // Already received this chunk.
```

`assert(data.chunk_offset + data.chunk.len <= trailer.offset)`

sentientwaffle

comment created time in 15 hours
