Source code for noob.testing.nodes

import random
import sqlite3
import string
from collections.abc import Generator
from datetime import datetime
from itertools import count, cycle
from typing import Annotated as A
from typing import Any

import xarray as xr
from faker import Faker

from noob import Name, process_method
from noob.node import Node


[docs] def count_source(limit: int = 1000, start: int = 0) -> Generator[A[int, Name("index")], None, None]: counter = count(start=start) while (val := next(counter)) < limit: yield val
[docs] def letter_source() -> Generator[A[str, Name("letter")]]: yield from cycle(string.ascii_lowercase)
[docs] def word_source() -> Generator[A[str, Name("word")]]: fake = Faker() while True: yield fake.unique.word()
[docs] def multi_words_source(n: int) -> Generator[A[list[str], Name("multi_words")]]: fake = Faker() while True: yield [fake.unique.word() for _ in range(n)]
[docs] def sporadic_word(every: int = 3) -> Generator[A[str, Name("word")] | None, None, None]: fake = Faker() i = 0 while True: i += 1 if i % every == 0: yield fake.unique.word() else: yield None
[docs] def word_counts() -> Generator[tuple[A[str, Name("word")], A[list[int], Name("counts")]]]: fake = Faker() while True: n_counts = random.randint(2, 5) yield fake.unique.word(), [random.randint(1, 100) for _ in range(n_counts)]
[docs] def multiply(left: int, right: int = 2) -> int: """ Return value purposely unnamed, to be used as `{nodename}.value` """ return left * right
[docs] def divide(numerator: int, denominator: int = 5) -> A[float, Name("ratio")]: return numerator / denominator
[docs] def concat(strings: list[str]) -> str: return "".join(strings)
[docs] def exclaim(string: str) -> str: return string + "!"
[docs] def repeat(string: str, times: int) -> str: return string * times
[docs] def dictify(key: str, items: list[Any]) -> dict[str, Any]: return {key: items}
[docs] def error(value: Any) -> None: raise ValueError("This node just emits errors")
[docs] class CountSource(Node): limit: int = 1000 start: int = 0
[docs] def process(self) -> Generator[A[int, Name("index")], None, None]: counter = count(start=self.start) while (val := next(counter)) < self.limit: yield val
[docs] class UnannotatedGenerator(Node): limit: int = 1000 start: int = 0
[docs] def process(self): # noqa: ANN201 counter = count(start=self.start) while (val := next(counter)) < self.limit: yield val
[docs] class Multiply(Node):
[docs] def process(self, left: int, right: int = 2) -> A[int, Name("product")]: return multiply(left=left, right=right)
[docs] class VolumeProcess: def __init__(self, height: int = 2): self.height = height
[docs] def process(self, width: int, depth: int) -> A[int, Name("volume")]: return self.height * multiply(left=width, right=depth)
[docs] class Volume: def __init__(self, height: int = 2): self.height = height
[docs] @process_method def volume(self, width: int, depth: int) -> A[int, Name("volume")]: return self.height * multiply(left=width, right=depth)
[docs] class Now: def __init__(self): self.now = datetime.now()
[docs] @process_method def print(self, prefix: str = "Now: ") -> A[str, Name("timestamp")]: return f"{prefix}{self.now.isoformat()}"
[docs] def array_add_to_left(left: xr.DataArray, right: xr.DataArray) -> xr.DataArray: left += right return left
[docs] class CountSourceDecor: def __init__(self, start: int = 0) -> None: self.gen = count(start=start)
[docs] @process_method def process(self) -> Generator[A[int, Name("count")], None, None]: yield from self.gen
[docs] def input_party( one: int, two: float, three: str, four: bool, five: list, six: dict, seven: set ) -> A[bool, Name("works")]: return True
[docs] def read_db(conn: sqlite3.Connection) -> A[tuple[int, str], Name("payload")]: return conn.cursor().execute("SELECT * FROM users").fetchone()