===== tests/test_event.py =====
"""
Tests for ``simpy.events.Event``.

"""
# Pytest gets the parameters "env" and "log" from the *conftest.py* file
import re

import pytest


def test_succeed(env):
    """Test for the Environment.event() helper function."""

    def child(env, event):
        value = yield event
        assert value == 'ohai'
        assert env.now == 5

    def parent(env):
        event = env.event()
        env.process(child(env, event))
        yield env.timeout(5)
        event.succeed('ohai')

    env.process(parent(env))
    env.run()


def test_fail(env):
    """Test for the Environment.event() helper function."""

    def child(env, event):
        with pytest.raises(ValueError, match='ohai'):
            yield event
        assert env.now == 5

    def parent(env):
        event = env.event()
        env.process(child(env, event))
        yield env.timeout(5)
        event.fail(ValueError('ohai'))

    env.process(parent(env))
    env.run()


def test_names(env):
    def pem():
        return
        yield

    assert re.match(r'<Event\(\) object at 0x.*>', str(env.event()))

    assert re.match(r'<Timeout\(1\) object at 0x.*>', str(env.timeout(1)))
    assert re.match(
        r'<Timeout\(1, value=2\) object at 0x.*>', str(env.timeout(1, value=2))
    )

    assert re.match(
        r'<Condition\(all_events, \(<Event\(\) object at 0x.*>, '
        r'<Event\(\) object at 0x.*>\)\) object at 0x.*>',
        str(env.event() & env.event()),
    )

    assert re.match(r'<Process\(pem\) object at 0x.*>', str(env.process(pem())))


def test_value(env):
    """After an event has been triggered, its value becomes accessible."""
    event = env.timeout(0, 'I am the value')

    env.run()

    assert event.value == 'I am the value'


def test_unavailable_value(env):
    """If an event has not yet been triggered, its value is not available and
    trying to access it will result in a AttributeError."""
    event = env.event()

    with pytest.raises(AttributeError, match='.* is not yet available$'):
        _ = event.value


def test_triggered(env):
    def pem(env, event):
        value = yield event
        return value

    event = env.event()
    event.succeed('i was already done')

    result = env.run(env.process(pem(env, event)))

    assert result == 'i was already done'


def test_callback_modification(env):
    """The callbacks of an event will get set to None before actually invoking
    the callbacks. This prevents concurrent modifications."""

    def callback(event):
        assert event.callbacks is None

    event = env.event()
    event.callbacks.append(callback)
    event.succeed()
    env.run(until=event)


def test_condition_callback_removal(env):
    """A condition will remove all outstanding callbacks from its events."""
    a, b = env.event(), env.event()
    a.succeed()
    env.run(until=a | b)
    # The condition has removed its callback from event b.
    assert not a.callbacks
    assert not b.callbacks


def test_condition_nested_callback_removal(env):
    """A condition will remove all outstanding callbacks from its events (even
    if nested)."""
    a, b, c = env.event(), env.event(), env.event()
    b_and_c = b & c
    a_or_b_and_c = a | b_and_c
    a.succeed()
    env.run(until=a_or_b_and_c)
    # Callbacks from nested conditions are also removed.
    assert not a.callbacks
    assert not b.callbacks
    assert not c.callbacks
    for cb in b_and_c.callbacks:
        # b_and_c may have a _build_value callback.
        assert cb.__name__ != '_check'
    assert not a_or_b_and_c.callbacks
===== tests/test_process.py =====
"""
Tests for the ``simpy.events.Process``.

"""
# Pytest gets the parameters "env" and "log" from the *conftest.py* file
import pytest

from simpy import Interrupt


def test_start_non_process(env):
    """Check that you cannot start a normal function."""

    def foo():
        pass

    with pytest.raises(ValueError, match='is not a generator'):
        env.process(foo)


def test_get_state(env):
    """A process is alive until it's generator has not terminated."""

    def pem_a(env):
        yield env.timeout(3)

    def pem_b(env, pem_a):
        yield env.timeout(1)
        assert pem_a.is_alive

        yield env.timeout(3)
        assert not pem_a.is_alive

    proc_a = env.process(pem_a(env))
    env.process(pem_b(env, proc_a))
    env.run()


