#!/usr/bin/python3
# group: rw quick auto
#
# Regression test for the qcow2 discard / write-zeroes vs in-flight
# allocating-write race.
#
# A qcow2 allocating writer drops s->lock around its data I/O while
# its l2meta is still on s->cluster_allocs (between
# qcow2_alloc_host_offset() and qcow2_alloc_cluster_link_l2()).  A
# concurrent discard or MAY_UNMAP write-zeroes that overlaps the
# writer's cluster can free that cluster before the writer commits
# its L2 entry, leaving an L2 reference to a freed cluster.  qemu-img
# check then reports refcount=0 with a live OFLAG_COPIED reference,
# or refcount < reference once the allocator re-hands the cluster
# out.  At runtime stderr says "qcow2_free_clusters failed: Invalid
# argument".
#
# The race is reachable through three distinct code paths, each
# exercised by a separate scenario below.  All three share the same
# detection: drive a single qemu-io process with a fixed-seed
# workload, then run qemu-img check and assert zero corruptions.
#
#   cluster                              [reachable on stock upstream]
#       Subcluster-aligned writes racing with cluster-aligned
#       MAY_UNMAP write-zeroes.  Hits the cluster-level path through
#       qcow2_subcluster_zeroize -> zero_in_l2_slice.  Fixed by
#       wiring wait_for_dependencies() into the discard / zeroize
#       entries (Kevin's "qcow2: Fix corruption on discard during
#       write with COW").  Uses only operations that are
#       cluster-aligned, so neither the upstream
#       pwrite_zeroes_alignment nor the VZ pdiscard_alignment
#       matters here.
#
#   subcluster-mixed                     [VZ-only trigger]
#       Subcluster-aligned writes racing with subcluster-aligned
#       aio_discard *and* subcluster-aligned MAY_UNMAP
#       write-zeroes.  Hits the subcluster-level path through
#       discard_l2_subclusters / zero_l2_subclusters with the
#       ALL_ZEROES bitmap fall-through to discard_in_l2_slice /
#       zero_in_l2_slice that frees the writer's m->alloc_offset.
#       The aio_discard half of the workload requires the VZ
#       commit "qcow2: make subclusters discardable", which lowers
#       bs->bl.pdiscard_alignment from cluster_size to
#       subcluster_size.  On a stock upstream tree the discard
#       half would be rejected as misaligned; only the
#       MAY_UNMAP-zero half would still race.  Fixed by gating
#       the keep_old_clusters short-circuit in
#       handle_dependencies() on allow_shortening so non-writers
#       (allow_shortening=false) always wait when clusters
#       intersect, even if the COW areas don't.
#
#   subcluster-zeroize                   [reachable on stock upstream]
#       Same subcluster-level race driven only by subcluster
#       MAY_UNMAP write-zeroes (no aio_discard).  This trigger
#       does not depend on any VZ backport -- only on the
#       upstream pwrite_zeroes_alignment = subcluster_size set
#       by "qcow2: Add subcluster support to
#       qcow2_co_pwrite_zeroes()".  Closed by the same
#       handle_dependencies() fix as subcluster-mixed.
#
# The two subcluster scenarios prime every cluster of the contention
# region into ZERO_ALLOC (write + write -z without MAY_UNMAP), so the
# very next subcluster discard / zeroize can flip the L2 bitmap to
# ALL_ZEROES and trigger the cluster-granularity fall-through.
#
# SPDX-License-Identifier: GPL-2.0-or-later

import random
import subprocess

import iotests
from iotests import qemu_img_create, qemu_img_check, qemu_io_wrap_args


# Restrict this test to the qcow2 image format on Linux hosts.
iotests.script_initialize(
    supported_fmts=['qcow2'],
    supported_platforms=['linux'],
)

# Byte-size units used to spell out the geometry below.
_KIB = 1024
_MIB = 1024 * _KIB

IMG_SIZE = 256 * _MIB                       # virtual size of the test image
REGION = 64 * _MIB                          # contention region the workloads hit
CLUSTER = 1 * _MIB                          # matches cluster_size=1M at creation
SUBS_PER_CLUSTER = 32                       # subcluster = cluster_size / 32
SUBCLUSTER = CLUSTER // SUBS_PER_CLUSTER    # 32 KiB
OPS = 5000                                  # racing commands per scenario
SEED = 7                                    # fixed seed: same workload every run


