def do_testimage(d):
    """
    Entry point for the image test run: delegates to testimage_main().

    `d` is the datastore object that the rest of this file queries through
    d.getVar(); it is passed through unchanged.
    """
    testimage_main(d)

# Module-level invocation of the entry point. `d` is not defined anywhere in
# this file -- presumably it is supplied by the BitBake task environment
# (NOTE(review): confirm how this file is executed).
do_testimage(d)

def testimage_main(d):
    """
    Start the configured test target, run the TEST_SUITES on it and report.

    The function, in order:
      * runs testimage_sanity() and, for rpm images whose TEST_SUITES mention
        'dnf' or 'auto', builds a reduced rpm feed with create_rpm_index();
      * loads the image's "<image>.testdata.json" and refreshes the variables
        named in TESTIMAGE_UPDATE_VARS from the current datastore;
      * builds the target keyword arguments and creates the target and the
        runtime test context;
      * loads the test modules, extracts the packages the tests need, starts
        the target and runs the tests, optionally bounded by
        TEST_OVERALL_TIMEOUT seconds;
      * logs the results and symlinks the boot log and the task log into the
        JSON result directory.

    The task is aborted with bb.fatal() when the testdata file is missing,
    no usable image type or test case is found, the run was interrupted, or
    the results are not successful.
    """
    import os
    import json
    import signal
    import logging
    import shutil
    import threading

    from bb.utils import export_proxies
    from oeqa.runtime.context import OERuntimeTestContext
    from oeqa.runtime.context import OERuntimeTestContextExecutor
    from oeqa.core.target.qemu import supported_fstypes
    from oeqa.core.utils.test import getSuiteCases
    from oeqa.utils import make_logger_bitbake_compatible
    from oeqa.utils import get_json_result_dir
    from oeqa.utils.postactions import run_failed_tests_post_actions

    def sigterm_exception(signum, stackframe):
        """
        Catch SIGTERM from worker in order to stop qemu.
        """
        os.kill(os.getpid(), signal.SIGINT)

    def handle_test_timeout(timeout):
        # Runs in the timer thread: SIGINT is turned into KeyboardInterrupt in
        # the main thread, which the try/except around the test run handles.
        bb.warn("Global test timeout reached (%s seconds), stopping the tests." %(timeout))
        os.kill(os.getpid(), signal.SIGINT)

    testimage_sanity(d)

    if (d.getVar('IMAGE_PKGTYPE') == 'rpm'
       and ('dnf' in d.getVar('TEST_SUITES') or 'auto' in d.getVar('TEST_SUITES'))):
        create_rpm_index(d)

    logger = make_logger_bitbake_compatible(logging.getLogger("BitBake"))
    pn = d.getVar("PN")

    bb.utils.mkdirhier(d.getVar("TEST_LOG_DIR"))

    image_name = ("%s/%s" % (d.getVar('DEPLOY_DIR_IMAGE'),
                             d.getVar('IMAGE_LINK_NAME') or d.getVar('IMAGE_NAME')))

    tdname = "%s.testdata.json" % image_name
    try:
        with open(tdname, "r") as f:
            td = json.load(f)
    except FileNotFoundError as err:
        bb.fatal('File %s not found (%s).\nHave you built the image with IMAGE_CLASSES += "testimage" in the conf/local.conf?' % (tdname, err))

    # Some variables need to be updated (mostly paths) with the
    # ones of the current environment because some tests require them.
    for var in d.getVar('TESTIMAGE_UPDATE_VARS').split():
        td[var] = d.getVar(var)
    td['ORIGPATH'] = d.getVar("BB_ORIGENV").getVar("PATH")

    image_manifest = "%s.manifest" % image_name
    image_packages = OERuntimeTestContextExecutor.readPackagesManifest(image_manifest)

    extract_dir = d.getVar("TEST_EXTRACTED_DIR")

    # Get machine
    machine = d.getVar("MACHINE")

    # Get rootfs
    fstypes = d.getVar('IMAGE_FSTYPES').split()
    if d.getVar("TEST_TARGET") == "qemu":
        fstypes = [fs for fs in fstypes if fs in supported_fstypes]
        if not fstypes:
            bb.fatal('Unsupported image type built. Add a compatible image to '
                     'IMAGE_FSTYPES. Supported types: %s' %
                     ', '.join(supported_fstypes))
    elif d.getVar("TEST_TARGET") == "serial":
        bb.fatal('Serial target is currently only supported in testexport.')
    # QB_DEFAULT_FSTYPE, when set, takes precedence over the first image type.
    qfstype = fstypes[0]
    qdeffstype = d.getVar("QB_DEFAULT_FSTYPE")
    if qdeffstype:
        qfstype = qdeffstype
    rootfs = '%s.%s' % (image_name, qfstype)

    # Get tmpdir (not really used, just for compatibility)
    tmpdir = d.getVar("TMPDIR")

    # Get deploy_dir_image (not really used, just for compatibility)
    dir_image = d.getVar("DEPLOY_DIR_IMAGE")

    # Get bootlog
    bootlog = os.path.join(d.getVar("TEST_LOG_DIR"),
                           'qemu_boot_log.%s' % d.getVar('DATETIME'))

    # Get display
    display = d.getVar("BB_ORIGENV").getVar("DISPLAY")

    # Get kernel
    kernel_name = ('%s-%s.bin' % (d.getVar("KERNEL_IMAGETYPE"), machine))
    kernel = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"), kernel_name)

    # Get boottime
    boottime = int(d.getVar("TEST_QEMUBOOT_TIMEOUT"))

    # Get use_kvm
    kvm = oe.types.qemu_use_kvm(d.getVar('QEMU_USE_KVM'), d.getVar('TARGET_ARCH'))

    # Get OVMF
    ovmf = d.getVar("QEMU_USE_OVMF")

    # True when 'slirp' is one of the words in TEST_RUNQEMUPARAMS
    slirp = bb.utils.contains('TEST_RUNQEMUPARAMS', 'slirp', True, False, d)

    # TODO: We use the current implementation of qemu runner because of
    # time constraints, qemu runner really needs a refactor too.
    target_kwargs = { 'machine'     : machine,
                      'rootfs'      : rootfs,
                      'tmpdir'      : tmpdir,
                      'dir_image'   : dir_image,
                      'display'     : display,
                      'kernel'      : kernel,
                      'boottime'    : boottime,
                      'bootlog'     : bootlog,
                      'kvm'         : kvm,
                      'slirp'       : slirp,
                      'dump_dir'    : d.getVar("TESTIMAGE_DUMP_DIR"),
                      'serial_ports': len(d.getVar("SERIAL_CONSOLES").split()),
                      'ovmf'        : ovmf,
                      'tmpfsdir'    : d.getVar("RUNQEMU_TMPFS_DIR"),
                    }

    if d.getVar("TESTIMAGE_BOOT_PATTERNS"):
        target_kwargs['boot_patterns'] = get_testimage_boot_patterns(d)

    # hardware controlled targets might need further access
    target_kwargs['powercontrol_cmd'] = d.getVar("TEST_POWERCONTROL_CMD") or None
    target_kwargs['powercontrol_extra_args'] = d.getVar("TEST_POWERCONTROL_EXTRA_ARGS") or ""
    target_kwargs['serialcontrol_cmd'] = d.getVar("TEST_SERIALCONTROL_CMD") or None
    target_kwargs['serialcontrol_extra_args'] = d.getVar("TEST_SERIALCONTROL_EXTRA_ARGS") or ""
    target_kwargs['testimage_dump_monitor'] = d.getVar("testimage_dump_monitor") or ""

    def export_ssh_agent(d):
        # Copy the ssh-agent variables from the datastore into os.environ,
        # without overriding values already present in the environment.
        import os

        variables = ['SSH_AGENT_PID', 'SSH_AUTH_SOCK']
        for v in variables:
            if v not in os.environ.keys():
                val = d.getVar(v)
                if val is not None:
                    os.environ[v] = val

    export_ssh_agent(d)

    # runtime use network for download projects for build
    export_proxies(d)

    if slirp:
        # Default to 127.0.0.1 and let the runner identify the port forwarding
        # (as OEQemuTarget does), but allow overriding.
        target_ip = d.getVar("TEST_TARGET_IP") or "127.0.0.1"
        # Default to 10.0.2.2 as this is the IP that the guest has with the
        # default qemu slirp networking configuration, but allow overriding.
        server_ip = d.getVar("TEST_SERVER_IP") or "10.0.2.2"
    else:
        target_ip = d.getVar("TEST_TARGET_IP")
        server_ip = d.getVar("TEST_SERVER_IP")

    # the robot dance
    target = OERuntimeTestContextExecutor.getTarget(
        d.getVar("TEST_TARGET"), logger, target_ip,
        server_ip, **target_kwargs)

    # test context
    tc = OERuntimeTestContext(td, logger, target, image_packages, extract_dir)

    # Load tests before starting the target
    test_paths = get_runtime_paths(d)
    test_modules = d.getVar('TEST_SUITES').split()
    if not test_modules:
        bb.fatal('Empty test suite, please verify TEST_SUITES variable')

    tc.loadTests(test_paths, modules=test_modules)

    suitecases = getSuiteCases(tc.suites)
    if not suitecases:
        bb.fatal('Empty test suite, please verify TEST_SUITES variable')
    else:
        bb.debug(2, 'test suites:\n\t%s' % '\n\t'.join([str(c) for c in suitecases]))

    package_extraction(d, tc.suites)

    results = None
    complete = False
    timeout_timer = None
    orig_sigterm_handler = signal.signal(signal.SIGTERM, sigterm_exception)
    try:
        # We need to check if runqemu ends unexpectedly
        # or if the worker send us a SIGTERM
        tc.target.start(params=d.getVar("TEST_QEMUPARAMS"), runqemuparams=d.getVar("TEST_RUNQEMUPARAMS"))

        # An unset (None) or non-numeric TEST_OVERALL_TIMEOUT means that no
        # global timeout is armed.
        try:
            overall_timeout = int(d.getVar("TEST_OVERALL_TIMEOUT"))
        except (TypeError, ValueError):
            overall_timeout = None
        if overall_timeout is not None:
            timeout_timer = threading.Timer(overall_timeout, handle_test_timeout, (overall_timeout,))
            timeout_timer.start()

        results = tc.runTests()
        complete = True
        if results.hasAnyFailingTest():
            run_failed_tests_post_actions(d, tc)
    except (KeyboardInterrupt, BlockingIOError) as err:
        if isinstance(err, KeyboardInterrupt):
            bb.error('testimage interrupted, shutting down...')
        else:
            bb.error('runqemu failed, shutting down...')
        if results:
            results.stop()
        results = tc.results
    finally:
        # Disarm the global timeout: once the run is over a late expiry would
        # send SIGINT while the results are being reported below.
        if timeout_timer is not None:
            timeout_timer.cancel()
        signal.signal(signal.SIGTERM, orig_sigterm_handler)
        tc.target.stop()

    # Show results (if we have them)
    if results:
        configuration = get_testimage_configuration(d, 'runtime', machine)
        results.logDetails(get_json_result_dir(d),
                        configuration,
                        get_testimage_result_id(configuration),
                        dump_streams=d.getVar('TESTREPORT_FULLLOGS'))
        results.logSummary(pn)

    # Copy additional logs to tmp/log/oeqa so it's easier to find them
    targetdir = os.path.join(get_json_result_dir(d), d.getVar("PN"))
    os.makedirs(targetdir, exist_ok=True)
    os.symlink(bootlog, os.path.join(targetdir, os.path.basename(bootlog)))
    os.symlink(d.getVar("BB_LOGFILE"), os.path.join(targetdir, os.path.basename(d.getVar("BB_LOGFILE") + "." + d.getVar('DATETIME'))))

    if not results or not complete:
        bb.fatal('%s - FAILED - tests were interrupted during execution, check the logs in %s' % (pn, d.getVar("LOG_DIR")), forcelog=True)
    if not results.wasSuccessful():
        bb.fatal('%s - FAILED - also check the logs in %s' % (pn, d.getVar("LOG_DIR")), forcelog=True)

