Source code for rhodes.states._parameters

"""Decorators for adding parameters and methods to State classes."""
from typing import Any

import attr
from attr.validators import instance_of, optional

from rhodes._converters import convert_to_json_path
from rhodes._util import RHODES_ATTRIB, docstring_with_param
from rhodes.exceptions import InvalidDefinitionError
from rhodes.structures import JsonPath, Parameters

__all__ = ("state", "task_type")


def _next_and_end(cls: "StateMirror") -> "StateMirror":
    """Add "Next" and "End" parameters to the class.
    Also adds the "then()" and "end()" helper methods.
    """

    def _validate_next(instance, attribute: attr.Attribute, value: Any):
        if value is not None and instance.End is not None:
            raise ValueError("Only one of 'Next' and 'End' is allowed")

    cls.Next = RHODES_ATTRIB(validator=(optional(instance_of(str)), _validate_next))
    cls.__doc__ = docstring_with_param(cls, "Next", description="The state that will follow this state")

    def _validate_end(instance, attribute: attr.Attribute, value: Any):
        if value is not None and instance.Next is not None:
            raise ValueError("Only one of 'Next' and 'End' is allowed")

        if value is not None and value is not True:
            raise ValueError("If 'End' is set, value must be True")

    cls.End = RHODES_ATTRIB(validator=(optional(instance_of(bool)), _validate_end))
    cls.__doc__ = docstring_with_param(cls, "End", bool, description="This state is a terminal state")

    def _then(instance, next_state):
        """Set the next state in this state machine."""

        if instance.End is not None:
            raise InvalidDefinitionError(
                f"Cannot set state transition. State {instance.title!r} already has an end condition."
            )

        if instance.Next is not None:
            raise InvalidDefinitionError(
                f"Cannot set state transition. State {instance.title!r} already has a state transition."
            )

        instance.member_of.add_state(next_state)
        # TODO: set reference rather than extracting name
        instance.Next = next_state.title
        return next_state

    cls.then = _then

    def _end(instance):
        """Make this state a terminal state."""

        if instance.Next is not None:
            raise InvalidDefinitionError(
                "Cannot set end condition." f"State {instance.title!r} already has a state transition."
            )

        instance.End = True

        return instance

    cls.end = _end

    return cls


def _parameters(cls: "StateMirror") -> "StateMirror":
    """Add the "Parameters" parameter to the class."""
    cls.Parameters = RHODES_ATTRIB(validator=optional(instance_of(Parameters)))
    cls.__doc__ = docstring_with_param(
        cls,
        "Parameters",
        Parameters,
        description="Additional parameters for Step Functions to provide to connected resource",
    )

    return cls


def _result_path(cls: "StateMirror") -> "StateMirror":
    """Add the "ResultPath" parameter to the class."""
    cls.ResultPath = RHODES_ATTRIB(
        default=JsonPath("$"), validator=optional(instance_of(JsonPath)), converter=convert_to_json_path
    )
    cls.__doc__ = docstring_with_param(
        cls,
        "ResultPath",
        JsonPath,
        description="Where in the state input data to place the results of this state",
        default=JsonPath("$"),
    )

    return cls


def _catch_retry(cls: "StateMirror") -> "StateMirror":
    """Add the "Catch" and "Retry" parameters to the class."""
    cls.Retry = RHODES_ATTRIB()
    cls.__doc__ = docstring_with_param(cls, "Retry")

    cls.Catch = RHODES_ATTRIB()
    cls.__doc__ = docstring_with_param(cls, "Catch")

    return cls


def _input_output(cls: "StateMirror") -> "StateMirror":
    """Add the "InputPath" and "OutputPath" parameters to the class."""

    cls.InputPath = RHODES_ATTRIB(
        default=JsonPath("$"), validator=optional(instance_of(JsonPath)), converter=convert_to_json_path
    )
    cls.__doc__ = docstring_with_param(
        cls,
        "InputPath",
        JsonPath,
        description="The portion of the state input data to be used as input for the state",
        default=JsonPath("$"),
    )

    cls.OutputPath = RHODES_ATTRIB(
        default=JsonPath("$"), validator=optional(instance_of(JsonPath)), converter=convert_to_json_path
    )
    cls.__doc__ = docstring_with_param(
        cls,
        "OutputPath",
        JsonPath,
        description="The portion of the state input data to be passed to the next state",
        default=JsonPath("$"),
    )

    return cls


def task_type(cls: "StateMirror") -> "StateMirror":
    """Add common parameters used by all "Task" types."""

    cls = state(cls)
    cls = _next_and_end(cls)
    cls = _input_output(cls)
    cls = _result_path(cls)
    cls = _catch_retry(cls)

    def _validate_positive_value(instance, attribute: attr.Attribute, value: int):
        if value is not None and not value > 0:
            raise ValueError(f"{instance.__class__.__name__} parameter '{attribute.name}' value must be positive")

    # default=99999999
    cls.TimeoutSeconds = RHODES_ATTRIB(validator=(optional(instance_of(int)), _validate_positive_value))
    cls.__doc__ = docstring_with_param(
        cls, "TimeoutSeconds", int, description="Maximum time that this state is allowed to run"
    )

    cls.HeartbeatSeconds = RHODES_ATTRIB(validator=(optional(instance_of(int)), _validate_positive_value))
    cls.__doc__ = docstring_with_param(
        cls, "HeartbeatSeconds", int, description="Maximum time allowed between heartbeat responses from state"
    )

    return cls


def state(cls: "StateMirror") -> "StateMirror":
    """Add common parameters used by all states."""

    cls.__doc__ = docstring_with_param(cls, "title", str, description="Name of state in state machine")

    cls.__doc__ = docstring_with_param(
        cls, "Comment", str, description="Human-readable description of the state", default=""
    )

    return cls