GRAYBYTE WORDPRESS FILE MANAGER4607

Server IP : 198.54.121.189 / Your IP : 216.73.216.112
System : Linux premium69.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
PHP Version : 7.4.33
Disable Function : NONE
cURL : ON | WGET : ON | Sudo : OFF | Pkexec : OFF
Directory : /opt/imunify360/venv/lib/python3.11/site-packages/sentry_sdk/
Upload Files :
Current_dir [ Not Writeable ] Document_root [ Writeable ]

Command :


Current File : /opt/imunify360/venv/lib/python3.11/site-packages/sentry_sdk//transport.py
from __future__ import print_function

import io
import urllib3  # type: ignore
import certifi
import gzip

from datetime import datetime, timedelta

from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions, json_dumps
from sentry_sdk.worker import BackgroundWorker
from sentry_sdk.envelope import Envelope

from sentry_sdk._types import MYPY

if MYPY:
    from typing import Any
    from typing import Callable
    from typing import Dict
    from typing import Iterable
    from typing import Optional
    from typing import Tuple
    from typing import Type
    from typing import Union

    from urllib3.poolmanager import PoolManager  # type: ignore
    from urllib3.poolmanager import ProxyManager

    from sentry_sdk._types import Event, EndpointType

    DataCategory = Optional[str]

try:
    from urllib.request import getproxies
except ImportError:
    from urllib import getproxies  # type: ignore


