Server : LiteSpeed
System : Linux server51.dnsbootclub.com 4.18.0-553.62.1.lve.el8.x86_64 #1 SMP Mon Jul 21 17:50:35 UTC 2025 x86_64
User : nandedex ( 1060)
PHP Version : 8.1.33
Disable Function : NONE
Directory :  /proc/thread-self/root/opt/imunify360/venv/share/imunify360/scripts/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]


Current File : //proc/thread-self/root/opt/imunify360/venv/share/imunify360/scripts/rules_checker.py
"""
Usage /opt/imunify360/venv/bin/python3 /opt/imunify360/venv/share/imunify360/scripts/rules_checker.py <action>
    choose an action from ACTIONS map
    e.g.
    /opt/imunify360/venv/bin/python3 rules_checker.py recreate

    Actions:
        - `recreate` - recreates rules if needed and checks ipsets consistent
        - `clear` - waits RulesChecker stop and destroys all rules and ipsets

    Actions based on lazy_init plugin
        placed in im360.utils.lazy_init
"""

import itertools
import asyncio
import json
from pathlib import Path
import os
import pickle
import sys
import time
import logging
import argparse

from defence360agent.internals import logger as lg
from defence360agent.internals.global_scope import g
from defence360agent.model import instance, tls_check, simplification, infected_domain
from defence360agent.contracts.config import Model, Merger
from im360.contracts.config import IPSET_LISTS_PATH
from im360.files import WHITELISTS, Index
from im360.internals.core import ip_versions
from im360.internals.strategy import Strategy
from im360.model import (
    firewall,
    incident,
    messages_to_send,
    proactive,
)

from im360.utils.lazy_init import (
    RulesChecker,
    RealProtector,
    RULES_CHECK_IN_PROGRESS,
)


# Script-local logger. Records are written to stdout because
# go-resident reads from stdout in case we have errors.
logger = logging.getLogger("rules-checker")
handler = logging.StreamHandler(stream=sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)

# Number of seconds in one day.
DAY = 60 * 60 * 24
# Mutable script-wide state: time.time() value of the last ipsets check.
STATE = dict(last_ipset_check=0.0)

# Pickle files holding the state objects saved between script runs.
RULES_CHECKER_STATE = Path("/var/imunify360/.ruleschecker.state")
REAL_PROTECTOR_STATE = Path("/var/imunify360/.realprotector.state")

# JSON file holding the externally visible part of the RulesChecker state.
RULES_CHECKER_EXTERN_STATE = Path(
    "/var/imunify360/.ruleschecker.extern_state.json"
)


class RealProtectorState:
    """Plain container for the RealProtector fields kept between runs.

    Attribute names are part of the stored format: instances of this
    class are what gets saved and read back, so they must stay stable.
    """

    def __init__(self, _ws, _pb_dmv, _pbm, _de, _lic):
        # _ws: webshield status, _pb_dmv: port blocking deny mode values,
        # _pbm: port blocking mode, _de: DoS enabled flag,
        # _lic: time of the last ipsets check.
        self._webshield_status = _ws
        self._port_blocking_deny_mode_values = _pb_dmv
        self._port_blocking_mode = _pbm
        self._dos_enabled = _de
        self.last_ipset_check = _lic

    def __str__(self) -> str:
        # The first three fields are separated by ", ", the remaining ones
        # by a bare ","; the rendered text is kept exactly as it has been.
        spaced = ", ".join(
            (
                f"_webshield_status={self._webshield_status}",
                f"_port_blocking_mode={self._port_blocking_mode}",
                "_port_blocking_deny_mode_values"
                f"={self._port_blocking_deny_mode_values}",
            )
        )
        packed = ",".join(
            (
                f"last_ipset_check={self.last_ipset_check}",
                f"_dos_enabled={self._dos_enabled}",
            )
        )
        return f"RealProtectorState({spaced},{packed})"