def get_testimage_boot_patterns(d):
    """
    Build the boot pattern overrides configured through TESTIMAGE_BOOT_PATTERNS.

    The value of TESTIMAGE_BOOT_PATTERNS is a whitespace separated list of
    flag names; for each listed name that is also set as a flag on the
    variable, the flag value is taken as the pattern. Backslash escape
    sequences in the value (for example a literal "\\n") are expanded.

    Returns a defaultdict(str) mapping pattern name to pattern text; it is
    empty when no flags are set. Aborts via bb.fatal() when a listed flag is
    not one of the accepted pattern names.
    """
    from collections import defaultdict
    boot_patterns = defaultdict(str)
    # Only accept certain values
    accepted_patterns = ['search_reached_prompt', 'send_login_user', 'search_login_succeeded', 'search_cmd_finished']
    # Not all patterns need to be overridden, e.g. perhaps we only want to change the user
    boot_patterns_flags = d.getVarFlags('TESTIMAGE_BOOT_PATTERNS') or {}
    if not boot_patterns_flags:
        return boot_patterns

    # Read and split the variable once instead of once per flag.
    requested = (d.getVar('TESTIMAGE_BOOT_PATTERNS') or '').split()
    for flag, flagval in boot_patterns_flags.items():
        if flag not in requested:
            continue
        if flag not in accepted_patterns:
            bb.fatal('Testimage: The only accepted boot patterns are: %s\n'
                     'Make sure your TESTIMAGE_BOOT_PATTERNS=%s contains an accepted flag.'
                     % (', '.join(accepted_patterns), d.getVar('TESTIMAGE_BOOT_PATTERNS')))
        # Expand backslash escapes. Going through latin-1 with
        # 'backslashreplace' keeps non-ASCII characters intact: the
        # 'unicode-escape' codec reads raw bytes as latin-1, so a UTF-8
        # round trip would garble them.
        boot_patterns[flag] = flagval.encode('latin-1', 'backslashreplace').decode('unicode-escape')
    return boot_patterns