def test_target(env):
    def pem(env, event):
        yield event

    event = env.timeout(5)
    proc = env.process(pem(env, event))

    # Wait until "proc" is initialized and yielded the event
    while env.peek() < 5:
        env.step()
    assert proc.target is event
    proc.interrupt()


def test_wait_for_proc(env):
    """A process can wait until another process finishes."""

    def finisher(env):
        yield env.timeout(5)

    def waiter(env, finisher):
        proc = env.process(finisher(env))
        yield proc  # Waits until "proc" finishes

        assert env.now == 5

    env.process(waiter(env, finisher))
    env.run()


def test_return_value(env):
    """Processes can set a return value."""

    def child(env):
        yield env.timeout(1)
        return env.now

    def parent(env):
        result1 = yield env.process(child(env))
        result2 = yield env.process(child(env))

        assert [result1, result2] == [1, 2]

    env.process(parent(env))
    env.run()


def test_child_exception(env):
    """A child catches an exception and sends it to its parent."""

    def child(env):
        yield env.timeout(1)
        return RuntimeError('Onoes!')

    def parent(env):
        result = yield env.process(child(env))
        assert isinstance(result, Exception)

    env.process(parent(env))
    env.run()


def test_interrupted_join(env):
    """Interrupts remove a process from the callbacks of its target."""

    def interruptor(env, process):
        yield env.timeout(1)
        process.interrupt()

    def child(env):
        yield env.timeout(2)

    def parent(env):
        child_proc = env.process(child(env))
        try:
            yield child_proc
            pytest.fail('Did not receive an interrupt.')
        except Interrupt:
            assert env.now == 1
            assert child_proc.is_alive

            # We should not get resumed when child terminates.
            yield env.timeout(5)
            assert env.now == 6

    parent_proc = env.process(parent(env))
    env.process(interruptor(env, parent_proc))
    env.run()


def test_interrupted_join_and_rejoin(env):
    """Tests that interrupts are raised while the victim is waiting for
    another process. The victim tries to join again.

    """

    def interruptor(env, process):
        yield env.timeout(1)
        process.interrupt()

    def child(env):
        yield env.timeout(2)

    def parent(env):
        child_proc = env.process(child(env))
        try:
            yield child_proc
            pytest.fail('Did not receive an interrupt.')
        except Interrupt:
            assert env.now == 1
            assert child_proc.is_alive

            yield child_proc
            assert env.now == 2

    parent_proc = env.process(parent(env))
    env.process(interruptor(env, parent_proc))
    env.run()


def test_error_and_interrupted_join(env):
    def child_a(env, process):
        process.interrupt()
        return
        yield  # Dummy yield

    def child_b(env):
        raise AttributeError('spam')
        yield  # Dummy yield

    def parent(env):
        env.process(child_a(env, env.active_process))
        b = env.process(child_b(env))

        try:
            yield b
        # This interrupt unregisters me from b so I won't receive its
        # AttributeError
        except Interrupt:
            pass

        yield env.timeout(0)

    env.process(parent(env))
    pytest.raises(AttributeError, env.run)
===== tests/test_condition.py =====
import pytest


def test_operator_and(env):
    def process(env):
        timeout = [env.timeout(delay, value=delay) for delay in range(3)]
        results = yield timeout[0] & timeout[1] & timeout[2]

        assert results == {
            timeout[0]: 0,
            timeout[1]: 1,
            timeout[2]: 2,
        }

    env.process(process(env))
    env.run()


def test_operator_and_blocked(env):
    def process(env):
        timeout = env.timeout(1)
        event = env.event()
        yield env.timeout(1)

        condition = timeout & event
        assert not condition.triggered

    env.process(process(env))
    env.run()


def test_operator_or(env):
    def process(env):
        timeout = [env.timeout(delay, value=delay) for delay in range(3)]
        results = yield timeout[0] | timeout[1] | timeout[2]

        assert results == {
            timeout[0]: 0,
        }

    env.process(process(env))
    env.run()


