{ lib, ... }:

{
  name = "rtkit";

  meta.maintainers = with lib.maintainers; [ rvl ];

  nodes.machine =
    { config, pkgs, ... }:
    {
      assertions = [
        {
          assertion = config.security.polkit.enable;
          message = "rtkit needs polkit to handle authorization";
        }
      ];

      imports = [ ./common/user-account.nix ];
      services.getty.autologinUser = "alice";

      security.rtkit.enable = true;

      # Modified configuration with higher maximum realtime priority.
      specialisation.withHigherPrio.configuration = {
        security.rtkit.args = [
          "--max-realtime-priority=89"
          "--our-realtime-priority=90"
        ];
      };

      # Target process for testing - belongs to a logind session.
      systemd.user.services.sleeper = {
        description = "Guinea pig service";
        serviceConfig = {
          ExecStart = "@${pkgs.coreutils}/bin/sleep sleep inf";
          # rtkit-daemon won't grant real-time to threads unless they have a rttime limit.
          LimitRTTIME = 200000;
        };
        wantedBy = [ "default.target" ];
      };

      # Target process for testing - doesn't belong to a session.
      systemd.services."sleeper@" = {
        description = "Guinea pig system service for %I";
        serviceConfig = {
          ExecStart = "@${pkgs.coreutils}/bin/sleep sleep inf";
          LimitRTTIME = 200000;
          User = "%I";
        };
      };
      systemd.targets.multi-user.wants = [ "sleeper@alice.service" ];

      # Install chrt to check outcomes of rtkit calls
      environment.systemPackages = [ pkgs.util-linux ];

      # Provide a little logging of polkit checks - otherwise it's
      # impossible to know what's going on.
      security.polkit.debug = true;
      security.polkit.extraConfig = ''
        polkit.addRule(function(action, subject) {
          const ns = "org.freedesktop.RealtimeKit1.";
          const acquireHighPrio = ns + "acquire-high-priority";
          const acquireRT = ns + "acquire-real-time";
          if (action.id == acquireHighPrio || action.id == acquireRT) {
            polkit.log("rtkit: Checking " + action.id + " for " + subject.user + "\n  " + subject);
          }
        });
      '';
    };

  interactive.nodes.machine =
    { pkgs, ... }:
    {
      security.rtkit.args = [ "--debug" ];
      systemd.services.strace-rtkit =
        let
          target = "rtkit-daemon.service";
        in
        {
          bindsTo = [ target ];
          after = [ target ];
          scriptArgs = target;
          script = ''
            pid=$(systemctl show -P MainPID $1)
            strace -tt -s 100 -e trace=all -p $pid
          '';
          path = [ pkgs.strace ];
        };
    };

  testScript =
    { nodes, ... }:
    let
      specialisations = "${nodes.machine.system.build.toplevel}/specialisation";
      uid = toString nodes.machine.users.users.alice.uid;
    in
    ''
      import json
      import shlex
      from collections import namedtuple
      from typing import Any, Optional

      Result = namedtuple("Result", ["command", "machine", "status", "out", "value"])
      Value = namedtuple("Value", ["type", "data"])

      def busctl(node: Machine, *args: Any, user: Optional[str] = None) -> Result:
          command = f"busctl --json=short {shlex.join(map(str, args))}"
          if user is not None:
              command = f"su - {user} -c {shlex.quote(command)}"
          (status, out) = node.execute(command)
          out = out.strip()
          value = json.loads(out, object_hook=lambda x: Value(**x)) if status == 0 and out else None
          return Result(command, node, status, out, value)

      def assert_result_success(result: Result):
          if result.status != 0:
              result.machine.log(f"output: {result.out}")
              raise Exception(f"command `{result.command}` failed (exit code {result.status})")

      def assert_result_fail(result: Result):
          if result.status == 0:
              raise Exception(f"command `{result.command}` unexpectedly succeeded")

      def rtkit_make_process_realtime(node: Machine, pid: int, priority: int, user: Optional[str] = None) -> Result:
          return busctl(node, "call", "org.freedesktop.RealtimeKit1", "/org/freedesktop/RealtimeKit1", "org.freedesktop.RealtimeKit1", "MakeThreadRealtimeWithPID", "ttu", pid, 0, priority, user=user)

      def get_max_realtime_priority() -> int:
          result = busctl(machine, "get-property", "org.freedesktop.RealtimeKit1", "/org/freedesktop/RealtimeKit1", "org.freedesktop.RealtimeKit1", "MaxRealtimePriority")
          assert_result_success(result)
          assert result.value.type == "i", f"""Unexpected MaxRealtimePriority property type ({result.value})"""
          return int(result.value.data)

      def parse_chrt(out: str, field: str) -> str:
          return next(map(lambda l: l.split(": ")[1], filter(lambda l: field in l, out.splitlines())))

      def get_pid(node: Machine, unit: str, user: Optional[str] = None) -> int:
          node.wait_for_unit(unit, user=user)
          (status, out) = node.systemctl(f"show -P MainPID {unit}", user=user)
          if status == 0:
              return int(out.strip())
          else:
              node.log(out)
              raise Exception(f"unable to determine MainPID of {unit} (systemctl exit code {status})")

      def assert_sched(node: Machine, pid: int, policy: Optional[str] = None, priority: Optional[int] = None):
          out = node.succeed(f"chrt -p {pid}")
          node.log(out)
          if policy is not None:
              thread_policy = parse_chrt(out, "policy")
              assert policy in thread_policy, f"Expected {policy} scheduling policy, but got: {thread_policy}"
          if priority is not None:
              thread_priority = parse_chrt(out, "priority")
              assert str(priority) == thread_priority, f"Expected scheduling priority {priority}, but got: {thread_priority}"

      machine.wait_for_unit("basic.target")

      rtprio = 20
      higher_rtprio = 42
      max_rtprio = get_max_realtime_priority()

      with subtest("maximum sched_rr priority"):
          assert max_rtprio >= rtprio, f"""MaxRealtimePriority ({max_rtprio}) too low"""
          assert higher_rtprio > max_rtprio, f"""Test value higher_rtprio ({higher_rtprio}) insufficient compared to MaxRealtimePriority ({max_rtprio})"""

      # wait for autologin and systemd user service manager
      machine.wait_for_unit("multi-user.target")
      machine.wait_for_unit("user@${uid}.service")

      with subtest("polkit sanity check"):
          pid = get_pid(machine, "sleeper.service", user="alice")
          machine.succeed(f"pkcheck -p {pid} -a org.freedesktop.RealtimeKit1.acquire-real-time")

      with subtest("chrt sanity check"):
          print(machine.succeed("chrt --rr --max"))
          pid = get_pid(machine, "sleeper.service", user="alice")
          machine.succeed(f"chrt --rr --pid {rtprio} {pid}")
          assert_sched(machine, pid, policy="SCHED_RR", priority=rtprio)
          machine.stop_job("sleeper.service", user="alice")
          machine.start_job("sleeper.service", user="alice")

      # Permission granted by policy from rtkit package.
      with subtest("local user process can acquire real-time scheduling"):
          pid = get_pid(machine, "sleeper.service", user="alice")
          result = rtkit_make_process_realtime(machine, pid, rtprio, user="alice")
          assert_result_success(result)
          assert_sched(machine, pid, policy="SCHED_RR", priority=rtprio)

      # User must not get higher priority than the maximum
      with subtest("real-time scheduling priority is limited"):
          machine.stop_job("sleeper.service", user="alice")
          machine.start_job("sleeper.service", user="alice")
          pid = get_pid(machine, "sleeper.service", user="alice")
          with machine.nested("rtkit call must fail"):
              result = rtkit_make_process_realtime(machine, pid, max_rtprio + 1, user="alice")
              assert_result_fail(result)
          assert_sched(machine, pid, policy="SCHED_OTHER")

      # This is a local shop for local people - we'll have no trouble here.
      # In this test, the target process belongs to alice, but doesn't
      # have a user session, so it's considered non-local.
      with subtest("non-local user process cannot acquire real-time scheduling"):
          pid = get_pid(machine, "sleeper@alice.service")
          with machine.nested("rtkit call must fail"):
              result = rtkit_make_process_realtime(machine, pid, rtprio, "alice")
              assert_result_fail(result)
          assert_sched(machine, pid, policy="SCHED_OTHER")

      # Switch to alternate configuration then ask for higher priority.
      with subtest("command-line arguments allow increasing maximum rtprio"):
          machine.succeed("${specialisations}/withHigherPrio/bin/switch-to-configuration test")
          pid = get_pid(machine, "sleeper.service", user="alice")
          result = rtkit_make_process_realtime(machine, pid, higher_rtprio, user="alice")
          assert_result_success(result)
          assert_sched(machine, pid, policy="SCHED_RR", priority=higher_rtprio)
    '';
}