def create_rpm_index(d):
    """
    Create a reduced rpm feed under WORKDIR/oe-testimage-repo and index it.

    For every architecture in ALL_MULTILIB_PACKAGE_ARCHS (with '-' replaced
    by '_') that has a directory below DEPLOY_DIR_RPM, the directory is
    hard-link copied, packages not needed by the tests are removed from the
    copy, and createrepo_c is run on the result. Aborts via bb.fatal() with
    the command output when indexing fails.
    """
    import glob
    # Index RPMs
    rpm_createrepo = bb.utils.which(os.getenv('PATH'), "createrepo_c")
    archs = (d.getVar('ALL_MULTILIB_PACKAGE_ARCHS') or '').replace('-', '_')
    # Only packages whose file name starts with one of these are kept.
    kept_prefixes = ("dnf-test-", "busybox", "update-alternatives", "libc6", "musl")

    for arch in archs.split():
        deploy_dir_rpm = d.getVar('DEPLOY_DIR_RPM')
        rpm_dir = os.path.join(deploy_dir_rpm, arch)
        idx_path = os.path.join(d.getVar('WORKDIR'), 'oe-testimage-repo', arch)

        if not os.path.isdir(rpm_dir):
            continue

        lockfilename = os.path.join(deploy_dir_rpm, 'rpm.lock')
        lf = bb.utils.lockfile(lockfilename, False)
        try:
            oe.path.copyhardlinktree(rpm_dir, idx_path)
            # Full indexes overload a 256MB image so reduce the number of rpms
            # in the feed by filtering to specific packages needed by the tests.
            # NOTE(review): the pattern matches idx_path and any sibling
            # directory sharing its prefix -- confirm that is intended.
            for pkg in glob.glob(idx_path + "*/*.rpm"):
                if not os.path.basename(pkg).startswith(kept_prefixes):
                    bb.utils.remove(pkg)
        finally:
            # Release the lock even when the copy or the filtering fails.
            bb.utils.unlockfile(lf)

        cmd = '%s --update -q %s' % (rpm_createrepo, idx_path)

        # Create repodata. create_index() returns an error string on failure
        # and None on success, so the message is passed on unchanged.
        result = create_index(cmd)
        if result:
            bb.fatal(result)