class Transport(object):
    """Baseclass for all transports.

    A transport is used to send an event to sentry.  Subclasses override
    `capture_event` and/or `capture_envelope`; `flush` and `kill` are
    no-ops here and may be overridden when the transport buffers work.
    """

    # Parsed form of ``options["dsn"]``; stays ``None`` when no DSN is given.
    parsed_dsn = None  # type: Optional[Dsn]

    def __init__(
        self, options=None  # type: Optional[Dict[str, Any]]
    ):
        # type: (...) -> None
        """
        Store `options` and parse the DSN they carry, if any.

        A missing, ``None`` or empty ``"dsn"`` entry leaves `parsed_dsn`
        set to ``None``.
        """
        self.options = options
        # Use `.get` so an options dict without a "dsn" key is treated like
        # "no DSN configured" instead of raising KeyError.
        dsn = options.get("dsn") if options else None
        self.parsed_dsn = Dsn(dsn) if dsn else None

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """
        This gets invoked with the event dictionary when an event should
        be sent to sentry.
        """
        raise NotImplementedError()

    def capture_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """
        Send an envelope to Sentry.

        Envelopes are a data container format that can hold any type of data
        submitted to Sentry. We use it for transactions and sessions, but
        regular "error" events should go through `capture_event` for backwards
        compat.
        """
        raise NotImplementedError()

    def flush(
        self,
        timeout,  # type: float
        callback=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Wait `timeout` seconds for the current events to be sent out."""
        pass

    def kill(self):
        # type: () -> None
        """Forcefully kills the transport."""
        pass

    def __del__(self):
        # type: () -> None
        # Best-effort cleanup: errors raised by `kill()` during finalization
        # are deliberately ignored.
        try:
            self.kill()
        except Exception:
            pass


def _parse_rate_limits(header, now=None):
    # type: (Any, Optional[datetime]) -> Iterable[Tuple[DataCategory, datetime]]
    if now is None:
        now = datetime.utcnow()

    for limit in header.split(","):
        try:
            retry_after, categories, _ = limit.strip().split(":", 2)
            retry_after = now + timedelta(seconds=int(retry_after))
            for category in categories and categories.split(";") or (None,):
                yield category, retry_after
        except (LookupError, ValueError):
            continue


class HttpTransport(Transport):
    """The default HTTP transport.

    Events and envelopes are gzip-compressed and POSTed through a urllib3
    pool.  The actual sending happens in callbacks submitted to the
    transport's worker.  Rate limits reported by the server are remembered
    per data category, and payloads in a limited category are dropped until
    the limit expires.
    """

    def __init__(
        self, options  # type: Dict[str, Any]
    ):
        # type: (...) -> None
        """
        Set up the worker, auth information and connection pool.

        Reads ``http_proxy``, ``https_proxy`` and ``ca_certs`` from
        `options`; a DSN must be present.
        """
        from sentry_sdk.consts import VERSION

        Transport.__init__(self, options)
        assert self.parsed_dsn is not None
        self._worker = BackgroundWorker()
        self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
        # Maps a data category (or None for "everything") to the time until
        # which sending is suppressed.
        self._disabled_until = {}  # type: Dict[DataCategory, datetime]
        # Only used for its Retry-After header parsing.
        self._retry = urllib3.util.Retry()
        self.options = options

        self._pool = self._make_pool(
            self.parsed_dsn,
            http_proxy=options["http_proxy"],
            https_proxy=options["https_proxy"],
            ca_certs=options["ca_certs"],
        )

        from sentry_sdk import Hub

        self.hub_cls = Hub

    def _update_rate_limits(self, response):
        # type: (urllib3.HTTPResponse) -> None
        """Record any rate limits announced by `response`."""

        # new sentries with more rate limit insights.  We honor this header
        # no matter of the status code to update our internal rate limits.
        header = response.headers.get("x-sentry-rate-limits")
        if header:
            self._disabled_until.update(_parse_rate_limits(header))

        # old sentries only communicate global rate limit hits via the
        # retry-after header on 429.  This header can also be emitted on new
        # sentries if a proxy in front wants to globally slow things down.
        elif response.status == 429:
            # Fall back to 60 seconds when no usable Retry-After is present.
            self._disabled_until[None] = datetime.utcnow() + timedelta(
                seconds=self._retry.get_retry_after(response) or 60
            )

    def _send_request(
        self,
        body,  # type: bytes
        headers,  # type: Dict[str, str]
        endpoint_type="store",  # type: EndpointType
    ):
        # type: (...) -> None
        """
        POST `body` to the API endpoint selected by `endpoint_type`.

        The auth and user-agent headers are added to `headers` in place.
        Rate limits from the response are recorded; any other non-2xx
        status is logged as an error.  The response is always closed.
        """
        headers.update(
            {
                "User-Agent": str(self._auth.client),
                "X-Sentry-Auth": str(self._auth.to_header()),
            }
        )
        response = self._pool.request(
            "POST",
            str(self._auth.get_api_url(endpoint_type)),
            body=body,
            headers=headers,
        )

        try:
            self._update_rate_limits(response)

            if response.status == 429:
                # if we hit a 429.  Something was rate limited but we already
                # acted on this in `self._update_rate_limits`.
                pass

            elif response.status >= 300 or response.status < 200:
                logger.error(
                    "Unexpected status code: %s (body: %s)",
                    response.status,
                    response.data,
                )
        finally:
            response.close()

    def _check_disabled(self, category):
        # type: (str) -> bool
        """
        Return True if `category`, or the global ``None`` bucket, is
        currently rate limited.
        """

        def _disabled(bucket):
            # type: (Any) -> bool
            ts = self._disabled_until.get(bucket)
            return ts is not None and ts > datetime.utcnow()

        return _disabled(category) or _disabled(None)

    def _send_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Gzip `event` as JSON and send it, unless "error" is rate limited."""

        if self._check_disabled("error"):
            return None

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            f.write(json_dumps(event))

        assert self.parsed_dsn is not None
        # Pass the values as logger arguments so the message is only
        # formatted when debug logging is actually emitted.
        logger.debug(
            "Sending event, type:%s level:%s event_id:%s project:%s host:%s",
            event.get("type") or "null",
            event.get("level") or "null",
            event.get("event_id") or "null",
            self.parsed_dsn.project_id,
            self.parsed_dsn.host,
        )
        self._send_request(
            body.getvalue(),
            headers={"Content-Type": "application/json", "Content-Encoding": "gzip"},
        )
        return None

    def _send_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """
        Send `envelope` after dropping its rate-limited items.

        The envelope's item list is modified in place; nothing is sent if
        no items remain.
        """

        # remove all items from the envelope which are over quota
        envelope.items[:] = [
            x for x in envelope.items if not self._check_disabled(x.data_category)
        ]
        if not envelope.items:
            return None

        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="w") as f:
            envelope.serialize_into(f)

        assert self.parsed_dsn is not None
        logger.debug(
            "Sending envelope [%s] project:%s host:%s",
            envelope.description,
            self.parsed_dsn.project_id,
            self.parsed_dsn.host,
        )
        self._send_request(
            body.getvalue(),
            headers={
                "Content-Type": "application/x-sentry-envelope",
                "Content-Encoding": "gzip",
            },
            endpoint_type="envelope",
        )
        return None

    def _get_pool_options(self, ca_certs):
        # type: (Optional[Any]) -> Dict[str, Any]
        """
        Return the keyword arguments used to build the urllib3 pool.

        Falls back to the certifi bundle when `ca_certs` is not given.
        """
        return {
            "num_pools": 2,
            "cert_reqs": "CERT_REQUIRED",
            "ca_certs": ca_certs or certifi.where(),
        }

    def _in_no_proxy(self, parsed_dsn):
        # type: (Dsn) -> bool
        """
        Return True if the DSN's host or netloc ends with one of the comma
        separated entries of the "no" proxy setting reported by
        `getproxies()`.
        """
        no_proxy = getproxies().get("no")
        if not no_proxy:
            return False
        for host in no_proxy.split(","):
            host = host.strip()
            if not host:
                # An empty entry (e.g. from a trailing comma) must not match:
                # ``str.endswith("")`` is true for every string.
                continue
            if parsed_dsn.host.endswith(host) or parsed_dsn.netloc.endswith(host):
                return True
        return False

    def _make_pool(
        self,
        parsed_dsn,  # type: Dsn
        http_proxy,  # type: Optional[str]
        https_proxy,  # type: Optional[str]
        ca_certs,  # type: Optional[Any]
    ):
        # type: (...) -> Union[PoolManager, ProxyManager]
        """
        Build the connection pool, going through a proxy when one applies.

        An explicitly passed proxy wins over the one reported by
        `getproxies()`.  The latter is only considered when the DSN host is
        not excluded via `_in_no_proxy`.  Passing an empty string for a
        proxy option skips that scheme's proxy lookup entirely.
        """
        proxy = None
        no_proxy = self._in_no_proxy(parsed_dsn)

        # try HTTPS first
        if parsed_dsn.scheme == "https" and (https_proxy != ""):
            proxy = https_proxy or (not no_proxy and getproxies().get("https"))

        # maybe fallback to HTTP proxy
        if not proxy and (http_proxy != ""):
            proxy = http_proxy or (not no_proxy and getproxies().get("http"))

        opts = self._get_pool_options(ca_certs)

        if proxy:
            return urllib3.ProxyManager(proxy, **opts)
        else:
            return urllib3.PoolManager(**opts)

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Submit `event` to the worker for sending."""
        # Resolve the hub now so the wrapper enters the hub that was current
        # when the event was captured.
        hub = self.hub_cls.current

        def send_event_wrapper():
            # type: () -> None
            with hub:
                with capture_internal_exceptions():
                    self._send_event(event)

        self._worker.submit(send_event_wrapper)

    def capture_envelope(
        self, envelope  # type: Envelope
    ):
        # type: (...) -> None
        """Submit `envelope` to the worker for sending."""
        # Resolve the hub now so the wrapper enters the hub that was current
        # when the envelope was captured.
        hub = self.hub_cls.current

        def send_envelope_wrapper():
            # type: () -> None
            with hub:
                with capture_internal_exceptions():
                    self._send_envelope(envelope)

        self._worker.submit(send_envelope_wrapper)

    def flush(
        self,
        timeout,  # type: float
        callback=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        """Flush the worker, forwarding `timeout` and `callback`.

        Nothing is flushed when `timeout` is not positive.
        """
        logger.debug("Flushing HTTP transport")
        if timeout > 0:
            self._worker.flush(timeout, callback)

    def kill(self):
        # type: () -> None
        """Kill the worker."""
        logger.debug("Killing HTTP transport")
        self._worker.kill()


class _FunctionTransport(Transport):
    """Adapter that lets a plain callable be used as a transport."""

    def __init__(
        self, func  # type: Callable[[Event], None]
    ):
        # type: (...) -> None
        """Remember `func`; the base transport is set up without options."""
        super(_FunctionTransport, self).__init__()
        self._func = func

    def capture_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """Hand `event` to the wrapped callable, discarding its result."""
        handler = self._func
        handler(event)


def make_transport(options):
    # type: (Dict[str, Any]) -> Optional[Transport]
    """
    Build the transport described by ``options["transport"]``.

    * ``None`` selects `HttpTransport`.
    * A `Transport` instance is returned as is.
    * A `Transport` subclass is instantiated with `options`.
    * Any other callable is wrapped in a `_FunctionTransport`.

    Transport classes are only instantiated when ``options["dsn"]`` is
    truthy; otherwise ``None`` is returned.

    :raises TypeError: if a transport would have to be instantiated but
        ``options["transport"]`` is none of the supported kinds.
    """
    ref_transport = options["transport"]

    if isinstance(ref_transport, Transport):
        return ref_transport

    transport_cls = None  # type: Optional[Type[Transport]]

    # If no transport is given, we use the http transport class
    if ref_transport is None:
        transport_cls = HttpTransport
    elif isinstance(ref_transport, type) and issubclass(ref_transport, Transport):
        transport_cls = ref_transport
    elif callable(ref_transport):
        return _FunctionTransport(ref_transport)  # type: ignore

    # if a transport class is given only instantiate it if the dsn is not
    # empty or None
    if not options["dsn"]:
        return None

    if transport_cls is None:
        # Previously this path failed with an UnboundLocalError; report the
        # actual configuration problem instead.  The check stays after the
        # DSN test so that an unusable value without a DSN still yields None.
        raise TypeError(
            "Unsupported transport %r: expected None, a Transport instance, "
            "a Transport subclass or a callable" % (ref_transport,)
        )

    return transport_cls(options)

[ Back ]
Name
Size
Last Modified
Owner / Group
Permissions
Options
..
--
July 11 2025 07:53:06
root / root
0755
__pycache__
--
July 02 2025 08:37:25
root / root
0755
integrations
--
July 02 2025 08:36:55
root / root
0755
__init__.py
0.834 KB
June 09 2025 11:12:39
root / root
0644
_compat.py
2.305 KB
June 09 2025 11:12:39
root / root
0644
_functools.py
2.223 KB
June 09 2025 11:12:39
root / root
0644
_queue.py
8.206 KB
June 09 2025 11:12:39
root / root
0644
_types.py
1.262 KB
June 09 2025 11:12:39
root / root
0644
api.py
4.694 KB
June 09 2025 11:12:39
root / root
0644
attachments.py
1.751 KB
June 09 2025 11:12:39
root / root
0644
client.py
14.031 KB
June 09 2025 11:12:39
root / root
0644
consts.py
3.5 KB
June 09 2025 11:12:39
root / root
0644
debug.py
1.105 KB
June 09 2025 11:12:39
root / root
0644
envelope.py
8.175 KB
June 09 2025 11:12:39
root / root
0644
hub.py
21.349 KB
June 09 2025 11:12:39
root / root
0644
py.typed
0 KB
June 09 2025 11:12:39
root / root
0644
scope.py
15.616 KB
June 09 2025 11:12:39
root / root
0644
serializer.py
15.925 KB
June 09 2025 11:12:39
root / root
0644
sessions.py
7.665 KB
June 09 2025 11:12:39
root / root
0644
tracing.py
25.236 KB
June 09 2025 11:12:39
root / root
0644
transport.py
11.477 KB
June 09 2025 11:12:39
root / root
0644
utils.py
26.26 KB
June 09 2025 11:12:39
root / root
0644
worker.py
3.868 KB
June 09 2025 11:12:39
root / root
0644

GRAYBYTE WORDPRESS FILE MANAGER @ 2025
CONTACT ME
Static GIF