def test_operator_nested_and(env):
    def process(env):
        timeout = [env.timeout(delay, value=delay) for delay in range(3)]
        results = yield (timeout[0] & timeout[2]) | timeout[1]

        assert results == {
            timeout[0]: 0,
            timeout[1]: 1,
        }
        assert env.now == 1

    env.process(process(env))
    env.run()


def test_operator_nested_or(env):
    def process(env):
        timeout = [env.timeout(delay, value=delay) for delay in range(3)]
        results = yield (timeout[0] | timeout[1]) & timeout[2]

        assert results == {
            timeout[0]: 0,
            timeout[1]: 1,
            timeout[2]: 2,
        }
        assert env.now == 2

    env.process(process(env))
    env.run()


def test_nested_cond_with_error(env):
    def explode(env):
        yield env.timeout(1)
        raise ValueError('Onoes!')

    def process(env):
        with pytest.raises(ValueError, match='Onoes!'):
            yield env.process(explode(env)) & env.timeout(1)

    env.process(process(env))
    env.run()


def test_cond_with_error(env):
    def explode(env, delay):
        yield env.timeout(delay)
        raise ValueError(f'Onoes, failed after {delay}!')

    def process(env):
        with pytest.raises(ValueError, match='Onoes, failed after 0!'):
            yield env.process(explode(env, 0)) | env.timeout(1)

    env.process(process(env))
    env.run()


def test_cond_with_nested_error(env):
    def explode(env, delay):
        yield env.timeout(delay)
        raise ValueError(f'Onoes, failed after {delay}!')

    def process(env):
        with pytest.raises(ValueError, match='Onoes, failed after 0!'):
            yield env.process(explode(env, 0)) & env.timeout(1) | env.timeout(1)

    env.process(process(env))
    env.run()


def test_cond_with_uncaught_error(env):
    """Errors that happen after the condition has been triggered will not be
    handled by the condition and cause the simulation to crash."""

    def explode(env, delay):
        yield env.timeout(delay)
        raise ValueError(f'Onoes, failed after {delay}!')

    def process(env):
        yield env.timeout(1) | env.process(explode(env, 2))

    env.process(process(env))
    with pytest.raises(ValueError, match='Onoes, failed after'):
        env.run()
    assert env.now == 2


def test_iand_with_and_cond(env):
    def process(env):
        cond = env.timeout(1, value=1) & env.timeout(2, value=2)
        orig = cond

        cond &= env.timeout(0, value=0)
        assert cond is not orig

        results = yield cond
        assert list(results.values()) == [1, 2, 0]

    env.process(process(env))
    env.run()


def test_iand_with_or_cond(env):
    def process(env):
        cond = env.timeout(1, value=1) | env.timeout(2, value=2)
        orig = cond

        cond &= env.timeout(0, value=0)
        assert cond is not orig

        results = yield cond
        assert list(results.values()) == [1, 0]

    env.process(process(env))
    env.run()


def test_ior_with_or_cond(env):
    def process(env):
        cond = env.timeout(1, value=1) | env.timeout(2, value=2)
        orig = cond

        cond |= env.timeout(0, value=0)
        assert cond is not orig

        results = yield cond
        assert list(results.values()) == [0]

    env.process(process(env))
    env.run()


def test_ior_with_and_cond(env):
    def process(env):
        cond = env.timeout(1, value=1) & env.timeout(2, value=2)
        orig = cond

        cond |= env.timeout(0, value=0)
        assert cond is not orig

        results = yield cond
        assert list(results.values()) == [0]

    env.process(process(env))
    env.run()


def test_immutable_results(env):
    """Results of conditions should not change after they have been
    triggered."""

    def process(env):
        timeout = [env.timeout(delay, value=delay) for delay in range(3)]
        # The or condition in this expression will trigger immediately. The and
        # condition will trigger later on.
        condition = timeout[0] | (timeout[1] & timeout[2])

        results = yield condition
        assert results == {timeout[0]: 0}

        # Make sure that the results of condition were frozen. The results of
        # the nested and condition do not become visible afterwards.
        yield env.timeout(2)
        assert results == {timeout[0]: 0}

    env.process(process(env))
    env.run()


