#! /usr/bin/env python3
#
# Copyright (C) 2024 BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

from . import create_server, create_client, increase_revision, revision_greater, revision_smaller, _revision_greater_or_equal
import prserv.db as db
from bb.asyncrpc import InvokeError
import logging
import os
import sys
import tempfile
import unittest
import socket
import subprocess
from pathlib import Path

THIS_DIR = Path(__file__).parent
BIN_DIR = THIS_DIR.parent.parent / "bin"

version = "dummy-1.0-r0"
pkgarch = "core2-64"
other_arch = "aarch64"

checksumX = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4f0"
checksum0 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a0"
checksum1 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a1"
checksum2 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a2"
checksum3 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a3"
checksum4 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a4"
checksum5 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a5"
checksum6 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a6"
checksum7 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a7"
checksum8 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a8"
checksum9 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4a9"
checksum10 = "51bf8189dbe9ea81fa6dd89608bf19380c437a9cf12f6c6239887801ba4ab4aa"

def server_prefunc(server, name):
    logging.basicConfig(level=logging.DEBUG, filename='prserv-%s.log' % name, filemode='w',
                        format='%(levelname)s %(filename)s:%(lineno)d %(message)s')
    server.logger.debug("Running server %s" % name)
    sys.stdout = open('prserv-stdout-%s.log' % name, 'w')
    sys.stderr = sys.stdout

class PRTestSetup(object):

    def start_server(self, name, dbfile, upstream=None, read_only=False, prefunc=server_prefunc):

        def cleanup_server(server):
            if server.process.exitcode is not None:
                return
            server.process.terminate()
            server.process.join()

        server = create_server(socket.gethostbyname("localhost") + ":0",
                               dbfile,
                               upstream=upstream,
                               read_only=read_only)

        server.serve_as_process(prefunc=prefunc, args=(name,))
        self.addCleanup(cleanup_server, server)

        return server

    def start_client(self, server_address):
        def cleanup_client(client):
            client.close()

        client = create_client(server_address)
        self.addCleanup(cleanup_client, client)

        return client

class FunctionTests(unittest.TestCase):

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-prserv')
        self.addCleanup(self.temp_dir.cleanup)

    def test_increase_revision(self):
        self.assertEqual(increase_revision("1"), "2")
        self.assertEqual(increase_revision("1.0"), "1.1")
        self.assertEqual(increase_revision("1.1.1"), "1.1.2")
        self.assertEqual(increase_revision("1.1.1.3"), "1.1.1.4")
        self.assertEqual(increase_revision("9"), "10")
        self.assertEqual(increase_revision("1.9"), "1.10")
        self.assertRaises(ValueError, increase_revision, "1.a")
        self.assertRaises(ValueError, increase_revision, "1.")
        self.assertRaises(ValueError, increase_revision, "")

    def test_revision_greater_or_equal(self):
        self.assertTrue(_revision_greater_or_equal("2", "2"))
        self.assertTrue(_revision_greater_or_equal("2", "1"))
        self.assertTrue(_revision_greater_or_equal("10", "2"))
        self.assertTrue(_revision_greater_or_equal("1.10", "1.2"))
        self.assertFalse(_revision_greater_or_equal("1.2", "1.10"))
        self.assertTrue(_revision_greater_or_equal("1.10", "1"))
        self.assertTrue(_revision_greater_or_equal("1.10.1", "1.10"))
        self.assertFalse(_revision_greater_or_equal("1.10.1", "1.10.2"))
        self.assertTrue(_revision_greater_or_equal("1.10.1", "1.10.1"))
        self.assertTrue(_revision_greater_or_equal("1.10.1", "1"))
        self.assertTrue(revision_greater("1.20", "1.3"))
        self.assertTrue(revision_smaller("1.3", "1.20"))

    # DB tests

    def test_db(self):
        dbfile = os.path.join(self.temp_dir.name, "testtable.sqlite3")

        self.db = db.PRData(dbfile)
        self.table = self.db["PRMAIN"]

        self.table.store_value(version, pkgarch, checksum0, "0")
        self.table.store_value(version, pkgarch, checksum1, "1")
        # "No history" mode supports multiple PRs for the same checksum
        self.table.store_value(version, pkgarch, checksum0, "2")
        self.table.store_value(version, pkgarch, checksum2, "1.0")

        self.assertTrue(self.table.test_package(version, pkgarch))
        self.assertFalse(self.table.test_package(version, other_arch))

        self.assertTrue(self.table.test_value(version, pkgarch, "0"))
        self.assertTrue(self.table.test_value(version, pkgarch, "1"))
        self.assertTrue(self.table.test_value(version, pkgarch, "2"))

        self.assertEqual(self.table.find_package_max_value(version, pkgarch), "2")

        self.assertEqual(self.table.find_min_value(version, pkgarch, checksum0), "0")
        self.assertEqual(self.table.find_max_value(version, pkgarch, checksum0), "2")

        # Test history modes
        self.assertEqual(self.table.find_value(version, pkgarch, checksum0, True), "0")
        self.assertEqual(self.table.find_value(version, pkgarch, checksum0, False), "2")

        self.assertEqual(self.table.find_new_subvalue(version, pkgarch, "3"), "3.0")
        self.assertEqual(self.table.find_new_subvalue(version, pkgarch, "1"), "1.1")

        # Revision comparison tests
        self.table.store_value(version, pkgarch, checksum1, "1.3")
        self.table.store_value(version, pkgarch, checksum1, "1.20")
        self.assertEqual(self.table.find_min_value(version, pkgarch, checksum1), "1")
        self.assertEqual(self.table.find_max_value(version, pkgarch, checksum1), "1.20")