def testimage_sanity(d):
    if (d.getVar('TEST_TARGET') == 'simpleremote'
        and (not d.getVar('TEST_TARGET_IP')
             or not d.getVar('TEST_SERVER_IP'))):
        bb.fatal('When TEST_TARGET is set to "simpleremote" '
                 'TEST_TARGET_IP and TEST_SERVER_IP are needed too.')

def get_testimage_configuration(d, test_type, machine):
    """
    Build the configuration dictionary that describes a test run.

    `test_type` and `machine` are stored as given under 'TEST_TYPE' and
    'MACHINE'. DISTRO, IMAGE_BASENAME and IMAGE_PKGTYPE are read from the
    datastore, 'STARTTIME' is the value of DATETIME, 'HOST_DISTRO' is
    oe.lsb.distro_identifier() with spaces replaced by '-', and 'LAYERS' is
    the result of get_layers() for BBLAYERS.
    """
    from oeqa.utils.metadata import get_layers
    configuration = {'TEST_TYPE': test_type,
                    'MACHINE': machine,
                    'DISTRO': d.getVar("DISTRO"),
                    'IMAGE_BASENAME': d.getVar("IMAGE_BASENAME"),
                    'IMAGE_PKGTYPE': d.getVar("IMAGE_PKGTYPE"),
                    'STARTTIME': d.getVar("DATETIME"),
                    'HOST_DISTRO': oe.lsb.distro_identifier().replace(' ', '-'),
                    'LAYERS': get_layers(d.getVar("BBLAYERS"))}
    return configuration

