import ../make-test-python.nix ({ lib, pkgs, ... }:

{
  # Identity of this test and the people responsible for it.
  name = "vector-nginx-clickhouse";
  meta = {
    maintainers = [ pkgs.lib.maintainers.happysalada ];
  };

  nodes = {
    # Database host: ClickHouse plus a Vector instance that receives events
    # from other nodes on TCP port 6000 and inserts them into
    # nginxdb.access_logs through the ClickHouse HTTP endpoint on localhost.
    clickhouse = { config, pkgs, ... }: {
      virtualisation.memorySize = 4096;

      services.clickhouse.enable = true;

      # Clickhouse module can't listen on a non-loopback IP, so the only port
      # opened in the firewall is the one the Vector source listens on.
      networking.firewall.allowedTCPPorts = [ 6000 ];

      # Exercise Vector sink->source for now.
      services.vector.enable = true;

      # Listener that the Vector sink on the nginx node connects to.
      services.vector.settings.sources.vector_source = {
        type = "vector";
        address = "[::]:6000";
      };

      # Everything received by the source above is written to ClickHouse.
      services.vector.settings.sinks.clickhouse = {
        type = "clickhouse";
        inputs = [ "vector_source" ];
        endpoint = "http://localhost:8123";
        database = "nginxdb";
        table = "access_logs";
        skip_unknown_fields = true;
      };
    };

    # Web host: nginx serves the "localhost" virtual host, and a local Vector
    # instance watches the access log and sends its events to the Vector
    # source on the clickhouse node.
    nginx = { config, pkgs, ... }: {
      services.nginx.enable = true;
      services.nginx.virtualHosts.localhost = {};

      services.vector.enable = true;
      services.vector.settings = {
        # Input: the nginx access log file.
        sources.nginx_logs = {
          type = "file";
          include = [ "/var/log/nginx/access.log" ];
          read_from = "end";
        };

        # Output: the Vector listener on the clickhouse node.
        sinks.vector_sink = {
          type = "vector";
          inputs = [ "nginx_logs" ];
          address = "clickhouse:6000";
        };
      };

      # Presumably needed so that the Vector service is allowed to read the
      # access log written by nginx; confirm against the log file permissions.
      systemd.services.vector.serviceConfig.SupplementaryGroups = [ "nginx" ];
    };
  };

  # Driver script for the test. It prepares the ClickHouse schema, generates
  # nginx traffic with curl and then polls ClickHouse until the requests show
  # up in the materialized view.
  testScript =
  let
    # Each SQL statement is written to its own store file and piped into
    # clickhouse-client. This works around quote/substitution complexity by
    # Nix, Python, bash and SQL.
    databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS nginxdb";

    # Table targeted by the Vector clickhouse sink (database "nginxdb", table
    # "access_logs"). It has a single String column named "message".
    tableDDL = pkgs.writeText "table.sql" ''
      CREATE TABLE IF NOT EXISTS  nginxdb.access_logs (
        message String
      )
      ENGINE = MergeTree()
      ORDER BY tuple()
    '';

    # Graciously taken from https://clickhouse.com/docs/en/integrations/vector
    #
    # Materialized view over nginxdb.access_logs. It splits "message" on
    # whitespace and maps the pieces to typed columns (remote address, time,
    # request method, status, bytes sent, ...). The UserAgent column comes
    # from the second piece of a separate regexp split of the same message.
    tableView = pkgs.writeText "table-view.sql" ''
      CREATE MATERIALIZED VIEW nginxdb.access_logs_view
      (
        RemoteAddr String,
        Client String,
        RemoteUser String,
        TimeLocal DateTime,
        RequestMethod String,
        Request String,
        HttpVersion String,
        Status Int32,
        BytesSent Int64,
        UserAgent String
      )
      ENGINE = MergeTree()
      ORDER BY RemoteAddr
      POPULATE AS
      WITH
       splitByWhitespace(message) as split,
       splitByRegexp('\S \d+ "([^"]*)"', message) as referer
      SELECT
        split[1] AS RemoteAddr,
        split[2] AS Client,
        split[3] AS RemoteUser,
        parseDateTimeBestEffort(replaceOne(trim(LEADING '[' FROM split[4]), ':', ' ')) AS TimeLocal,
        trim(LEADING '"' FROM split[6]) AS RequestMethod,
        split[7] AS Request,
        trim(TRAILING '"' FROM split[8]) AS HttpVersion,
        split[9] AS Status,
        split[10] AS BytesSent,
        trim(BOTH '"' from referer[2]) AS UserAgent
      FROM
        (SELECT message FROM nginxdb.access_logs)
    '';

    # Reads back every row of the view; the script greps the output for
    # 'curl' (presumably matching the user agent of the curl requests below).
    selectQuery = pkgs.writeText "select.sql" "SELECT * from nginxdb.access_logs_view";
  in
  # Phases of the script:
  #   1. On the clickhouse node, wait for ClickHouse (unit, port 8123 and its
  #      startup message in the journal) and for Vector (unit, port 6000).
  #   2. Create the database, the table and the materialized view.
  #   3. On the nginx node, wait for nginx (unit, port 80) and for Vector to
  #      log that its file server is starting.
  #   4. Issue two HTTP requests with curl, then wait for the access log to
  #      exist and for Vector to log that it is watching that file.
  #   5. Poll the view on the clickhouse node until a row containing 'curl'
  #      is returned.
  ''
    clickhouse.wait_for_unit("clickhouse")
    clickhouse.wait_for_open_port(8123)

    clickhouse.wait_until_succeeds(
      "journalctl -o cat -u clickhouse.service | grep 'Started ClickHouse server'"
    )

    clickhouse.wait_for_unit("vector")
    clickhouse.wait_for_open_port(6000)

    clickhouse.succeed(
      "cat ${databaseDDL} | clickhouse-client"
    )

    clickhouse.succeed(
      "cat ${tableDDL} | clickhouse-client"
    )

    clickhouse.succeed(
      "cat ${tableView} | clickhouse-client"
    )

    nginx.wait_for_unit("nginx")
    nginx.wait_for_open_port(80)
    nginx.wait_for_unit("vector")
    nginx.wait_until_succeeds(
      "journalctl -o cat -u vector.service | grep 'Starting file server'"
    )

    nginx.succeed("curl http://localhost/")
    nginx.succeed("curl http://localhost/")

    nginx.wait_for_file("/var/log/nginx/access.log")
    nginx.wait_until_succeeds(
      "journalctl -o cat -u vector.service | grep 'Found new file to watch. file=/var/log/nginx/access.log'"
    )

    clickhouse.wait_until_succeeds(
      "cat ${selectQuery} | clickhouse-client | grep 'curl'"
    )
  '';
})