def test_shared_and_condition(env):
    timeout = [env.timeout(delay, value=delay) for delay in range(3)]
    c1 = timeout[0] & timeout[1]
    c2 = c1 & timeout[2]

    def p1(_, condition):
        results = yield condition
        assert results == {timeout[0]: 0, timeout[1]: 1}

    def p2(_, condition):
        results = yield condition
        assert results == {timeout[0]: 0, timeout[1]: 1, timeout[2]: 2}

    env.process(p1(env, c1))
    env.process(p2(env, c2))
    env.run()


def test_shared_or_condition(env):
    timeout = [env.timeout(delay, value=delay) for delay in range(3)]
    c1 = timeout[0] | timeout[1]
    c2 = c1 | timeout[2]

    def p1(_, condition):
        results = yield condition
        assert results == {timeout[0]: 0}

    def p2(_, condition):
        results = yield condition
        assert results == {timeout[0]: 0}

    env.process(p1(env, c1))
    env.process(p2(env, c2))
    env.run()


def test_condition_value(env):
    """The value of a condition behaves like a readonly dictionary."""
    timeouts = [env.timeout(delay, value=delay) for delay in range(3)]

    def p(env, timeouts):
        results = yield env.all_of(timeouts)
        assert list(results) == timeouts
        assert list(results.keys()) == timeouts
        assert list(results.values()) == [0, 1, 2]
        assert list(results.items()) == list(zip(timeouts, [0, 1, 2]))
        assert timeouts[0] in results
===== tests/test_environment.py =====
"""
General test for the `simpy.core.Environment`.

"""
# Pytest gets the parameters "env" and "log" from the *conftest.py* file
import pytest


def test_event_queue_empty(env, log):
    """The simulation should stop if th<response clipped><NOTE>Due to the max output limit, only part of the full response has been shown to you.</NOTE> parent(env):
        child_proc = env.process(child(env))
        yield child_proc

    def grandparent(env):
        parent_proc = env.process(parent(env))
        yield parent_proc

    env.process(grandparent(env))
    try:
        env.run()
        pytest.fail('There should have been an exception')
    except RuntimeError:
        trace = traceback.format_exc()

        expected = (
            re.escape(
                textwrap.dedent(
                    """\
        Traceback (most recent call last):
          File "{path}tests/test_exceptions.py", line {line}, in child
            raise RuntimeError('foo')
        RuntimeError: foo

        The above exception was the direct cause of the following exception:

        Traceback (most recent call last):
          File "{path}tests/test_exceptions.py", line {line}, in parent
            yield child_proc
        RuntimeError: foo

        The above exception was the direct cause of the following exception:

        Traceback (most recent call last):
          File "{path}tests/test_exceptions.py", line {line}, in grandparent
            yield parent_proc
        RuntimeError: foo

        The above exception was the direct cause of the following exception:

        Traceback (most recent call last):
          File "{path}tests/test_exceptions.py", line {line}, in test_exception_chaining
            env.run()
          File "{path}simpy/core.py", line {line}, in run
            self.step()
          File "{path}simpy/core.py", line {line}, in step
            raise exc
        RuntimeError: foo
        """
                )
            )
            .replace(r'\{line\}', r'\d+')
            .replace(r'\{path\}', r'.*')
        )

        if platform.system() == 'Windows':
            expected = expected.replace(r'\/', r'\\')

        assert re.match(expected, trace), 'Traceback mismatch'


def test_invalid_event(env):
    """Invalid yield values will cause the simulation to fail."""

    def root(_):
        yield None

    env.process(root(env))
    with pytest.raises(RuntimeError, match='Invalid yield value "None"'):
        env.run()


def test_exception_handling(env):
    """If failed events are not defused (which is the default) the simulation
    crashes."""

    event = env.event()
    event.fail(RuntimeError())
    with pytest.raises(RuntimeError):
        env.run(until=1)


def test_callback_exception_handling(env):
    """Callbacks of events may handle exception by setting the ``defused``
    attribute of ``event`` to ``True``."""

    def callback(event):
        event.defused = True

    event = env.event()
    event.callbacks.append(callback)
    event.fail(RuntimeError())
    assert not event.defused, 'Event has been defused immediately'
    env.run(until=1)
    assert event.defused, 'Event has not been defused'


def test_process_exception_handling(env):
    """Processes can't ignore failed events and auto-handle exceptions."""

    def pem(_, event):
        try:
            yield event
            pytest.fail('Hey, the event should fail!')
        except RuntimeError:
            pass

    event = env.event()
    env.process(pem(env, event))
    event.fail(RuntimeError())

    assert not event.defused, 'Event has been defused immediately'
    env.run(until=1)
    assert event.defused, 'Event has not been defused'