def get_testimage_result_id(configuration):
    """
    Return the result identifier for a test configuration.

    The identifier is the TEST_TYPE, IMAGE_BASENAME, MACHINE and STARTTIME
    entries of `configuration`, in that order, joined with underscores.
    """
    id_fields = ('TEST_TYPE', 'IMAGE_BASENAME', 'MACHINE', 'STARTTIME')
    return '_'.join(str(configuration[field]) for field in id_fields)

def package_extraction(d, test_suites):
    """
    Extract the packages that the given test suites need.

    TEST_NEEDED_PACKAGES_DIR is always removed first. If the suites need any
    packages, the install, packaged and extracted directories are created
    and the packages are extracted.
    """
    from oeqa.utils.package_manager import find_packages_to_extract
    from oeqa.utils.package_manager import extract_packages

    # Start from a clean directory on every run
    bb.utils.remove(d.getVar("TEST_NEEDED_PACKAGES_DIR"), recurse=True)

    needed = find_packages_to_extract(test_suites)
    if not needed:
        return

    for dir_var in ("TEST_INSTALL_TMP_DIR", "TEST_PACKAGED_DIR", "TEST_EXTRACTED_DIR"):
        bb.utils.mkdirhier(d.getVar(dir_var))
    extract_packages(d, needed)

def get_runtime_paths(d):
    """
    Return the runtime test case directories found in the configured layers.

    Each layer listed in BBLAYERS contributes <LAYER_DIR>/lib/oeqa/runtime/cases
    when that directory exists; the result keeps the BBLAYERS order.
    """
    candidates = (os.path.join(layer_dir, 'lib/oeqa/runtime/cases')
                  for layer_dir in d.getVar('BBLAYERS').split())
    return [case_dir for case_dir in candidates if os.path.isdir(case_dir)]

def create_index(arg):
    """
    Run the index creation command `arg` through the shell.

    stderr is merged into stdout. On success any output is logged with
    bb.note() and None is returned; when the command exits with a non-zero
    status, a message with the command, the return code and the captured
    output is returned instead.
    """
    from subprocess import CalledProcessError, STDOUT, check_output

    bb.note("Executing '%s' ..." % arg)
    try:
        raw_output = check_output(arg, stderr=STDOUT, shell=True)
    except CalledProcessError as failure:
        return ("Index creation command '%s' failed with return code "
                '%d:\n%s' % (failure.cmd, failure.returncode, failure.output.decode("utf-8")))

    output = raw_output.decode('utf-8')
    if output:
        bb.note(output)
    return None

