Skip to content

Stream class

ravyn.core.datastructures.stream.Stream

Bases: ResponseContainer[StreamingResponse]

iterator instance-attribute

iterator

Any iterable or async iterable, or a callable that is invoked (with no arguments) to produce the streamed content.

to_response

to_response(headers, media_type, status_code, app)
Source code in ravyn/core/datastructures/stream.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def to_response(
    self,
    headers: dict[str, Any],
    media_type: Union["MediaType", str],
    status_code: int,
    app: Type["Ravyn"],
) -> StreamingResponse:  # pragma: no cover
    """Build a ``StreamingResponse`` from this container's iterator.

    Args:
        headers: Headers forwarded unchanged to the response.
        media_type: Media type forwarded unchanged to the response.
        status_code: Status code forwarded unchanged to the response.
        app: Accepted as part of the signature; not used by this method.

    Returns:
        A ``StreamingResponse`` whose content is ``self.iterator`` when it
        is already an ``Iterable`` or ``AsyncIterable``, and otherwise the
        result of calling ``self.iterator()`` with no arguments.
    """
    # Read the background attribute first, before the iterator is touched.
    background_task = self.background

    stream_source = self.iterator
    if not isinstance(stream_source, (Iterable, AsyncIterable)):
        # Not directly iterable: treat it as a zero-argument factory and
        # call it to obtain the content to stream.
        stream_source = stream_source()

    return StreamingResponse(
        background=background_task,
        content=stream_source,
        headers=headers,
        media_type=media_type,
        status_code=status_code,
    )