def cluster_workload(rng):
    """Cluster-level race: subcluster writes vs cluster-aligned MAY_UNMAP
    write-zeroes.  No priming -- the random sequence itself drives
    clusters through allocate / free / re-allocate states.

    Reachable on stock upstream qcow2 (no VZ patches required).

    rng is a random.Random-like object; OPS commands are generated from
    it.  Returns a (prime, race) pair of qemu-io command lists, where
    prime is always empty for this scenario.

    Request lengths are formatted from SUBCLUSTER / CLUSTER (as the
    subcluster workloads do) rather than spelled as literal '32k' /
    '1M', so the lengths cannot drift away from the offsets, which are
    already computed from those constants."""
    max_cluster = REGION // CLUSTER - 1
    lines = []
    for _ in range(OPS):
        # Draw order (cluster, op selector, subcluster) determines the
        # generated sequence for a given seed -- do not reorder.
        cl = rng.randint(0, max_cluster)
        off = cl * CLUSTER
        if rng.random() < 0.5:
            # Plain write of one subcluster somewhere inside the cluster.
            sub = rng.randrange(0, CLUSTER, SUBCLUSTER)
            lines.append(f'aio_write -q {off + sub} {SUBCLUSTER}')
        else:
            # Whole-cluster write-zeroes with unmap allowed (-z -u).
            lines.append(f'aio_write -q -z -u {off} {CLUSTER}')
    return [], lines


def _prime_all_zero_alloc(max_cluster):
    """Build the priming commands for clusters 0..max_cluster (inclusive).

    Each cluster gets a full-cluster pattern write followed by a
    full-cluster 'write -z'.  The intent is to leave every cluster in
    ZERO_ALLOC: still allocated (refcount=1, OFLAG_COPIED) with an
    ALL_ZEROES L2 bitmap, i.e. one subcluster discard / zeroize away
    from the fall-through that frees the cluster.

    Returns a flat list of qemu-io command strings, two per cluster, in
    ascending cluster order."""
    cmds = []
    # Walk the cluster start offsets directly instead of cluster indices.
    for base in range(0, (max_cluster + 1) * CLUSTER, CLUSTER):
        cmds += [
            f'write -q -P 0xa5 {base} {CLUSTER}',
            f'write -q -z {base} {CLUSTER}',
        ]
    return cmds


def subcluster_mixed_workload(rng):
    """Subcluster-level race mixing three kinds of subcluster-sized
    requests: plain aio_write (the in-flight allocating writer with
    keep_old_clusters=true and a narrow COW range), aio_discard, and
    MAY_UNMAP write-zeroes (aio_write -z -u).

    VZ-only trigger: the aio_discard part needs the VZ commit
    "qcow2: make subclusters discardable" (pdiscard_alignment lowered
    to subcluster_size).  On stock upstream those discards would be
    rejected as misaligned and only the MAY_UNMAP zero part would
    still race, which subcluster_zeroize_workload covers on its own.
    This scenario is kept in addition so the VZ aio_discard path is
    verified against the same fix.

    rng is a random.Random-like object.  Returns a (prime, race) pair
    of qemu-io command lists: prime brings the whole region to
    ZERO_ALLOC, race holds OPS randomly chosen commands (about half
    writes, a quarter discards, a quarter zeroes)."""
    last_cluster = REGION // CLUSTER - 1
    last_sub = SUBS_PER_CLUSTER - 1
    race = []
    for _ in range(OPS):
        # Draw order (cluster, subcluster, op selector) determines the
        # generated sequence for a given seed -- do not reorder.
        cluster_idx = rng.randint(0, last_cluster)
        sub_idx = rng.randint(0, last_sub)
        pos = cluster_idx * CLUSTER + sub_idx * SUBCLUSTER
        roll = rng.random()
        if roll >= 0.75:
            verb = 'aio_write -q -z -u'
        elif roll >= 0.5:
            verb = 'aio_discard -q'
        else:
            verb = 'aio_write -q'
        race.append(f'{verb} {pos} {SUBCLUSTER}')
    # Priming does not consume rng, so building it last changes nothing.
    return _prime_all_zero_alloc(last_cluster), race