class PRBasicTests(PRTestSetup, unittest.TestCase):

    def setUp(self):
        self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-prserv')
        self.addCleanup(self.temp_dir.cleanup)

        dbfile = os.path.join(self.temp_dir.name, "prtest-basic.sqlite3")

        self.server1 = self.start_server("basic", dbfile)
        self.client1 = self.start_client(self.server1.address)

    def test_basic(self):

        # Checks on non existing configuration

        result = self.client1.test_pr(version, pkgarch, checksum0)
        self.assertIsNone(result, "test_pr should return 'None' for a non existing PR")

        result = self.client1.test_package(version, pkgarch)
        self.assertFalse(result, "test_package should return 'False' for a non existing PR")

        result = self.client1.max_package_pr(version, pkgarch)
        self.assertIsNone(result, "max_package_pr should return 'None' for a non existing PR")

        # Add a first configuration

        result = self.client1.getPR(version, pkgarch, checksum0)
        self.assertEqual(result, "0", "getPR: initial PR of a package should be '0'")

        result = self.client1.test_pr(version, pkgarch, checksum0)
        self.assertEqual(result, "0", "test_pr should return '0' here, matching the result of getPR")

        result = self.client1.test_package(version, pkgarch)
        self.assertTrue(result, "test_package should return 'True' for an existing PR")

        result = self.client1.max_package_pr(version, pkgarch)
        self.assertEqual(result, "0", "max_package_pr should return '0' in the current test series")

        # Check that the same request gets the same value

        result = self.client1.getPR(version, pkgarch, checksum0)
        self.assertEqual(result, "0", "getPR: asking for the same PR a second time in a row should return the same value.")

        # Add new configurations

        result = self.client1.getPR(version, pkgarch, checksum1)
        self.assertEqual(result, "1", "getPR: second PR of a package should be '1'")

        result = self.client1.test_pr(version, pkgarch, checksum1)
        self.assertEqual(result, "1", "test_pr should return '1' here, matching the result of getPR")

        result = self.client1.max_package_pr(version, pkgarch)
        self.assertEqual(result, "1", "max_package_pr should return '1' in the current test series")

        result = self.client1.getPR(version, pkgarch, checksum2)
        self.assertEqual(result, "2", "getPR: second PR of a package should be '2'")

        result = self.client1.test_pr(version, pkgarch, checksum2)
        self.assertEqual(result, "2", "test_pr should return '2' here, matching the result of getPR")

        result = self.client1.max_package_pr(version, pkgarch)
        self.assertEqual(result, "2", "max_package_pr should return '2' in the current test series")

        result = self.client1.getPR(version, pkgarch, checksum3)
        self.assertEqual(result, "3", "getPR: second PR of a package should be '3'")

        result = self.client1.test_pr(version, pkgarch, checksum3)
        self.assertEqual(result, "3", "test_pr should return '3' here, matching the result of getPR")

        result = self.client1.max_package_pr(version, pkgarch)
        self.assertEqual(result, "3", "max_package_pr should return '3' in the current test series")

        # Ask again for the first configuration

        result = self.client1.getPR(version, pkgarch, checksum0)
        self.assertEqual(result, "4", "getPR: should return '4' in this configuration")

        # Ask again with explicit "no history" mode

        result = self.client1.getPR(version, pkgarch, checksum0, False)
        self.assertEqual(result, "4", "getPR: should return '4' in this configuration")

        # Ask again with explicit "history" mode. This should return the first recorded PR for checksum0

        result = self.client1.getPR(version, pkgarch, checksum0, True)
        self.assertEqual(result, "0", "getPR: should return '0' in this configuration")

        # Check again that another pkgarg resets the counters

        result = self.client1.test_pr(version, other_arch, checksum0)
        self.assertIsNone(result, "test_pr should return 'None' for a non existing PR")

        result = self.client1.test_package(version, other_arch)
        self.assertFalse(result, "test_package should return 'False' for a non existing PR")

        result = self.client1.max_package_pr(version, other_arch)
        self.assertIsNone(result, "max_package_pr should return 'None' for a non existing PR")

        # Now add the configuration

        result = self.client1.getPR(version, other_arch, checksum0)
        self.assertEqual(result, "0", "getPR: initial PR of a package should be '0'")

        result = self.client1.test_pr(version, other_arch, checksum0)
        self.assertEqual(result, "0", "test_pr should return '0' here, matching the result of getPR")

        result = self.client1.test_package(version, other_arch)
        self.assertTrue(result, "test_package should return 'True' for an existing PR")

        result = self.client1.max_package_pr(version, other_arch)
        self.assertEqual(result, "0", "max_package_pr should return '0' in the current test series")

        result = self.client1.is_readonly()
        self.assertFalse(result, "Server should not be described as 'read-only'")

