GRAYBYTE WORDPRESS FILE MANAGER2791

Server IP : 198.54.121.189 / Your IP : 216.73.216.112
System : Linux premium69.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
PHP Version : 7.4.33
Disable Function : NONE
cURL : ON | WGET : ON | Sudo : OFF | Pkexec : OFF
Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/feature_management/
Upload Files :
Current_dir [ Not Writeable ] Document_root [ Writeable ]

Command :


Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/feature_management//utils.py
import logging
from itertools import chain
from typing import List, Any

from defence360agent.feature_management import hooks
from defence360agent.feature_management.model import FeatureManagementPerms
from defence360agent.model import instance
from defence360agent.utils import execute_iterable_expression
from .lookup import features

logger = logging.getLogger(__name__)


async def set_feature(user: str, feature: str, value: str) -> bool:
    """Store `value` for `feature` on `user` and run the feature's hook.

    The permission change and the hook call happen inside a single
    `instance.db.atomic()` block. The hook's result is logged and returned
    to the caller; when the hook reports failure the transaction is rolled
    back so the stored permission is not changed.
    """
    with instance.db.atomic() as transaction:
        FeatureManagementPerms.get_perm(user).set_feature(feature, value)
        applied = hooks.get_hook(feature)(user, value)

        if not applied:
            logger.error(
                "Failed to apply setting %s=%s for user %s",
                feature,
                value,
                user,
            )
            # Undo the permission write made above.
            transaction.rollback()
            return applied

        logger.info("Applied setting %s=%s for user %s", feature, value, user)
        return applied


async def update_users(
    feature: str, users: List[str], value: Any, existing_users: List[str]
):
    """Apply `feature`=`value` to each of `users` that actually exists.

    Users missing from `existing_users` are skipped with a warning and are
    reported in neither list of the result.

    Returns a dict with two lists of user names: ``"succeeded"`` for users
    where `set_feature` returned a truthy value and ``"failed"`` for the
    rest.
    """
    # Build the lookup set once so each membership test is O(1) instead of
    # a linear scan of the list per user.
    known_users = set(existing_users)
    result = {"succeeded": [], "failed": []}

    for user in users:
        if user not in known_users:
            logger.warning("No such user: %s", user)
            continue

        if await set_feature(user, feature, value):
            result["succeeded"].append(user)
        else:
            result["failed"].append(user)

    return result


async def update_default(feature: str, value: Any):
    """Set `feature` to `value` on the default permissions entry.

    After updating the entry returned by
    `FeatureManagementPerms.get_default()`, the feature's hook is called
    with `None` in place of a user name and its result is returned.
    """
    FeatureManagementPerms.get_default().set_feature(feature, value)
    return hooks.get_hook(feature)(None, value)


async def sync_users(users: List[str]) -> bool:
    """Synchronize existing permissions with panel users.

    Permission rows whose user is not in `users` are deleted, except the
    `FeatureManagementPerms.DEFAULT` entry which is never removed. For every
    panel user that has no permission row yet, each feature's hook is called
    with the value reported by that user's permission object.

    Returns True if any permissions were removed or added, False otherwise.
    """
    panel_users = set(users)

    user_rows = FeatureManagementPerms.select(FeatureManagementPerms.user)
    perm_users = set(chain.from_iterable(user_rows.tuples()))

    perms_to_remove = perm_users - panel_users
    # discard() instead of remove(): DEFAULT is not in this set when no
    # default row is stored or when a panel user carries the same name, and
    # remove() would raise KeyError in those cases.
    perms_to_remove.discard(FeatureManagementPerms.DEFAULT)

    if perms_to_remove:
        logger.info("Remove permissions of users %s", perms_to_remove)

        def expression(user_names):
            return FeatureManagementPerms.delete().where(
                FeatureManagementPerms.user.in_(user_names)
            )

        # NOTE(review): execute_iterable_expression presumably runs the
        # delete in batches over the list -- confirm in defence360agent.utils.
        execute_iterable_expression(expression, list(perms_to_remove))

    perms_to_add = panel_users - perm_users

    if perms_to_add:
        logger.info("Add permissions to users %s", perms_to_add)

    for user in perms_to_add:
        perm = FeatureManagementPerms.get_perm(user)

        # Run every feature hook so the new user gets the current values
        # applied; hook results are intentionally not inspected here.
        for feature in features:
            value = perm.get_feature(feature)
            callback = hooks.get_hook(feature)
            callback(user, value)

    return bool(perms_to_add) or bool(perms_to_remove)


async def reset_features(**features):
    """Apply the given feature values to every stored non-default user.

    Each keyword argument is a feature name mapped to the value that should
    be set, via `set_feature`, for all users present in the feature
    management database other than `FeatureManagementPerms.DEFAULT`.
    """
    non_default_rows = FeatureManagementPerms.select(
        FeatureManagementPerms.user
    ).where(FeatureManagementPerms.user != FeatureManagementPerms.DEFAULT)

    # Materialize the user names before changing anything.
    user_names = [
        name for row in non_default_rows.tuples() for name in row
    ]

    for user_name in user_names:
        for feature_name, feature_value in features.items():
            await set_feature(user_name, feature_name, feature_value)

[ Back ]
Name
Size
Last Modified
Owner / Group
Permissions
Options
..
--
July 11 2025 07:52:56
root / root
0755
__pycache__
--
July 11 2025 07:52:56
root / root
0755
plugins
--
July 11 2025 07:52:56
root / root
0755
rpc
--
July 11 2025 07:52:56
root / root
0755
__init__.py
0 KB
July 09 2025 15:10:25
root / root
0644
checkers.py
2.449 KB
July 09 2025 15:10:25
root / root
0644
constants.py
0.793 KB
July 09 2025 15:10:25
root / root
0644
control.py
2.496 KB
July 09 2025 15:10:25
root / root
0644
exceptions.py
0.299 KB
July 09 2025 15:10:25
root / root
0644
hooks.py
2.394 KB
July 09 2025 15:10:25
root / root
0644
lookup.py
2.765 KB
July 09 2025 15:10:25
root / root
0644
model.py
2.318 KB
July 09 2025 15:10:25
root / root
0644
utils.py
3.477 KB
July 09 2025 15:10:25
root / root
0644

GRAYBYTE WORDPRESS FILE MANAGER @ 2025
CONTACT ME
Static GIF