class RulesCheckerState:
    """Plain container for the RulesChecker fields kept between runs.

    Besides being stored as a whole object, a subset of the state can be
    exported to / updated from a plain ``dict`` (see
    :meth:`make_external_state` and :meth:`update_from_external_state`).
    """

    # Per-version attributes exchanged through the external state dict.
    _EXTERNAL_FIELDS = (
        "transient_error_on_create",
        "errors",
        "next_try_time",
        "running",
    )

    def __init__(
        self, interface_conf, ipset_outdated_events, outdated_ipsets, versions
    ):
        self._interface_conf = interface_conf
        self._ipsets_outdated_events = ipset_outdated_events
        # Mapping: version -> iterable of outdated ipsets.
        self.outdated_ipsets = outdated_ipsets
        # Mapping: version -> per-version state object exposing the
        # attributes listed in _EXTERNAL_FIELDS.
        self.versions = versions

    def __str__(self) -> str:
        # Fields are separated by ", " and the parenthesis is closed once,
        # after the last field.
        return (
            "RulesCheckerState("
            f"_interface_conf={self._interface_conf}, "
            f"_ipsets_outdated_events={self._ipsets_outdated_events}, "
            f"outdated_ipsets={self.outdated_ipsets}, "
            f"versions={self.versions})"
        )

    def make_external_state(self) -> dict:
        """Return the exportable part of the state as a plain dict.

        The result has two keys: ``"versions"`` (version -> dict of the
        ``_EXTERNAL_FIELDS`` attributes) and ``"outdated_ipsets"``
        (version -> list of sets).
        """
        versions_data = {}
        for version, version_state in self.versions.items():
            versions_data[version] = {
                field: getattr(version_state, field)
                for field in self._EXTERNAL_FIELDS
            }
        _outdated_ipsets = {
            ver: list(sets) for ver, sets in self.outdated_ipsets.items()
        }
        return {
            "versions": versions_data,
            "outdated_ipsets": _outdated_ipsets,
        }

    def update_from_external_state(self, state: dict):
        """Copy per-version fields from *state* into ``self.versions``.

        Only the ``"versions"`` entry of *state* is consulted; versions
        that are not already present in ``self.versions`` are ignored.

        Raises:
            KeyError: if the data for a known version lacks one of the
                ``_EXTERNAL_FIELDS`` keys.
        """
        if "versions" not in state:
            return
        # NOTE(review): when *state* comes from JSON its keys are strings;
        # presumably the keys of self.versions are strings too - confirm.
        for version, version_data in state["versions"].items():
            if version not in self.versions:
                continue
            version_state = self.versions[version]
            for field in self._EXTERNAL_FIELDS:
                setattr(version_state, field, version_data[field])


async def _check_for_config_change(rc: RulesChecker, rp: RealProtector):
    """Attach *rc* to *rp* and run the protector's config-update handler.

    The handler is awaited with ``None`` as its only argument.
    """
    rp._rules_checker = rc
    on_config_update = rp._on_config_update_unlocked
    await on_config_update(None)


async def recreate_rules(rc: RulesChecker, rp: RealProtector, **kwargs):
    """Check ipsets consistency (at most once per DAY) and recreate rules.

    The consistency check is skipped when the previous one, as recorded in
    ``STATE["last_ipset_check"]``, happened less than ``DAY`` seconds ago.
    ``rc.recreate_rules_if_needed()`` is awaited in either case. *rp* and
    extra keyword arguments are accepted but not used.
    """
    logger.info("Checking that need to recreate rules")
    # TODO: check if we need to check it too often
    # for Python implementation we do it only once per day
    since_last_check = time.time() - STATE["last_ipset_check"]
    checked_recently = since_last_check < DAY
    if checked_recently:
        logger.info("Skip ipsets check")
    else:
        await rc._check_ipsets_consistent()
        STATE["last_ipset_check"] = time.time()
    await rc.recreate_rules_if_needed()
    logger.info("IP sets verification and initialization completed")


async def check_config_update(rc: RulesChecker, rp: RealProtector, **kwargs):
    """Run the config-change check for *rp* with *rc* attached.

    Extra keyword arguments are accepted but not used.
    """
    logger.info("Checking config update")
    await _check_for_config_change(rc=rc, rp=rp)
    logger.info("Completed")


