Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/qubes-exc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ call for effect). Instead consider writing in negative form, implying expected
state: "Domain is not running" instead of "Domain is paused" (yeah, what's wrong
with that?).

Also avoid implying the personhood of the computer, including adressing user in
Also avoid implying the personhood of the computer, including addressing user in
second person. For example, write "Sending message failed" instead of "I failed
to send the message".

Expand Down
94 changes: 72 additions & 22 deletions qubes/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import re
import shutil
import socket
import string
import struct
import traceback
from typing import Union, Any
Expand Down Expand Up @@ -189,6 +190,11 @@ def __init__(
self._handler = candidates[0]
self._running_handler = None

# pylint: disable=invalid-name
self.EXC_ARG_NOT_IN_DEST_VOLUMES = "{} volumes".format(self.dest.name)
# pylint: disable=invalid-name
self.EXC_ARG_NOT_IN_POOLS = "pools"

@classmethod
def list_methods(cls, select_method=None):
for attr in dir(cls):
Expand Down Expand Up @@ -239,26 +245,76 @@ def fire_event_for_filter(self, iterable, **kwargs):
"""Fire an event on the source qube to filter for permission"""
return apply_filters(iterable, self.fire_event_for_permission(**kwargs))

def enforce_arg(
self,
wants: Union[None, bool, str, list, tuple] = None,
short_reason: str = "",
) -> None:
"""Enforce argument to be absent or be in iterable."""
if wants is None:
self.enforce(not self.arg, reason="Argument is present")
elif wants is True:
self.enforce(self.arg, reason="Argument is empty")
else:
assert isinstance(self.arg, str)
assert short_reason
if wants is None or isinstance(wants, bool):
assertion = self.arg is wants
elif isinstance(wants, str):
assertion = self.arg == wants
else:
assertion = self.arg in wants
self.enforce(
assertion,
reason="Argument not in {!r}".format(short_reason),
)

def enforce_dest_dom0(self, wants: bool = True) -> None:
"""Enforce destination to be or not to be dom0."""
if wants:
self.enforce(
self.dest.name == "dom0", reason="Destination is not dom0"
)
else:
self.enforce(self.dest.name != "dom0", reason="Destination is dom0")

@staticmethod
def enforce(predicate):
"""An assert replacement, but works even with optimisations."""
def enforce(predicate, reason: str = "") -> None:
"""If predicate is false, raise an exception to terminate handling
the request.

This will raise :py:class:`ProtocolError` if the predicate is false.
See the documentation of that class for details.

:param str reason: Exception motive.
"""
if not predicate:
raise PermissionDenied()
raise ProtocolError(reason)

def validate_size(
self, untrusted_size: bytes, allow_negative: bool = False
self, untrusted_size: bytes, name: str, allow_negative: bool = False
) -> int:
self.enforce(isinstance(untrusted_size, bytes))
allowed_chars = string.ascii_letters + string.digits + "-_. "
assert all(c in allowed_chars for c in name)
if not isinstance(untrusted_size, bytes):
raise ProtocolError(
"Expected {!r} to be of type bytes, got {!r}".format(
name, type(untrusted_size).__name__
)
)
coefficient = 1
if allow_negative and untrusted_size.startswith(b"-"):
coefficient = -1
untrusted_size = untrusted_size[1:]
name_cap = name.capitalize()
if not untrusted_size.isdigit():
raise qubes.exc.ProtocolError("Size must be ASCII digits (only)")
raise ProtocolError(name_cap + " size contains non ASCII digits")
if len(untrusted_size) >= 20:
raise qubes.exc.ProtocolError("Sizes limited to 19 decimal digits")
raise ProtocolError(
name_cap + " size is bigger than 19 decimal digits"
)
if untrusted_size[0] == 48 and untrusted_size != b"0":
raise qubes.exc.ProtocolError("Spurious leading zeros not allowed")
raise ProtocolError(name_cap + " contains spurious leading zeros")
return int(untrusted_size) * coefficient


Expand Down Expand Up @@ -345,21 +401,15 @@ async def respond(self, src, meth, dest, arg, *, untrusted_payload):

# except clauses will fall through to transport.abort() below

except PermissionDenied:
self.app.log.warning(
"permission denied for call %s+%s (%s → %s) "
"with payload of %d bytes",
meth,
arg,
src,
dest,
len(untrusted_payload),
)

except ProtocolError:
except (PermissionDenied, ProtocolError) as exc:
log_prefix = None
if isinstance(exc, PermissionDenied):
log_prefix = "permission denied"
elif isinstance(exc, ProtocolError):
log_prefix = "protocol error"
self.app.log.warning(
"protocol error for call %s+%s (%s → %s) "
"with payload of %d bytes",
"%s for call %s+%s (%s → %s) with payload of %d bytes",
log_prefix,
meth,
arg,
src,
Expand Down
Loading