def test_process_exception_chaining(env):
    """Because multiple processes can be waiting for an event, exceptions of
    failed events are copied before being thrown into a process. Otherwise, the
    traceback of the exception gets modified by a process.

    See https://bitbucket.org/simpy/simpy/issue/60 for more details."""
    import traceback

    def process_a(event):
        try:
            yield event
        except RuntimeError:
            stacktrace = traceback.format_exc()
            assert 'process_b' not in stacktrace

    def process_b(event):
        try:
            yield event
        except RuntimeError:
            stacktrace = traceback.format_exc()
            assert 'process_a' not in stacktrace

    event = env.event()
    event.fail(RuntimeError('foo'))

    env.process(process_a(event))
    env.process(process_b(event))

    env.run()


def test_sys_excepthook(env):
    """Check that the default exception hook reports exception chains."""

    def process_a(event):
        yield event

    def process_b(event):
        yield event

    event = env.event()
    event.fail(RuntimeError('foo'))

    env.process(process_b(env.process(process_a(event))))

    try:
        env.run()
    except BaseException:
        # Let the default exception hook print the traceback to the redirected
        # standard error channel.
        import sys
        from io import StringIO

        stderr, sys.stderr = sys.stderr, StringIO()

        typ, e, tb = sys.exc_info()
        assert typ is not None
        assert e is not None
        sys.excepthook(typ, e, tb)

        traceback = sys.stderr.getvalue()

        sys.stderr = stderr
===== tests/test_rt.py =====
"""
Tests for SimPy's real-time behavior.

"""
from time import monotonic, sleep

import pytest

from simpy.rt import RealtimeEnvironment


def process(env, log, sleep_time, timeout=1):
    """Test process."""
    while True:
        sleep(sleep_time)
        yield env.timeout(timeout)
        log.append(env.now)


def check_duration(real, expected):
    return expected <= real < (expected + 0.02)


@pytest.mark.parametrize('factor', [0.1, 0.05, 0.15])
def test_rt(log, factor):
    """Basic tests for run()."""
    start = monotonic()
    env = RealtimeEnvironment(factor=factor)
    env.process(process(env, log, 0.01, 1))
    env.process(process(env, log, 0.02, 1))

    env.run(2)
    duration = monotonic() - start

    assert check_duration(duration, 2 * factor)
    assert log == [1, 1]


def test_rt_multiple_call(log):
    """Test multiple calls to run()."""
    start = monotonic()
    env = RealtimeEnvironment(factor=0.05)

    env.process(process(env, log, 0.01, 2))
    env.process(process(env, log, 0.01, 3))

    env.run(5)
    duration = monotonic() - start

    # assert almost_equal(duration, 0.2)
    assert check_duration(duration, 5 * 0.05)
    assert log == [2, 3, 4]

    env.run(12)
    duration = monotonic() - start

    assert check_duration(duration, 12 * 0.05)
    assert log == [2, 3, 4, 6, 6, 8, 9, 10]


def test_rt_slow_sim_default_behavior(log):
    """By default, SimPy should raise an error if a simulation is too
    slow for the selected real-time factor."""
    env = RealtimeEnvironment(factor=0.05)
    env.process(process(env, log, 0.1, 1))

    err = pytest.raises(RuntimeError, env.run, 3)
    assert 'Simulation too slow for real time' in str(err.value)
    assert log == []