def subcluster_zeroize_workload(rng):
    """Subcluster-level race using only upstream-available requests:
    subcluster writes against subcluster MAY_UNMAP write-zeroes.
    There is no aio_discard here, so reproduction does not depend on
    the VZ pdiscard_alignment backport.

    Reachable on stock upstream qcow2 (no VZ patches required).

    rng is a random.Random-like object.  Returns a (prime, race) pair
    of qemu-io command lists: prime brings the whole region to
    ZERO_ALLOC, race holds OPS commands split roughly evenly between
    plain writes and '-z -u' zero writes."""
    top_cluster = REGION // CLUSTER - 1

    def next_command():
        # Draw order (cluster, subcluster, op selector) determines the
        # generated sequence for a given seed -- do not reorder.
        cluster_idx = rng.randint(0, top_cluster)
        sub_idx = rng.randint(0, SUBS_PER_CLUSTER - 1)
        target = cluster_idx * CLUSTER + sub_idx * SUBCLUSTER
        flags = '-q' if rng.random() < 0.5 else '-q -z -u'
        return f'aio_write {flags} {target} {SUBCLUSTER}'

    race = [next_command() for _ in range(OPS)]
    # Priming does not consume rng, so building it last changes nothing.
    return _prime_all_zero_alloc(top_cluster), race


# (label, workload generator) pairs, run in this order by main().  The
# label is what run_scenario() puts in front of its OK / FAIL line.
SCENARIOS = [
    ('cluster', cluster_workload),
    ('subcluster-mixed', subcluster_mixed_workload),
    ('subcluster-zeroize', subcluster_zeroize_workload),
]


def run_scenario(name, workload_fn, img):
    """Run one scenario against a freshly created image and log the verdict.

    name is the label used in the logged result line.  workload_fn is
    called with a random.Random(SEED) instance and must return a
    (prime, race) pair of qemu-io command lists.  img is the path of
    the image, which is created here with qemu_img_create.

    Logs '<name>: OK' when qemu-img check reports neither corruptions
    nor check errors, and '<name>: FAIL (...)' with both counters
    otherwise.  subprocess.CalledProcessError propagates if the
    qemu-io process exits with a non-zero status."""
    # Every scenario starts from the same seed, so each workload is
    # reproducible on its own.
    prime, race = workload_fn(random.Random(SEED))

    create_opts = ','.join(['cluster_size=1M', 'extended_l2=on',
                            'lazy_refcounts=on', 'refcount_bits=16'])
    qemu_img_create('-f', 'qcow2', '-o', create_opts, img, str(IMG_SIZE))

    # One command per line: priming first, then the racing requests,
    # ending with aio_flush.
    script = ''.join(f'{cmd}\n' for cmd in [*prime, *race, 'aio_flush'])

    # --cache=none and --aio=native ensure the writer coroutine
    # actually yields around its data I/O (which is what opens the
    # race window).  qemu-io's stdout/stderr are discarded on purpose:
    # the verdict comes from the on-disk state checked afterwards.
    qemu_io_cmd = qemu_io_wrap_args(['-f', 'qcow2', '-n',
                                     '--cache=none', '--aio=native', img])
    subprocess.run(qemu_io_cmd, input=script.encode(),
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                   check=True)

    report = qemu_img_check(img)
    # A counter missing from the check report is treated as zero.
    corruptions = report.get('corruptions', 0)
    check_errors = report.get('check-errors', 0)
    if not (corruptions or check_errors):
        iotests.log(f'{name}: OK')
        return
    iotests.log(f'{name}: FAIL (corruptions={corruptions} '
                f'check-errors={check_errors})')


def main():
    """Run every entry of SCENARIOS, in order, on one shared image path.

    The path comes from the iotests.FilePath('disk.img') context
    manager; run_scenario() creates the image anew for each scenario."""
    with iotests.FilePath('disk.img') as img:
        for scenario in SCENARIOS:
            # Each entry is a (name, workload_fn) pair.
            run_scenario(*scenario, img)


# Run the scenarios only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