async def check_global_whitelist_update(
    rc: RulesChecker, rp: RealProtector, **kwargs
):
    """Attach *rc* to *rp* and process a global whitelist update.

    Extra keyword arguments are accepted but not used.
    """
    rp._rules_checker = rc
    logger.info("Checking global whitelist update")
    process_update = rp.process_global_whitelist_update
    await process_update()
    logger.info("Completed")


async def check_country_update(rc: RulesChecker, rp: RealProtector, **kwargs):
    """Attach *rc* to *rp* and process a country list update.

    Extra keyword arguments are accepted but not used.
    """
    rp._rules_checker = rc
    logger.info("Checking country list update")
    process_update = rp.process_country_list_update
    await process_update()
    logger.info("Completed")


async def recreate_rules_on_strategy_change(
    rc: RulesChecker, rp: RealProtector, **kwargs
):
    """Recreate rules if needed and log the current strategy.

    Unlike the ``recreate`` action, no ipsets consistency check is done
    here. *rp* and extra keyword arguments are accepted but not used.
    """
    logger.info("Checking that need to recreate rules on strategy change")
    await rc.recreate_rules_if_needed()
    message = "Firewall rules recreated due to StrategyChange %s"
    logger.info(message, Strategy.current)


async def check_ipsets_consistent(
    rc: RulesChecker, rp: RealProtector, check_all=False, **kwargs
):
    """Run the ipsets consistency check and recreate rules when needed.

    *check_all* is forwarded to ``rc._check_ipsets_consistent``. The time
    of the check is recorded in ``STATE["last_ipset_check"]``. Rules are
    recreated only if at least one entry of ``rc.outdated_ipsets`` is
    non-empty afterwards.
    """
    logger.info("Checking ipsets consistent")
    await rc._check_ipsets_consistent(check_all)
    STATE["last_ipset_check"] = time.time()
    has_outdated = any(rc.outdated_ipsets.values())
    if has_outdated:
        await rc.recreate_rules_if_needed()
    logger.info("Completed")


async def _stop_and_wait(rc: RulesChecker):
    """Ask *rc* to stop, then wait on it."""
    rc.should_stop()
    pending = rc.wait()
    await pending


async def clear_everything(rc: RulesChecker, rp: RealProtector, **kwargs):
    """Stop *rc*, wait for it, then await ``rc.clear_everything()``.

    *rp* and extra keyword arguments are accepted but not used.
    """
    logger.info("Clear rules and ipsets")
    await _stop_and_wait(rc=rc)
    await rc.clear_everything()
    logger.info("Completed")


async def clear_rules(rc: RulesChecker, rp: RealProtector, **kwargs):
    """Stop *rc*, wait for it, then await ``rc.clear_rules()``.

    *rp* and extra keyword arguments are accepted but not used.
    """
    logger.info("Clear rules")
    await _stop_and_wait(rc=rc)
    await rc.clear_rules()
    logger.info("Completed")


async def force_recreate_rules_and_refill_ports_if_needed(
    rc: RulesChecker, rp: RealProtector, refill=False, **kwargs
):
    """Recreate rules unconditionally; optionally refill port ipsets.

    *rc* is attached to *rp* first. When *refill* is truthy,
    ``rp._refill_port_blocking_ipsets()`` is awaited as well and a
    message is logged if it returns a truthy value.
    """
    rp._rules_checker = rc
    await rc.recreate_rules_if_needed(recreate_any_way=True)
    logger.info("Firewall rules recreated due to ConfigUpdate")
    if not refill:
        return
    refilled = await rp._refill_port_blocking_ipsets()
    if refilled:
        logger.info("Blocked ports ipsets reffiled")


async def refill_ports(rc: RulesChecker, rp: RealProtector, **kwargs):
    """Attach *rc* to *rp* and refill the port blocking ipsets.

    A message is logged when ``rp._refill_port_blocking_ipsets()`` returns
    a truthy value. Extra keyword arguments are accepted but not used.
    """
    rp._rules_checker = rc
    refilled = await rp._refill_port_blocking_ipsets()
    if refilled:
        logger.info("Blocked ports deny mode updated on ConfigUpdate")