def test_rt_slow_sim_no_error(log):
    """Test ignoring slow simulations."""
    start = monotonic()
    env = RealtimeEnvironment(factor=0.05, strict=False)
    env.process(process(env, log, 0.1, 1))

    env.run(2)
    duration = monotonic() - start

    assert check_duration(duration, 2 * 0.1)
    assert log == [1]


def test_rt_illegal_until():
    """Test illegal value for *until*."""
    env = RealtimeEnvironment()
    with pytest.raises(
        ValueError,
        match=r'until \(-1\) must be greater than the current simulation time',
    ):
        env.run(-1)


def test_rt_sync(log):
    """Test resetting the internal wall-clock reference time."""
    env = RealtimeEnvironment(factor=0.05)
    env.process(process(env, log, 0.01))
    sleep(0.06)  # Simulate massive workload :-)
    env.sync()
    env.run(3)


def test_run_with_untriggered_event(env):
    env = RealtimeEnvironment(factor=0.05)
    excinfo = pytest.raises(RuntimeError, env.run, until=env.event())
    assert str(excinfo.value).startswith(
        'No scheduled events left but "until" event was not triggered:'
    )
===== tests/test_util.py =====
"""
Tests for the utility functions from :mod:`simpy.util`.

"""
import pytest

from simpy import Interrupt
from simpy.events import ConditionValue
from simpy.util import start_delayed, subscribe_at


def test_start_delayed(env):
    def pem(env):
        assert env.now == 5
        yield env.timeout(1)

    start_delayed(env, pem(env), delay=5)
    env.run()


def test_start_delayed_error(env):
    """Check if delayed() raises an error if you pass a negative dt."""

    def pem(env):
        yield env.timeout(1)

    with pytest.raises(ValueError, match='delay.*must be > 0'):
        start_delayed(env, pem(env), delay=-1)


def test_subscribe(env):
    """Check async. interrupt if a process terminates."""

    def child(env):
        yield env.timeout(3)
        return 'ohai'

    def parent(env):
        child_proc = env.process(child(env))
        subscribe_at(child_proc)

        try:
            yield env.event()
        except Interrupt as interrupt:
            assert interrupt.cause is not None  # noqa: PT017
            assert interrupt.cause[0] is child_proc  # noqa: PT017
            assert interrupt.cause[1] == 'ohai'  # noqa: PT017
            assert env.now == 3

    env.process(parent(env))
    env.run()


def test_subscribe_terminated_proc(env):
    """subscribe() proc should send a signal immediately if
    "other" has already terminated.

    """

    def child(env):
        yield env.timeout(1)

    def parent(env):
        child_proc = env.process(child(env))
        yield env.timeout(2)
        pytest.raises(RuntimeError, subscribe_at, child_proc)

    env.process(parent(env))
    env.run()


def test_subscribe_with_join(env):
    """Test that subscribe() works if a process waits for another one."""

    def child(env, i):
        yield env.timeout(i)

    def parent(env):
        child_proc1 = env.process(child(env, 1))
        child_proc2 = env.process(child(env, 2))
        try:
            subscribe_at(child_proc1)
            yield child_proc2
        except Interrupt as interrupt:
            assert env.now == 1
            assert interrupt.cause is not None  # noqa: PT017
            assert interrupt.cause[0] is child_proc1  # noqa: PT017
            assert child_proc2.is_alive

    env.process(parent(env))
    env.run()


def test_subscribe_at_timeout(env):
    """You should be able to subscribe at arbitrary events."""

    def pem(env):
        to = env.timeout(2)
        subscribe_at(to)
        try:
            yield env.timeout(10)
        except Interrupt as interrupt:
            assert interrupt.cause == (to, None)  # noqa: PT017
            assert env.now == 2

    env.process(pem(env))
    env.run()


