--- /dev/null
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+# Author: Breno Leitao <leitao@debian.org>
+"""
+ This test aims to evaluate the netpoll polling mechanism (as in
+ netpoll_poll_dev()). It presents a complex scenario where the network
+ attempts to send a packet but fails, prompting it to poll the NIC from within
+ the netpoll TX side.
+
+ This has been a crucial path in netpoll that was previously untested. Jakub
+ suggested using a single RX/TX queue, pushing traffic to the NIC, and then
+ sending netpoll messages (via netconsole) to trigger the poll.
+
+ In parallel, bpftrace is used to detect if netpoll_poll_dev() was called. If
+ so, the test passes, otherwise it will be skipped. This test is very dependent on
+ the driver and environment, given we are trying to trigger a tricky scenario.
+"""
+
+import errno
+import logging
+import os
+import random
+import string
+import threading
+import time
+from typing import Optional
+
+from lib.py import (
+    bpftrace,
+    CmdExitFailure,
+    defer,
+    ethtool,
+    GenerateTraffic,
+    ksft_exit,
+    ksft_pr,
+    ksft_run,
+    KsftFailEx,
+    KsftSkipEx,
+    NetDrvEpEnv,
+    KsftXfailEx,
+)
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(levelname)s - %(message)s",
+)
+
+NETCONSOLE_CONFIGFS_PATH: str = "/sys/kernel/config/netconsole"
+NETCONS_REMOTE_PORT: int = 6666
+NETCONS_LOCAL_PORT: int = 1514
+
+# Max number of netcons messages to send. Each iteration will setup
+# netconsole and send MAX_WRITES messages
+ITERATIONS: int = 20
+# Number of writes to /dev/kmsg per iteration
+MAX_WRITES: int = 40
+# MAPS contains the information coming from bpftrace it will have only one
+# key: "hits", which tells the number of times netpoll_poll_dev() was called
+MAPS: dict[str, int] = {}
+# Thread to run bpftrace in parallel
+BPF_THREAD: Optional[threading.Thread] = None
+# Time bpftrace will be running in parallel.
+BPFTRACE_TIMEOUT: int = 10
+
+
+def ethtool_get_ringsize(interface_name: str) -> tuple[int, int]:
+    """
+    Read the ringsize using ethtool. This will be used to restore it after the test
+    """
+    try:
+        ethtool_result = ethtool(f"-g {interface_name}", json=True)[0]
+        rxs = ethtool_result["rx"]
+        txs = ethtool_result["tx"]
+    except (KeyError, IndexError) as exception:
+        raise KsftSkipEx(
+            f"Failed to read RX/TX ringsize: {exception}. Not going to mess with them."
+        ) from exception
+
+    return rxs, txs
+
+
+def ethtool_set_ringsize(interface_name: str, ring_size: tuple[int, int]) -> bool:
+    """Try to the number of RX and TX ringsize."""
+    rxs = ring_size[0]
+    txs = ring_size[1]
+
+    logging.debug("Setting ring size to %d/%d", rxs, txs)
+    try:
+        ethtool(f"-G {interface_name} rx {rxs} tx {txs}")
+    except CmdExitFailure:
+        # This might fail on real device, retry with a higher value,
+        # worst case, keep it as it is.
+        return False
+
+    return True
+
+
+def ethtool_get_queues_cnt(interface_name: str) -> tuple[int, int, int]:
+    """Read the number of RX, TX and combined queues using ethtool"""
+
+    try:
+        ethtool_result = ethtool(f"-l {interface_name}", json=True)[0]
+        rxq = ethtool_result.get("rx", -1)
+        txq = ethtool_result.get("tx", -1)
+        combined = ethtool_result.get("combined", -1)
+
+    except IndexError as exception:
+        raise KsftSkipEx(
+            f"Failed to read queues numbers: {exception}. Not going to mess with them."
+        ) from exception
+
+    return rxq, txq, combined
+
+
+def ethtool_set_queues_cnt(interface_name: str, queues: tuple[int, int, int]) -> None:
+    """Set the number of RX, TX and combined queues using ethtool"""
+    rxq, txq, combined = queues
+
+    cmdline = f"-L {interface_name}"
+
+    if rxq != -1:
+        cmdline += f" rx {rxq}"
+    if txq != -1:
+        cmdline += f" tx {txq}"
+    if combined != -1:
+        cmdline += f" combined {combined}"
+
+    logging.debug("calling: ethtool %s", cmdline)
+
+    try:
+        ethtool(cmdline)
+    except CmdExitFailure as exception:
+        raise KsftSkipEx(
+            f"Failed to configure RX/TX queues: {exception}. Ethtool not available?"
+        ) from exception
+
+
+def netcons_generate_random_target_name() -> str:
+    """Generate a random target name starting with 'netcons'"""
+    random_suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
+    return f"netcons_{random_suffix}"
+
+
+def netcons_create_target(
+    config_data: dict[str, str],
+    target_name: str,
+) -> None:
+    """Create a netconsole dynamic target against the interfaces"""
+    logging.debug("Using netconsole name: %s", target_name)
+    try:
+        os.makedirs(f"{NETCONSOLE_CONFIGFS_PATH}/{target_name}", exist_ok=True)
+        logging.debug(
+            "Created target directory: %s/%s", NETCONSOLE_CONFIGFS_PATH, target_name
+        )
+    except OSError as exception:
+        if exception.errno != errno.EEXIST:
+            raise KsftFailEx(
+                f"Failed to create netconsole target directory: {exception}"
+            ) from exception
+
+    try:
+        for key, value in config_data.items():
+            path = f"{NETCONSOLE_CONFIGFS_PATH}/{target_name}/{key}"
+            logging.debug("Writing %s to %s", key, path)
+            with open(path, "w", encoding="utf-8") as file:
+                # Always convert to string to write to file
+                file.write(str(value))
+
+        # Read all configuration values for debugging purposes
+        for debug_key in config_data.keys():
+            with open(
+                f"{NETCONSOLE_CONFIGFS_PATH}/{target_name}/{debug_key}",
+                "r",
+                encoding="utf-8",
+            ) as file:
+                content = file.read()
+                logging.debug(
+                    "%s/%s/%s : %s",
+                    NETCONSOLE_CONFIGFS_PATH,
+                    target_name,
+                    debug_key,
+                    content.strip(),
+                )
+
+    except Exception as exception:
+        raise KsftFailEx(
+            f"Failed to configure netconsole target: {exception}"
+        ) from exception
+
+
+def netcons_configure_target(
+    cfg: NetDrvEpEnv, interface_name: str, target_name: str
+) -> None:
+    """Configure netconsole on the interface with the given target name"""
+    config_data = {
+        "extended": "1",
+        "dev_name": interface_name,
+        "local_port": NETCONS_LOCAL_PORT,
+        "remote_port": NETCONS_REMOTE_PORT,
+        "local_ip": cfg.addr,
+        "remote_ip": cfg.remote_addr,
+        "remote_mac": "00:00:00:00:00:00",  # Not important for this test
+        "enabled": "1",
+    }
+
+    netcons_create_target(config_data, target_name)
+    logging.debug(
+        "Created netconsole target: %s on interface %s", target_name, interface_name
+    )
+
+
+def netcons_delete_target(name: str) -> None:
+    """Delete a netconsole dynamic target"""
+    target_path = f"{NETCONSOLE_CONFIGFS_PATH}/{name}"
+    try:
+        if os.path.exists(target_path):
+            os.rmdir(target_path)
+    except OSError as exception:
+        raise KsftFailEx(
+            f"Failed to delete netconsole target: {exception}"
+        ) from exception
+
+
+def netcons_load_module() -> None:
+    """Try to load the netconsole module"""
+    os.system("modprobe netconsole")
+
+
+def bpftrace_call() -> None:
+    """Call bpftrace to find how many times netpoll_poll_dev() is called.
+    Output is saved in the global variable `maps`"""
+
+    # This is going to update the global variable, that will be seen by the
+    # main function
+    global MAPS  # pylint: disable=W0603
+
+    # This will be passed to bpftrace as in bpftrace -e "expr"
+    expr = "kprobe:netpoll_poll_dev { @hits = count(); }"
+
+    MAPS = bpftrace(expr, timeout=BPFTRACE_TIMEOUT, json=True)
+    logging.debug("BPFtrace output: %s", MAPS)
+
+
+def bpftrace_start():
+    """Start a thread to call `call_bpf` in a parallel thread"""
+    global BPF_THREAD  # pylint: disable=W0603
+
+    BPF_THREAD = threading.Thread(target=bpftrace_call)
+    BPF_THREAD.start()
+    if not BPF_THREAD.is_alive():
+        raise KsftSkipEx("BPFtrace thread is not alive. Skipping test")
+
+
+def bpftrace_stop() -> None:
+    """Stop the bpftrace thread"""
+    if BPF_THREAD:
+        BPF_THREAD.join()
+
+
+def bpftrace_any_hit(join: bool) -> bool:
+    """Check if netpoll_poll_dev() was called by checking the global variable `maps`"""
+    if not BPF_THREAD:
+        raise KsftFailEx("BPFtrace didn't start")
+
+    if BPF_THREAD.is_alive():
+        if join:
+            # Wait for bpftrace to finish
+            BPF_THREAD.join()
+        else:
+            # bpftrace is still running, so, we will not check the result yet
+            return False
+
+    logging.debug("MAPS coming from bpftrace = %s", MAPS)
+    if "hits" not in MAPS.keys():
+        raise KsftFailEx(f"bpftrace failed to run!?: {MAPS}")
+
+    logging.debug("Got a total of %d hits", MAPS["hits"])
+    return MAPS["hits"] > 0
+
+
+def do_netpoll_flush_monitored(cfg: NetDrvEpEnv, ifname: str, target_name: str) -> None:
+    """Print messages to the console, trying to trigger a netpoll poll"""
+    # Start bpftrace in parallel, so, it is watching
+    # netpoll_poll_dev() while we are sending netconsole messages
+    bpftrace_start()
+    defer(bpftrace_stop)
+
+    do_netpoll_flush(cfg, ifname, target_name)
+
+    if bpftrace_any_hit(join=True):
+        ksft_pr("netpoll_poll_dev() was called. Success")
+        return
+
+    raise KsftXfailEx("netpoll_poll_dev() was not called during the test...")
+
+
+def do_netpoll_flush(cfg: NetDrvEpEnv, ifname: str, target_name: str) -> None:
+    """Print messages to the console, trying to trigger a netpoll poll"""
+    netcons_configure_target(cfg, ifname, target_name)
+    retry = 0
+
+    for i in range(int(ITERATIONS)):
+        if not BPF_THREAD.is_alive() or bpftrace_any_hit(join=False):
+            # bpftrace is done, stop sending messages
+            break
+
+        msg = f"netcons test #{i}"
+        with open("/dev/kmsg", "w", encoding="utf-8") as kmsg:
+            for j in range(MAX_WRITES):
+                try:
+                    kmsg.write(f"{msg}-{j}\n")
+                except OSError as exception:
+                    # in some cases, kmsg can be busy, so, we will retry
+                    time.sleep(1)
+                    retry += 1
+                    if retry < 5:
+                        logging.info("Failed to write to kmsg. Retrying")
+                        # Just retry a few times
+                        continue
+                    raise KsftFailEx(
+                        f"Failed to write to kmsg: {exception}"
+                    ) from exception
+
+        netcons_delete_target(target_name)
+        netcons_configure_target(cfg, ifname, target_name)
+        # If we sleep here, we will have a better chance of triggering
+        # This number is based on a few tests I ran while developing this test
+        time.sleep(0.4)
+
+
+def configure_network(ifname: str) -> None:
+    """Configure ring size and queue numbers"""
+
+    # Set defined queues to 1 to force congestion
+    prev_queues = ethtool_get_queues_cnt(ifname)
+    logging.debug("RX/TX/combined queues: %s", prev_queues)
+    # Only set the queues to 1 if they exists in the device. I.e, they are > 0
+    ethtool_set_queues_cnt(ifname, tuple(1 if x > 0 else x for x in prev_queues))
+    defer(ethtool_set_queues_cnt, ifname, prev_queues)
+
+    # Try to set the ring size to some low value.
+    # Do not fail if the hardware do not accepted desired values
+    prev_ring_size = ethtool_get_ringsize(ifname)
+    for size in [(1, 1), (128, 128), (256, 256)]:
+        if ethtool_set_ringsize(ifname, size):
+            # hardware accepted the desired ringsize
+            logging.debug("Set RX/TX ringsize to: %s from %s", size, prev_ring_size)
+            break
+    defer(ethtool_set_ringsize, ifname, prev_ring_size)
+
+
+def test_netpoll(cfg: NetDrvEpEnv) -> None:
+    """
+    Test netpoll by sending traffic to the interface and then sending
+    netconsole messages to trigger a poll
+    """
+
+    ifname = cfg.ifname
+    configure_network(ifname)
+    target_name = netcons_generate_random_target_name()
+    traffic = None
+
+    try:
+        traffic = GenerateTraffic(cfg)
+        do_netpoll_flush_monitored(cfg, ifname, target_name)
+    finally:
+        if traffic:
+            traffic.stop()
+
+        # Revert RX/TX queues
+        netcons_delete_target(target_name)
+
+
+def test_check_dependencies() -> None:
+    """Check if the dependencies are met"""
+    if not os.path.exists(NETCONSOLE_CONFIGFS_PATH):
+        raise KsftSkipEx(
+            f"Directory {NETCONSOLE_CONFIGFS_PATH} does not exist. CONFIG_NETCONSOLE_DYNAMIC might not be set."  # pylint: disable=C0301
+        )
+
+
+def main() -> None:
+    """Main function to run the test"""
+    netcons_load_module()
+    test_check_dependencies()
+    with NetDrvEpEnv(__file__) as cfg:
+        ksft_run(
+            [test_netpoll],
+            args=(cfg,),
+        )
+    ksft_exit()
+
+
+if __name__ == "__main__":
+    main()