class PRUpstreamTests(PRTestSetup, unittest.TestCase):

    def setUp(self):

        self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-prserv')
        self.addCleanup(self.temp_dir.cleanup)

        dbfile2 = os.path.join(self.temp_dir.name, "prtest-upstream2.sqlite3")
        self.server2 = self.start_server("upstream2", dbfile2)
        self.client2 = self.start_client(self.server2.address)

        dbfile1 = os.path.join(self.temp_dir.name, "prtest-upstream1.sqlite3")
        self.server1 = self.start_server("upstream1", dbfile1, upstream=self.server2.address)
        self.client1 = self.start_client(self.server1.address)

        dbfile0 = os.path.join(self.temp_dir.name, "prtest-local.sqlite3")
        self.server0 = self.start_server("local", dbfile0, upstream=self.server1.address)
        self.client0 = self.start_client(self.server0.address)
        self.shared_db = dbfile0

    def test_upstream_and_readonly(self):

        # For identical checksums, all servers should return the same PR

        result = self.client2.getPR(version, pkgarch, checksum0)
        self.assertEqual(result, "0", "getPR: initial PR of a package should be '0'")

        result = self.client1.getPR(version, pkgarch, checksum0)
        self.assertEqual(result, "0", "getPR: initial PR of a package should be '0' (same as upstream)")

        result = self.client0.getPR(version, pkgarch, checksum0)
        self.assertEqual(result, "0", "getPR: initial PR of a package should be '0' (same as upstream)")

        # Now introduce new checksums on server1 for, same version

        result = self.client1.getPR(version, pkgarch, checksum1)
        self.assertEqual(result, "0.0", "getPR: first PR of a package which has a different checksum upstream should be '0.0'")

        result = self.client1.getPR(version, pkgarch, checksum2)
        self.assertEqual(result, "0.1", "getPR: second PR of a package that has a different checksum upstream should be '0.1'")

        # Now introduce checksums on server0 for, same version

        result = self.client1.getPR(version, pkgarch, checksum1)
        self.assertEqual(result, "0.2", "getPR: can't decrease for known PR")

        result = self.client1.getPR(version, pkgarch, checksum2)
        self.assertEqual(result, "0.3")

        result = self.client1.max_package_pr(version, pkgarch)
        self.assertEqual(result, "0.3")

        result = self.client0.getPR(version, pkgarch, checksum3)
        self.assertEqual(result, "0.3.0", "getPR: first PR of a package that doesn't exist upstream should be '0.3.0'")

        result = self.client0.getPR(version, pkgarch, checksum4)
        self.assertEqual(result, "0.3.1", "getPR: second PR of a package that doesn't exist upstream should be '0.3.1'")

        result = self.client0.getPR(version, pkgarch, checksum3)
        self.assertEqual(result, "0.3.2")

        # More upstream updates
        # Here, we assume no communication between server2 and server0. server2 only impacts server0
        # after impacting server1

        self.assertEqual(self.client2.getPR(version, pkgarch, checksum5), "1")
        self.assertEqual(self.client1.getPR(version, pkgarch, checksum6), "1.0")
        self.assertEqual(self.client1.getPR(version, pkgarch, checksum7), "1.1")
        self.assertEqual(self.client0.getPR(version, pkgarch, checksum8), "1.1.0")
        self.assertEqual(self.client0.getPR(version, pkgarch, checksum9), "1.1.1")

        # "history" mode tests

        self.assertEqual(self.client2.getPR(version, pkgarch, checksum0, True), "0")
        self.assertEqual(self.client1.getPR(version, pkgarch, checksum2, True), "0.1")
        self.assertEqual(self.client0.getPR(version, pkgarch, checksum3, True), "0.3.0")

        # More "no history" mode tests

        self.assertEqual(self.client2.getPR(version, pkgarch, checksum0), "2")
        self.assertEqual(self.client1.getPR(version, pkgarch, checksum0), "2") # Same as upstream
        self.assertEqual(self.client0.getPR(version, pkgarch, checksum0), "2") # Same as upstream
        self.assertEqual(self.client1.getPR(version, pkgarch, checksum7), "3") # This could be surprising, but since the previous revision was "2", increasing it yields "3".
                                                                               # We don't know how many upstream servers we have
        # Start read-only server with server1 as upstream
        self.server_ro = self.start_server("local-ro", self.shared_db, upstream=self.server1.address, read_only=True)
        self.client_ro = self.start_client(self.server_ro.address)

        self.assertTrue(self.client_ro.is_readonly(), "Database should be described as 'read-only'")

        # Checks on non existing configurations
        self.assertIsNone(self.client_ro.test_pr(version, pkgarch, checksumX))
        self.assertFalse(self.client_ro.test_package("unknown", pkgarch))

        # Look up existing configurations
        self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum0), "3") # "no history" mode
        self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum0, True), "0") # "history" mode
        self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum3), "3")
        self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum3, True), "0.3.0")
        self.assertEqual(self.client_ro.max_package_pr(version, pkgarch), "2") # normal as "3" was never saved

        # Try to insert a new value. Here this one is know upstream.
        self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum7), "3")
        # Try to insert a completely new value. As the max upstream value is already "3", it should be "3.0"
        self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum10), "3.0")
        # Same with another value which only exists in the upstream upstream server
        # This time, as the upstream server doesn't know it, it will ask its upstream server. So that's a known one.
        self.assertEqual(self.client_ro.getPR(version, pkgarch, checksum9), "3")

class ScriptTests(unittest.TestCase):

    def setUp(self):

        self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-prserv')
        self.addCleanup(self.temp_dir.cleanup)
        self.dbfile = os.path.join(self.temp_dir.name, "prtest.sqlite3")

    def test_1_start_bitbake_prserv(self):
        try:
            subprocess.check_call([BIN_DIR / "bitbake-prserv", "--start", "-f", self.dbfile])
        except subprocess.CalledProcessError as e:
            self.fail("Failed to start bitbake-prserv: %s" % e.returncode)

    def test_2_stop_bitbake_prserv(self):
        try:
            subprocess.check_call([BIN_DIR / "bitbake-prserv", "--stop"])
        except subprocess.CalledProcessError as e:
            self.fail("Failed to stop bitbake-prserv: %s" % e.returncode)