def test_subscribe_at_timeout_with_value(env):
    """An event's value should be accessible via the interrupt cause."""

    def pem(env):
        val = 'ohai'
        to = env.timeout(2, value=val)
        subscribe_at(to)
        try:
            yield env.timeout(10)
        except Interrupt as interrupt:
            assert interrupt.cause == (to, val)  # noqa: PT017
            assert env.now == 2

    env.process(pem(env))
    env.run()


def test_all_of(env):
    """Wait for all events to be triggered."""

    def parent(env):
        # Start 10 events.
        events = [env.timeout(i, value=i) for i in range(10)]
        results = yield env.all_of(events)

        assert results == {events[i]: i for i in range(10)}
        assert env.now == 9

    env.process(parent(env))
    env.run()


def test_all_of_generator(env):
    """Conditions also work with generators."""

    def parent(env):
        # Start 10 events.
        events = (env.timeout(i, value=i) for i in range(10))
        results = yield env.all_of(events)

        assert list(results.values()) == list(range(10))
        assert env.now == 9

    env.process(parent(env))
    env.run()


def test_wait_for_all_with_errors(env):
    """On default AllOf should fail immediately if one of its events
    fails."""

    def child_with_error(env, value):
        yield env.timeout(value)
        raise RuntimeError('crashing')

    def parent(env):
        events = [
            env.timeout(1, value=1),
            env.process(child_with_error(env, 2)),
            env.timeout(3, value=3),
        ]

        condition = env.all_of(events)
        with pytest.raises(RuntimeError, match='crashing'):
            yield condition

        # Although the condition has failed, interim values are available.
        assert condition._events[0].value == 1
        assert condition._events[1].value.args[0] == 'crashing'
        # The last child has not terminated yet.
        assert not events[2].processed

    env.process(parent(env))
    env.run()


def test_all_of_chaining(env):
    """If a wait_for_all condition A is chained to a wait_for_all condition B,
    B will be merged into A."""

    def parent(env):
        condition_a = env.all_of([env.timeout(i, value=i) for i in range(2)])
        condition_b = env.all_of([env.timeout(i, value=i) for i in range(2)])

        condition_a &= condition_b

        results = yield condition_a
        assert list(results.values()) == [0, 1, 0, 1]

    env.process(parent(env))
    env.run()


def test_all_of_chaining_intermediate_results(env):
    """If a wait_for_all condition A with intermediate results is merged into
    another wait_for_all condition B, the results are copied into condition
    A."""

    def parent(env):
        condition_a = env.all_of([env.timeout(i, value=i) for i in range(2)])
        condition_b = env.all_of([env.timeout(i, value=i) for i in range(2)])

        yield env.timeout(0)

        condition = condition_a & condition_b
        result = ConditionValue()
        condition._populate_value(result)
        assert list(result.values()) == [0, 0]

        results = yield condition
        assert list(results.values()) == [0, 1, 0, 1]

    env.process(parent(env))
    env.run()


def test_all_of_with_triggered_events(env):
    """Processed events can be added to a condition. Confirm this with
    all_of."""

    def parent(env):
        events = [env.timeout(0, value='spam'), env.timeout(1, value='eggs')]
        yield env.timeout(2)

        values = list((yield env.all_of(events)).values())
        assert values == ['spam', 'eggs']

    env.process(parent(env))
    env.run()


def test_any_of(env):
    """Wait for any event to be triggered."""

    def parent(env):
        # Start 10 events.
        events = [env.timeout(i, value=i) for i in range(10)]
        results = yield env.any_of(events)

        assert results == {events[0]: 0}
        assert env.now == 0

    env.process(parent(env))
    env.run()


def test_any_of_with_errors(env):
    """On default any_of should fail if the event has failed too."""

    def child_with_error(env, value):
        yield env.timeout(value)
===== tests/test_version.py =====
"""
Ensure simpy.__version__ is populated.

"""

import simpy


def test_simpy_version():
    assert simpy.__version__.startswith('4.')
[The command completed with exit code 0.]
[Current working directory: /workspace/simpy]
[Python interpreter: /usr/bin/python]
[Command finished with exit code 0]