#!/usr/bin/python3

import sys, os, types, inspect
from settings import config
from context import test_context
from tool_box import *
from set_up import *
from mount_union import mount_union
from unmount_union import unmount_union
from remount_union import remount_union
from direct import direct_open_file, direct_fs_op

def show_format(why):
    if why:
        print(why)
    print("Format:")
    print("\t", sys.argv[0], "<--no|--ov|--ovov>[=<maxlayers>] [--fuse=<subtype>] [--samefs|--maxfs=<maxfs>|--squashfs|--erofs] [--xdev] [--xino] [--meta] [--verify] [--ts=<0|1>] [-v] [<test-name>+]")
    print("\t", sys.argv[0], "<--no|--ov|--ovov> [--fuse=<subtype>] [--samefs|--squashfs|--erofs] [-s|--set-up]")
    print("\t", sys.argv[0], "[-c|--clean-up]")
    print("\t", sys.argv[0], "--open-file <file> [-acdertvw] [-W <data>] [-R <data>] [-B] [-E <err>]")
    print("\t", sys.argv[0], "--<fsop> <file> [<args>*] [-aLlv] [-R <content>] [-B] [-E <err>]")
    sys.exit(2)

cfg = config(sys.argv[0])

if len(sys.argv) < 2:
    show_format("Insufficient arguments")

args = sys.argv[1:]

###############################################################################
#
# Handle requests to perform single probes
#
###############################################################################
if args[0] == "--open-file":
    try:
        direct_open_file(cfg, args[1:])
    except ArgumentError as ae:
        show_format(str(ae))
    except TestError as te:
        exit_error(str(te))
    sys.exit(0)

if (args[0] == "--chmod" or
    args[0] == "--link" or
    args[0] == "--mkdir" or
    args[0] == "--readlink" or
    args[0] == "--rename" or
    args[0] == "--rmdir" or
    args[0] == "--truncate" or
    args[0] == "--unlink" or
    args[0] == "--utimes"):
    try:
        direct_fs_op(cfg, args)
        sys.exit(0)
    except ArgumentError as ae:
        show_format(str(ae))
    except TestError as te:
        exit_error(str(te))
    sys.exit(0)

###############################################################################
#
# Unmount old attempts
#
###############################################################################
clean_up(cfg)

if (args[0] == "-c" or args[0] == "--clean-up"):
    sys.exit(0)

###############################################################################
#
# Work out the test parameters
#
###############################################################################
recycle_list = [ "" ]
maxlayers = 0
if args[0] == "--no":
    cfg.set_testing_none()
elif args[0].startswith("--ov"):
    cfg.set_testing_overlayfs()
    if args[0].startswith("--ovov"):
        cfg.set_nested()
    s = args[0]
    i = s.rfind("=")
    if i >= 0:
        n = s[i+1:]
        recycle_list = [ n, "" ]
        maxlayers = int(n)
        if maxlayers < 0:
            show_format("Invalid value for maxlayers >= 0")
else:
    show_format("Invalid test type selector (--ov or --no)")
args = args[1:]

if len(args) > 0 and args[0].startswith("--fuse="):
    if not cfg.testing_overlayfs() or cfg.is_nested():
        show_format("--fuse requires --ov")
    s = args[0]
    t = s[s.rfind("=")+1:]
    cfg.set_fusefs(t)
    args = args[1:]

xino = None
index_def = None
index_opt = None
metacopy = None
redirect_dir = None
if cfg.is_fusefs():
    # fuse-overlayfs only supports redirect_dir=off
    # user can disable redirect_dir with --xdev
    redirect_dir = True
