import random
import sqlite3
import string
from collections.abc import Generator
from datetime import datetime
from itertools import count, cycle
from typing import Annotated as A
from typing import Any
import xarray as xr
from faker import Faker
from noob import Name, process_method
from noob.node import Node
[docs]
def count_source(limit: int = 1000, start: int = 0) -> Generator[A[int, Name("index")], None, None]:
    """
    Yield consecutive integers beginning at ``start``,
    stopping before the first value that is not below ``limit``.

    Nothing is yielded when ``start`` is already at or above ``limit``.
    """
    for index in count(start=start):
        if not index < limit:
            break
        yield index
[docs]
def letter_source() -> Generator[A[str, Name("letter")]]:
    """
    Endlessly yield the lowercase ASCII letters in order,
    wrapping back around to ``a`` after ``z``.
    """
    for letter in cycle(string.ascii_lowercase):
        yield letter
[docs]
def word_source() -> Generator[A[str, Name("word")]]:
    """
    Yield one fake word per iteration, drawn through Faker's ``unique`` proxy.

    NOTE(review): the ``unique`` proxy presumably raises once Faker can no
    longer find an unseen word, so this loop is only "endless" in code shape -
    confirm against the Faker documentation.
    """
    unique_fake = Faker().unique
    while True:
        yield unique_fake.word()
[docs]
def multi_words_source(n: int) -> Generator[A[list[str], Name("multi_words")]]:
    """
    Yield a fresh list of ``n`` fake words on every iteration.

    All words come from a single Faker ``unique`` proxy created
    when the generator starts.
    """
    unique_fake = Faker().unique
    while True:
        words = [unique_fake.word() for _ in range(n)]
        yield words
[docs]
def sporadic_word(every: int = 3) -> Generator[A[str, Name("word")] | None, None, None]:
    """
    Yield a fake word on every ``every``-th iteration and ``None`` otherwise.

    Iterations are counted from 1, so with the default ``every=3`` the
    sequence is ``None, None, word, None, None, word, ...``.
    ``every`` is used as a modulus, so it must not be zero.
    """
    unique_fake = Faker().unique
    for tick in count(1):
        # the word is only generated on the iterations that actually emit it
        yield unique_fake.word() if tick % every == 0 else None
[docs]
def word_counts() -> Generator[tuple[A[str, Name("word")], A[list[int], Name("counts")]]]:
    """
    Yield ``(word, counts)`` pairs forever.

    ``word`` is a fake word from Faker's ``unique`` proxy and ``counts`` is a
    list of 2 to 5 random integers, each between 1 and 100 inclusive.
    """
    unique_fake = Faker().unique
    while True:
        # draw order is kept: list length first, then the word, then the counts
        size = random.randint(2, 5)
        word = unique_fake.word()
        counts = [random.randint(1, 100) for _ in range(size)]
        yield word, counts
[docs]
def multiply(left: int, right: int = 2) -> int:
    """
    Multiply ``left`` by ``right`` (doubling by default).

    The return annotation is deliberately left without a name,
    so the result is referred to as `{nodename}.value`
    """
    product = left * right
    return product
[docs]
def divide(numerator: int, denominator: int = 5) -> A[float, Name("ratio")]:
    """
    Return ``numerator / denominator`` using true division.

    A zero ``denominator`` raises ``ZeroDivisionError``.
    """
    ratio = numerator / denominator
    return ratio
[docs]
def concat(strings: list[str]) -> str:
    """
    Join every item of ``strings`` into one string with nothing in between.
    """
    separator = ""
    return separator.join(strings)
[docs]
def exclaim(string: str) -> str:
    """
    Return ``string`` with a single exclamation mark appended.
    """
    suffix = "!"
    return string + suffix
[docs]
def repeat(string: str, times: int) -> str:
    """
    Return ``string`` repeated ``times`` times.
    """
    repeated = string * times
    return repeated
[docs]
def dictify(key: str, items: list[Any]) -> dict[str, Any]:
    """
    Wrap ``items`` in a single-entry dict keyed by ``key``.

    ``items`` is stored as-is, not copied.
    """
    wrapped: dict[str, Any] = {}
    wrapped[key] = items
    return wrapped
[docs]
def error(value: Any) -> None:
    """
    Always raise ``ValueError``; ``value`` is accepted but never used.
    """
    message = "This node just emits errors"
    raise ValueError(message)
