Load Operations

LoadBase

class tiny_blocks.load.base.LoadBase(*, uuid: UUID = None, name: str, version: str = 'v1', description: str = None)

Load Base Block

All blocks inheriting the LoadBase class must implement the exhaust method.

exhaust(source: Iterator[DataFrame])

Consumes the incoming iterator of DataFrames until it is exhausted.

This is the last step of the Pipe.
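The exhaust contract can be illustrated without the library. A sink receives an iterator of DataFrame chunks and consumes all of it, returning nothing. The sketch below is a standalone stand-in and not a LoadBase subclass. The class name `ToMemory`, its `frames` attribute and the `chunks()` generator are hypothetical, and pandas is assumed to be installed.

```python
from typing import Iterator, List

import pandas as pd


class ToMemory:
    """Hypothetical sink illustrating the exhaust() contract of LoadBase."""

    def __init__(self) -> None:
        self.frames: List[pd.DataFrame] = []

    def exhaust(self, source: Iterator[pd.DataFrame]) -> None:
        # Pull every chunk until the iterator is exhausted; nothing is
        # returned because a load block is the last step of the pipe.
        for chunk in source:
            self.frames.append(chunk)


def chunks() -> Iterator[pd.DataFrame]:
    # Stand-in for an extract block's get_iter(): yields two small chunks.
    yield pd.DataFrame({"a": [1, 2]})
    yield pd.DataFrame({"a": [3]})


sink = ToMemory()
sink.exhaust(chunks())
result = pd.concat(sink.frames, ignore_index=True)
print(result["a"].tolist())  # [1, 2, 3]
```

A real block would inherit from LoadBase and receive the iterator from an extract block's `get_iter()`, as in the examples that follow.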

ToCSV

class tiny_blocks.load.to_csv.ToCSV(*, uuid: UUID = None, name: Literal['to_csv'] = 'to_csv', version: str = 'v1', description: str = None, kwargs: KwargsToCSV = KwargsToCSV(sep='|', na_rep=None, float_format=None, columns=None, header=True, index=False, index_label=None, mode=None, encoding=None, compression='infer', quoting=None, quotechar=None, line_terminator=None, chunksize=1000, date_format=None, doublequote=None, escapechar=None, decimal=None, errors=None, storage_options=None), path: pathlib.Path | pydantic.networks.AnyUrl)

Write CSV Block. Defines the load-to-CSV operation.

Basic example:
>>> from tiny_blocks.load import ToCSV
>>> from tiny_blocks.extract import FromCSV
>>>
>>> from_csv = FromCSV(path="path/to/source.csv")
>>> to_csv = ToCSV(path="path/to/sink.csv")
>>>
>>> generator = from_csv.get_iter()
>>> to_csv.exhaust(generator)

For more info about the Kwargs, see: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html
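To show what writing a stream of chunks to one CSV file involves, here is a plain pandas sketch. It is not ToCSV's implementation. The helper name `write_chunks_to_csv` is hypothetical, and whether the block handles headers this way (header on the first chunk only, later chunks appended) is an assumption. It reuses the `sep='|'` and `index=False` defaults from the signature above.

```python
import tempfile
from pathlib import Path
from typing import Iterator

import pandas as pd


def write_chunks_to_csv(source: Iterator[pd.DataFrame], path: Path, sep: str = "|") -> None:
    # Hypothetical helper: the first chunk creates the file with a header,
    # later chunks are appended without repeating it.
    for i, chunk in enumerate(source):
        chunk.to_csv(
            path,
            sep=sep,
            index=False,  # mirrors the KwargsToCSV default index=False
            header=(i == 0),
            mode="w" if i == 0 else "a",
        )


source = iter([
    pd.DataFrame({"x": [1, 2], "y": ["a", "b"]}),
    pd.DataFrame({"x": [3], "y": ["c"]}),
])
with tempfile.TemporaryDirectory() as tmp:
    sink = Path(tmp) / "sink.csv"
    write_chunks_to_csv(source, sink)
    df = pd.read_csv(sink, sep="|")  # read back before the directory is removed
print(df.shape)  # (3, 2)
```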

ToSQL

class tiny_blocks.load.to_sql.ToSQL(*, uuid: UUID = None, name: Literal['to_sql'] = 'to_sql', version: str = 'v1', description: str = None, dsn_conn: str, table_name: str, kwargs: KwargsToSQL = KwargsToSQL(schma=None, if_exists='append', index=False, index_label=None, dtype=None, chunksize=1000, method=None))

Load SQL Block. Defines the load operation to a SQL database.

Basic example:
>>> from tiny_blocks.extract import FromSQLTable
>>> from tiny_blocks.load import ToSQL
>>>
>>> str_conn = "postgresql+psycopg2://user:pass@postgres:5432/db"
>>> from_sql = FromSQLTable(dsn_conn=str_conn, table_name="source")
>>> to_sql = ToSQL(dsn_conn=str_conn, table_name="sink")
>>>
>>> generator = from_sql.get_iter()
>>> to_sql.exhaust(generator)

For more info about the Kwargs, see: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
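The following plain pandas sketch appends a stream of chunks to a single table. It passes the same keyword values as the KwargsToSQL defaults (`if_exists='append'`, `index=False`, `chunksize=1000`). An in-memory stdlib `sqlite3` connection stands in for the database behind `dsn_conn`, which pandas accepts directly. The helper name `load_chunks_to_sql` is hypothetical, and this is not the block's own code.

```python
import sqlite3
from typing import Iterator

import pandas as pd


def load_chunks_to_sql(source: Iterator[pd.DataFrame], con: sqlite3.Connection, table_name: str) -> None:
    for chunk in source:
        # if_exists="append" creates the table on the first chunk and
        # appends subsequent chunks to it.
        chunk.to_sql(table_name, con, if_exists="append", index=False, chunksize=1000)


con = sqlite3.connect(":memory:")
source = iter([pd.DataFrame({"id": [1, 2]}), pd.DataFrame({"id": [3]})])
load_chunks_to_sql(source, con, "sink")
count = con.execute("SELECT COUNT(*) FROM sink").fetchone()[0]
print(count)  # 3
```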

connect_db() -> Connection

Opens a DB transaction and yields a connection to the database defined in dsn_conn.

Parameters set on the connection are:
  • autocommit mode, set to True.

  • stream_results mode, set to True.
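"Yields a connection" describes a generator-based context manager. The sketch below shows that pattern with the stdlib `sqlite3` module standing in for the database the block connects to. In `sqlite3`, `isolation_level=None` plays the role of the autocommit setting. Presumably `stream_results` maps to SQLAlchemy's execution option of that name, which has no direct `sqlite3` equivalent here. The `path` parameter is hypothetical, since the real `connect_db()` takes no arguments and reads `dsn_conn`.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def connect_db(path: str) -> Iterator[sqlite3.Connection]:
    # isolation_level=None puts the sqlite3 module in autocommit mode,
    # analogous to the autocommit setting described above.
    conn = sqlite3.connect(path, isolation_level=None)
    try:
        yield conn
    finally:
        conn.close()


with tempfile.TemporaryDirectory() as tmp:
    db = os.path.join(tmp, "demo.db")
    with connect_db(db) as conn:
        conn.execute("CREATE TABLE t (v INTEGER)")
        conn.execute("INSERT INTO t VALUES (1)")
    # A fresh connection sees the row although commit() was never called.
    with connect_db(db) as conn:
        rows = list(conn.execute("SELECT v FROM t"))  # iterates the cursor row by row
print(rows)  # [(1,)]
```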

ToKafka

class tiny_blocks.load.to_kafka.ToKafka(*, uuid: UUID = None, name: Literal['to_kafka'] = 'to_kafka', version: str = 'v1', description: str = None, kwargs: KwargsToKafka = KwargsToKafka(consumer_timeout=1000), topic: str, group_id: str, bootstrap_servers: List[str])

Write Kafka Block. Defines the load-to-Kafka operation.

kafka_producer() -> KafkaProducer

Yields a Producer.

Parameters set on the connection are:
  • group_id.

  • bootstrap_servers: a list of server strings.

  • auto_offset_reset, set to True.

  • enable_auto_commit, set to True.

  • consumer_timeout_ms, set to 1 second by default.
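The message format ToKafka uses is not documented here. As an illustration only, the sketch below assumes each DataFrame row is published as one JSON message. The function `exhaust_to_kafka` and the `fake_send` stand-in are hypothetical. `fake_send` mimics the shape of `KafkaProducer.send(topic, value=...)` from kafka-python, presumably the client behind `kafka_producer()`, so the code runs without a broker.

```python
from typing import Callable, Iterator, List, Tuple

import pandas as pd


def exhaust_to_kafka(source: Iterator[pd.DataFrame], send: Callable[..., object], topic: str) -> int:
    """Hypothetical sink: publish every row of every chunk as one JSON message."""
    sent = 0
    for chunk in source:
        # to_json(lines=True) emits one JSON object per row, one per line.
        for line in chunk.to_json(orient="records", lines=True).splitlines():
            send(topic, value=line.encode("utf-8"))
            sent += 1
    return sent


outbox: List[Tuple[str, bytes]] = []


def fake_send(topic: str, value: bytes) -> None:
    # Stand-in for KafkaProducer.send so the sketch runs without a broker.
    outbox.append((topic, value))


n = exhaust_to_kafka(iter([pd.DataFrame({"k": [1, 2]})]), fake_send, "sink")
print(n, outbox[0])  # 2 ('sink', b'{"k":1}')
```

With a real producer, `fake_send` would be replaced by the producer's `send` method. Calling `flush()` before closing ensures buffered messages are delivered.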