elif cfg.testing_overlayfs():
    # Overlayfs feature "redirect_dir" is auto enabled with kernel version >= v4.10,
    # unless explicitly disabled with --xdev or by mount option.  When redirect_dir
    # is disabled, overlayfs tests skip rename tests that would result in EXDEV.
    if "redirect_dir=on" in cfg.mntopts():
        redirect_dir_off = False
    else:
        # userxattr and any redirect= except for redirect=on disables directory rename
        redirect_dir_off = any(x in cfg.mntopts() for x in ["redirect_dir", "userxattr"])
    if not redirect_dir_off:
        redirect_dir = check_bool_modparam("redirect_dir")
    # Overlayfs feature "index" can be enabled with --verify or by mount option on kernel version >= v4.13.
    # When index is disabled some verifications in multi layer tests may fail due to broken hardlinks.
    # When index is enabled we set_verify(), because all verifications are expected to pass.
    index_def = check_bool_modparam("index")
    if not index_def is None:
        index_opt = check_bool_mntopt("index", cfg.mntopts(), None, onopt2="nfs_export=on")
    if index_def or index_opt:
        cfg.set_verify()
    # Overlayfs feature "xino" can be enabled with --xino or by mount option on kernel version >= v4.17.
    # When xino is enabled we set_xino(), so st_ino/st_dev verifications will work correctly.
    xino_def = check_bool_modparam("xino_auto")
    if not xino_def is None:
        xino = check_bool_mntopt("xino", cfg.mntopts(), xino_def, onopt2="xino=auto")
    if xino:
        cfg.set_xino()
    # Overlayfs feature "metacopy" can be enabled with --meta and by mount option on kernel version >= v4.19.
    # When metacopy is enabled we set_metacopy(), so copy up verifications will work correctly.
    metacopy_def = check_bool_modparam("metacopy")
    if not metacopy_def is None:
        metacopy = check_bool_mntopt("metacopy", cfg.mntopts(), metacopy_def, offopt2="nfs_export=on")
    if metacopy:
        cfg.set_metacopy()

maxfs = cfg.maxfs()
if len(args) > 0 and (args[0] == "--samefs" or args[0] == "--squashfs" or args[0] == "--erofs" or args[0].startswith("--maxfs=")):
    if args[0] == "--samefs":
        # maxfs < 0 means samefs
        maxfs = -1
        if cfg.env_lower_mntroot():
            show_format("--samefs is incompatible with UNIONMOUNT_LOWERDIR environment variable")
    elif args[0] == "--squashfs":
        # maxfs 0 means one upper fs and one lower fs
        maxfs = 0
        cfg.set_squashfs()
    elif args[0] == "--erofs":
        # maxfs 0 means one upper fs and one lower fs
        maxfs = 0
        cfg.set_erofs()
    elif args[0].startswith("--maxfs="):
        s = args[0]
        n = s[s.rfind("=")+1:]
        maxfs = int(n)
        if maxfs < 0:
            show_format("Invalid value for maxfs >= 0")
    args = args[1:]
cfg.set_maxfs(maxfs)

# We might be being asked to just set up and then leave without doing
# any further testing.
if len(args) > 0 and ( args[0] == "-s" or args[0] == "--set-up" ):
    if len(args) > 1:
        show_format("Too many arguments for --set-up")
    ctx = test_context(cfg)
    set_up(ctx)
    mount_union(ctx)
    sys.exit(0)

termslash = "0"
while len(args) > 0 and args[0].startswith("-"):
    if args[0] == "-v":
        cfg.set_verbose()
    elif args[0] == "--ts=0":
        termslash = "0"
    elif args[0] == "--ts=1":
        termslash = "1"
    elif args[0] == "--xdev":
        # Disable auto-enable of "redirect_dir" and skip dir rename tests
        redirect_dir = None
    elif args[0] == "--xino":
        if xino is None:
            print("xino not supported - ignoring --xino")
        else:
            xino = True
            cfg.set_xino()
    elif args[0] == "--meta":
        if metacopy is None:
            print("metacopy not supported - ignoring --meta")
        else:
            cfg.add_mntopt("metacopy=on")
            cfg.set_metacopy()
    elif args[0] == "--verify":
        cfg.set_verify()
        # auto enable index for --verify unless explicitly disabled
        if index_def is False and not index_opt is False:
            cfg.add_mntopt("index=on")
    else:
        show_format("Invalid flag " + args[0])
    args = args[1:]

# Auto-enable redired_dir unless explicitly disabled by --xdev or mount options
if redirect_dir is False:
    cfg.add_mntopt("redirect_dir=on")
    redirect_dir = True