def setup_environment():
    """Prepare logging, IP versions, the database and the file index.

    Steps, in order: reconfigure logging, initialise ``ip_versions``, open
    the main database through a ``mode=ro`` URI and attach the resident and
    ipset-lists databases, bind all models to the database, enable
    ``g.DEBUG`` when the ``DEBUG`` environment variable equals ``"true"``,
    and register the whitelists index type.
    """
    lg.reconfigure()
    ip_versions.init()

    database = instance.db
    database.init(f"file:{Model.PATH}?mode=ro", uri=True)
    database.execute_sql("ATTACH ? AS resident", (Model.RESIDENT_PATH,))
    database.execute_sql("ATTACH ? AS ipsetlists", (IPSET_LISTS_PATH,))

    # Collect the models of every module first, then flatten them into
    # a single list for binding.
    model_modules = (
        simplification,
        firewall,
        incident,
        messages_to_send,
        proactive,
        infected_domain,
    )
    models_per_module = [
        simplification.get_models(module) for module in model_modules
    ]
    models = list(itertools.chain.from_iterable(models_per_module))

    instance.db.bind(models)

    debug_requested = os.environ.get("DEBUG") == "true"
    if debug_requested:
        g.DEBUG = True

    Index.add_type(WHITELISTS, "whitelist/v2", 0o770, 0o660, all_zip=True)


def restore_state(rp: RealProtector, rc: RulesChecker):
    """Load previously saved state into *rp* and *rc* and return both.

    ``Strategy.current`` is refreshed first. Each state file is optional;
    any error while restoring one of the two objects is logged and does
    not prevent the other from being restored.

    NOTE(review): the state files are unpickled; this is only safe while
    those files cannot be written by untrusted users - confirm their
    ownership and permissions.
    """
    Strategy.current = Strategy.get()

    try:
        if REAL_PROTECTOR_STATE.exists():
            saved_rp = pickle.loads(REAL_PROTECTOR_STATE.read_bytes())
            rp._webshield_status = saved_rp._webshield_status
            rp._port_blocking_deny_mode_values = (
                saved_rp._port_blocking_deny_mode_values
            )
            rp._port_blocking_mode = saved_rp._port_blocking_mode
            # next is new functionality and _dos_enabled will be missing
            # on startup. Since this is used to prevent adding connection
            # tracking to netfilter we basically had this before set to
            # True, that is why default value needs to be True
            rp._dos_enabled = getattr(saved_rp, "_dos_enabled", True)
            STATE["last_ipset_check"] = saved_rp.last_ipset_check
    except Exception as e:
        logger.error("Failed to restore RealProtector state: %s", e)

    try:
        if RULES_CHECKER_STATE.exists():
            saved_rc = pickle.loads(RULES_CHECKER_STATE.read_bytes())

            # The JSON file, when present, overrides per-version fields
            # of the unpickled state before it is applied to rc.
            if RULES_CHECKER_EXTERN_STATE.exists():
                external = json.loads(RULES_CHECKER_EXTERN_STATE.read_text())
                saved_rc.update_from_external_state(external)

            rc.active_interface_conf = saved_rc._interface_conf
            rc._ipsets_outdated_events = saved_rc._ipsets_outdated_events
            rc.outdated_ipsets = saved_rc.outdated_ipsets
            rc.versions = saved_rc.versions
            # Every restored version is marked as running, regardless of
            # the value that was stored.
            for version_state in rc.versions.values():
                version_state.running = True
    except Exception as e:
        logger.error("Failed to restore RulesChecker state: %s", e)

    return rp, rc


def save_state(rp: RealProtector, rc: RulesChecker):
    """Persist the state of *rp* and *rc* for the next script run.

    Both snapshots are pickled to their own files; the exportable part of
    the RulesChecker snapshot is additionally written as indented JSON.
    """
    rp_snapshot = RealProtectorState(
        _ws=rp._webshield_status,
        _pb_dmv=rp._port_blocking_deny_mode_values,
        _pbm=rp._port_blocking_mode,
        _de=rp._dos_enabled,
        _lic=STATE["last_ipset_check"],
    )
    rc_snapshot = RulesCheckerState(
        interface_conf=rc.active_interface_conf,
        ipset_outdated_events=rc._ipsets_outdated_events,
        outdated_ipsets=rc.outdated_ipsets,
        versions=rc.versions,
    )

    pickled_targets = (
        (REAL_PROTECTOR_STATE, rp_snapshot),
        (RULES_CHECKER_STATE, rc_snapshot),
    )
    for target, snapshot in pickled_targets:
        with target.open("wb") as stream:
            pickle.dump(snapshot, stream)

    with RULES_CHECKER_EXTERN_STATE.open("w") as stream:
        json.dump(rc_snapshot.make_external_state(), stream, indent=4)