[docs]
class CountSource(Node):
    """
    Node counterpart of a counting generator:
    emits integers from ``start`` up to, but not including, ``limit``.
    """

    # first value that is NOT emitted
    limit: int = 1000
    # first value that is emitted
    start: int = 0

    def process(self) -> Generator[A[int, Name("index")], None, None]:
        """
        Yield consecutive integers, stopping at the first one
        that is not below ``self.limit``.
        """
        for index in count(start=self.start):
            if not index < self.limit:
                break
            yield index
[docs]
class UnannotatedGenerator(Node):
    """
    Counting node whose ``process`` generator has no return annotation
    (the lint rule for that is silenced on purpose).
    """

    # first value that is NOT emitted
    limit: int = 1000
    # first value that is emitted
    start: int = 0

    def process(self):  # noqa: ANN201
        """
        Yield consecutive integers from ``self.start``, stopping at the
        first one that is not below ``self.limit``.
        """
        for index in count(start=self.start):
            if not index < self.limit:
                break
            yield index
[docs]
class Multiply(Node):
    """
    Node wrapper that delegates to the module-level ``multiply`` function.
    """

    def process(self, left: int, right: int = 2) -> A[int, Name("product")]:
        """
        Return ``multiply(left, right)``; ``right`` defaults to 2.
        """
        product = multiply(left=left, right=right)
        return product
[docs]
class VolumeProcess:
    """
    Plain class (no base class, no decorator) with a ``process`` method
    that computes a volume from a height fixed at construction time.
    """

    def __init__(self, height: int = 2):
        self.height = height

    def process(self, width: int, depth: int) -> A[int, Name("volume")]:
        """
        Return ``height * width * depth``, where the ``width * depth``
        part is computed by the module-level ``multiply`` function.
        """
        base_area = multiply(left=width, right=depth)
        return self.height * base_area
[docs]
class Volume:
    """
    Plain class whose ``volume`` method is marked with ``process_method``.

    The height is fixed when the instance is created.
    """

    def __init__(self, height: int = 2):
        self.height = height

    @process_method
    def volume(self, width: int, depth: int) -> A[int, Name("volume")]:
        """
        Return ``height * width * depth``, where the ``width * depth``
        part is computed by the module-level ``multiply`` function.
        """
        base_area = multiply(left=width, right=depth)
        return self.height * base_area
[docs]
class Now:
    """
    Captures the current local time once, at construction,
    and formats that same instant on every call to ``print``.
    """

    def __init__(self):
        # naive (timezone-less) timestamp, taken exactly once
        self.now = datetime.now()

    @process_method
    def print(self, prefix: str = "Now: ") -> A[str, Name("timestamp")]:
        """
        Return ``prefix`` followed by the stored timestamp in ISO 8601 format.
        """
        stamp = self.now.isoformat()
        return f"{prefix}{stamp}"
[docs]
def array_add_to_left(left: xr.DataArray, right: xr.DataArray) -> xr.DataArray:
    """
    Add ``right`` onto ``left`` with the augmented-assignment operator
    and return the result.

    For operands that implement in-place addition, the caller's ``left``
    object is itself modified and is the object that comes back.
    """
    # augmented assignment on purpose: `left + right` would build a new object
    left += right
    return left
[docs]
class CountSourceDecor:
    """
    Unbounded counter exposed through a ``process_method``-decorated generator.

    The underlying ``itertools.count`` is created once in ``__init__`` and
    stored on the instance, so every generator obtained from ``process``
    draws from that same counter.
    """

    def __init__(self, start: int = 0) -> None:
        self.gen = count(start=start)

    @process_method
    def process(self) -> Generator[A[int, Name("count")], None, None]:
        """
        Yield successive integers from the instance's shared counter, without end.
        """
        shared_counter = self.gen
        yield from shared_counter
[docs]
def read_db(conn: sqlite3.Connection) -> A[tuple[int, str], Name("payload")]:
    """
    Fetch the first row of the ``users`` table.

    Args:
        conn: An open SQLite connection to a database with a ``users`` table.

    Returns:
        The first row returned by ``SELECT * FROM users``,
        or ``None`` when the query yields no rows.

    Raises:
        sqlite3.Error: propagated unchanged from the driver, e.g. when the
            ``users`` table does not exist or the connection is closed.
    """
    cursor = conn.cursor()
    try:
        return cursor.execute("SELECT * FROM users").fetchone()
    finally:
        # release the cursor deterministically instead of waiting for it
        # to be garbage collected
        cursor.close()