# Auto-upgrade xino=auto to xino=on for kernel < v5.7
if xino:
    cfg.add_mntopt("xino=on")

###############################################################################
#
# Work out the list of tests to run
#
###############################################################################
tests = [
    "open-plain",
    "open-trunc",
    "open-creat",
    "open-creat-trunc",
    "open-creat-excl",
    "open-creat-excl-trunc",
    "noent-plain",
    "noent-trunc",
    "noent-creat",
    "noent-creat-trunc",
    "noent-creat-excl",
    "noent-creat-excl-trunc",
    "sym1-plain",
    "sym1-trunc",
    "sym1-creat",
    "sym1-creat-excl",
    "sym2-plain",
    "sym2-trunc",
    "sym2-creat",
    "sym2-creat-excl",
    "symx-plain",
    "symx-trunc",
    "symx-creat",
    "symx-creat-excl",
    "symx-creat-trunc",
    "truncate",
    "dir-open",
    "dir-weird-open",
    "dir-open-dir",
    "dir-sym1-open",
    "dir-sym1-weird-open",
    "dir-sym2-open",
    "dir-sym2-weird-open",
    "readlink",
    "mkdir",
    "rmdir",
    "rmtree",
    "rmtree-new",
    "hard-link",
    "hard-link-dir",
    "hard-link-sym",
    "unlink",
    "rename-file",
    "rename-hard-link",
    "rename-new-dir",
    "rename-new-pop-dir",
    "rename-mass",
    "rename-mass-2",
    "rename-mass-3",
    "rename-mass-4",
    "rename-mass-5",
    "rename-mass-dir",
    "rename-mass-sym",
    "impermissible"
    ]

# For backward compat with kernel version < v4.10 or with flag --xdev,
# run an alternative test to verify EXDEV behavior. Otherwise, run all
# directory rename tests.
if cfg.testing_overlayfs() and not redirect_dir:
    tests += [
        "rename-exdev",
        ]
else:
    tests += [
        "rename-dir",
        "rename-empty-dir",
        "rename-pop-dir",
        "rename-move-dir",
        ]

if len(args) > 0:
    tests = args

if cfg.testing_overlayfs():
    if cfg.is_nested():
        test_what = "--ovov"
    else:
        test_what = "--ov"
else:
    test_what = "--no"

for test in tests:
    for recycle in recycle_list:
        test_how = test_what
        if recycle != "":
            test_how = test_what + "=" + str(maxlayers)
        if cfg.testing_overlayfs():
            if maxfs > 0:
                test_how += " --maxfs=" + str(maxfs)
            elif maxfs < 0:
                test_how += " --samefs"
            if not redirect_dir:
                test_how += " --xdev"
            if cfg.is_xino():
                test_how += " --xino"
            if cfg.is_metacopy():
                test_how += " --meta"
            if cfg.is_verify():
                test_how += " --verify"
        msg = cfg.progname() + " " + test_how + " " + test
        print("***");
        print("***", msg);
        print("***");
        if cfg.is_verbose():
            write_kmsg(msg);

        ctx = test_context(cfg, termslash == "1", False, recycle != "", maxlayers)

        # Construct the union
        set_up(ctx)
        mount_union(ctx)
        os.sync()

        # Run a test script
        script = __import__("tests." + test, globals(), locals(), ['subtests'])
        try:
            subtests = []
            for name in dir(script):
                if not name.startswith("subtest_"):
                    continue
                subtest = getattr(script, name)
                if type(subtest) != types.FunctionType:
                    continue
                subtests.append((inspect.getfile(subtest),
                                 inspect.getsourcelines(subtest)[1],
                                 subtest.__doc__,
                                 subtest))

            # Perform the subtests in the order they're defined in the file
            for (f, line, doc, func) in sorted(subtests, key=lambda s: s[1]):
                ctx.begin_test(f, line, doc)
                func(ctx)

        except TestError as te:
            exit_error(str(te))

        # Stop if the kernel is now tainted
        check_not_tainted()

        # Make sure that all dentries and inodes are correctly released
        unmount_union(ctx)
        del ctx

# Leave the union mounted for further playing
ctx = test_context(cfg)
set_up(ctx)
mount_union(ctx)