# Maps the value of the CLI `action` argument to the coroutine function
# that main() runs for it. Every handler is called as
# handler(rc, rp, **kwargs), and the keys are also used as the allowed
# choices of the argument parser.
ACTIONS = {
    "recreate": recreate_rules,
    "clear": clear_everything,
    "clear-rules": clear_rules,
    "config-update": check_config_update,
    "global-whitelist-update": check_global_whitelist_update,
    "country-update": check_country_update,
    "strategy-change": recreate_rules_on_strategy_change,
    "ipsets-consistent": check_ipsets_consistent,
    "force-recreate-rules": force_recreate_rules_and_refill_ports_if_needed,
    "refill-ports": refill_ports,
}


def parse_arguments():
    """Build the CLI parser and return the parsed command line.

    The parser takes one positional ``action`` (restricted to the keys of
    ``ACTIONS``) and two boolean flags, ``--refill-ports`` and
    ``--check-all``. The module docstring is shown as the help epilog.
    """
    cli = argparse.ArgumentParser(
        description="Imunify360 Rules Checker CLI",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    cli.add_argument(
        "action", choices=ACTIONS.keys(), help="Action to perform"
    )

    refill_help = "refills ports in force-recreate-rules"
    check_all_help = (
        "Check consistency off all ipsets on python side. Should be used"
        " only in tests"
    )
    for flag, help_text in (
        ("--refill-ports", refill_help),
        ("--check-all", check_all_help),
    ):
        cli.add_argument(flag, action="store_true", help=help_text)

    return cli.parse_args()


def main(action, **kwargs):
    """Run one rules-checker *action* and persist the resulting state.

    *action* must be a key of ``ACTIONS``; *kwargs* are forwarded to the
    selected handler. The in-progress marker file is created first (a
    failure to do so is only logged), then the environment and merged
    config are prepared, saved state is restored, the handler is run to
    completion on the event loop and the state is saved again.
    """
    try:
        RULES_CHECK_IN_PROGRESS.touch()
    except Exception as e:
        logger.error("Failed to create RULES_CHECK_IN_PROGRESS file: %s", e)

    tls_check.reset()
    setup_environment()
    Merger.update_merged_config()

    event_loop = asyncio.get_event_loop()
    protector, checker = restore_state(
        RealProtector(), RulesChecker(event_loop)
    )

    handler_coro = ACTIONS[action](checker, protector, **kwargs)
    event_loop.run_until_complete(handler_coro)

    save_state(protector, checker)
    logger.info("Script finished")


if __name__ == "__main__":
    args = parse_arguments()

    # Forward only the flags that were actually given. Every action handler
    # accepts **kwargs, so a flag that is irrelevant to the chosen action
    # is simply ignored by it.
    kwargs = {}
    if args.refill_ports:
        kwargs["refill"] = args.refill_ports
    # --check-all must not depend on --refill-ports: previously it was only
    # forwarded when both flags were given, so
    # `ipsets-consistent --check-all` silently ran without check_all.
    if args.check_all:
        kwargs["check_all"] = args.check_all

    try:
        main(args.action, **kwargs)
    except Exception as e:
        # Remove the in-progress marker before reporting the failure;
        # exit code 2 signals an unhandled error to the caller.
        RULES_CHECK_IN_PROGRESS.unlink(missing_ok=True)
        logger.exception("rules checker failed with unhandled error: %r", e)
        sys.exit(2)
    else:
        RULES_CHECK_IN_PROGRESS.unlink(missing_ok=True)

F1le Man4ger