Load Operations
LoadBase
- class tiny_blocks.load.base.LoadBase(*, uuid: UUID = None, name: str, version: str = 'v1', description: str = None)
Load Base Block
All blocks inheriting the LoadBase class must implement the exhaust method.
- exhaust(source: Iterator[DataFrame])
Implement this method to consume (exhaust) the incoming iterator of DataFrames.
A load block is the end of the pipe: it consumes chunks and yields nothing further.
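The exhaust contract can be sketched with the standard library alone. `LoadSketch` and `CountRows` below are hypothetical stand-ins, not tiny_blocks classes. The sketch only assumes that a load block receives an iterator of chunks (DataFrames in tiny_blocks) and consumes it completely instead of yielding anything onward.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator


class LoadSketch(ABC):
    """Hypothetical stand-in for LoadBase: only the exhaust contract is modelled."""

    @abstractmethod
    def exhaust(self, source: Iterator[Any]) -> None:
        """Consume every chunk from `source`; nothing is passed downstream."""


class CountRows(LoadSketch):
    """Hypothetical sink that only counts rows across all chunks."""

    def __init__(self) -> None:
        self.total_rows = 0

    def exhaust(self, source: Iterator[Any]) -> None:
        # A load block is the last step of the pipe, so it pulls chunks
        # until the iterator is empty instead of yielding them onward.
        for chunk in source:
            self.total_rows += len(chunk)


sink = CountRows()
sink.exhaust(iter([[1, 2, 3], [4, 5]]))  # any sized chunk works, e.g. DataFrames
print(sink.total_rows)  # 5
```

Because `exhaust` is abstract, instantiating the base class directly raises `TypeError`, which mirrors the requirement that every load block implements it.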
ToCSV
- class tiny_blocks.load.to_csv.ToCSV(*, uuid: UUID = None, name: Literal['to_csv'] = 'to_csv', version: str = 'v1', description: str = None, kwargs: KwargsToCSV = KwargsToCSV(sep='|', na_rep=None, float_format=None, columns=None, header=True, index=False, index_label=None, mode=None, encoding=None, compression='infer', quoting=None, quotechar=None, line_terminator=None, chunksize=1000, date_format=None, doublequote=None, escapechar=None, decimal=None, errors=None, storage_options=None), path: pathlib.Path | pydantic.networks.AnyUrl)
Write CSV Block. Defines the load-to-CSV operation.
- Basic example:
>>> from tiny_blocks.load import ToCSV
>>> from tiny_blocks.extract import FromCSV
>>>
>>> from_csv = FromCSV(path="path/to/source.csv")
>>> to_csv = ToCSV(path="path/to/sink.csv")
>>>
>>> generator = from_csv.get_iter()
>>> to_csv.exhaust(generator)
For more information about the kwargs, see: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html
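How ToCSV writes the chunks internally is not documented here. Below is a minimal sketch of one plausible approach, assuming pandas is installed. The hypothetical `exhaust_to_csv` helper writes each incoming DataFrame chunk with `DataFrame.to_csv`, reusing the `sep='|'` and `index=False` defaults shown in `KwargsToCSV`, and emits the header only for the first chunk.

```python
import tempfile
from pathlib import Path
from typing import Iterator

import pandas as pd


def exhaust_to_csv(source: Iterator[pd.DataFrame], path: Path, sep: str = "|") -> None:
    """Hypothetical helper: write all chunks to one CSV file, header only once."""
    for i, chunk in enumerate(source):
        chunk.to_csv(
            path,
            sep=sep,
            index=False,
            mode="w" if i == 0 else "a",  # overwrite on the first chunk, then append
            header=(i == 0),  # write the header row only for the first chunk
        )


chunks = iter([
    pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}),
    pd.DataFrame({"a": [3], "b": ["z"]}),
])
with tempfile.TemporaryDirectory() as tmp:
    sink_path = Path(tmp) / "sink.csv"
    exhaust_to_csv(chunks, sink_path)
    lines = sink_path.read_text().splitlines()
print(lines)  # ['a|b', '1|x', '2|y', '3|z']
```

Writing chunk by chunk keeps memory bounded by the chunk size rather than the full dataset, which is the point of passing an iterator to `exhaust`.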
ToSQL
- class tiny_blocks.load.to_sql.ToSQL(*, uuid: UUID = None, name: Literal['to_sql'] = 'to_sql', version: str = 'v1', description: str = None, dsn_conn: str, table_name: str, kwargs: KwargsToSQL = KwargsToSQL(schma=None, if_exists='append', index=False, index_label=None, dtype=None, chunksize=1000, method=None))
Load SQL Block. Defines the load operation to a SQL database.
- Basic example:
>>> from tiny_blocks.extract import FromSQLTable
>>> from tiny_blocks.load import ToSQL
>>>
>>> str_conn = "postgresql+psycopg2://user:pass@postgres:5432/db"
>>> from_sql = FromSQLTable(dsn_conn=str_conn, table_name="source")
>>> to_sql = ToSQL(dsn_conn=str_conn, table_name="sink")
>>>
>>> generator = from_sql.get_iter()
>>> to_sql.exhaust(generator)
For more information about the kwargs, see: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
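A similar sketch for appending chunks to a table, assuming pandas is installed and using an in-memory SQLite database from the standard library as a stand-in for the database behind `dsn_conn`. The hypothetical `exhaust_to_sql` helper passes the documented `KwargsToSQL` defaults (`if_exists='append'`, `index=False`, `chunksize=1000`) to `DataFrame.to_sql`; it illustrates the idea and is not the ToSQL source.

```python
import sqlite3
from typing import Iterator

import pandas as pd


def exhaust_to_sql(source: Iterator[pd.DataFrame], conn: sqlite3.Connection, table_name: str) -> None:
    """Hypothetical helper: append every incoming chunk to `table_name`."""
    for chunk in source:
        # Same values as the KwargsToSQL defaults listed in the signature above;
        # if_exists="append" creates the table on the first chunk if needed.
        chunk.to_sql(table_name, conn, if_exists="append", index=False, chunksize=1000)


conn = sqlite3.connect(":memory:")  # stand-in for the dsn_conn database
chunks = iter([pd.DataFrame({"id": [1, 2]}), pd.DataFrame({"id": [3]})])
exhaust_to_sql(chunks, conn, "sink")
rows = conn.execute("SELECT id FROM sink ORDER BY id").fetchall()
print(rows)  # [(1,), (2,), (3,)]
conn.close()
```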
- connect_db() → Connection
Opens a DB transaction and yields a connection to the database defined in dsn_conn.
- Parameters set on the connection are:
autocommit mode set to True.
stream_results set to True.
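The behaviour described for connect_db(), a context that yields an autocommit connection, can be approximated with `contextlib.contextmanager`. This sketch swaps in the standard-library `sqlite3` driver, where `isolation_level=None` means autocommit; `connect_db_sketch` is a hypothetical name. The `postgresql+psycopg2://...` DSN in the example above uses the SQLAlchemy URL format, and in SQLAlchemy the corresponding settings are execution options such as `isolation_level="AUTOCOMMIT"` and `stream_results=True`; `sqlite3` has no direct equivalent of `stream_results`.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def connect_db_sketch(database: str) -> Iterator[sqlite3.Connection]:
    """Hypothetical analogue of connect_db(): yield an autocommit connection."""
    # isolation_level=None stops sqlite3 from opening implicit transactions,
    # so every statement is committed as soon as it runs (autocommit).
    conn = sqlite3.connect(database, isolation_level=None)
    try:
        yield conn
    finally:
        conn.close()  # always release the connection, even if the body raises


with connect_db_sketch(":memory:") as conn:
    conn.execute("CREATE TABLE sink (id INTEGER)")
    conn.execute("INSERT INTO sink VALUES (1)")
    in_tx = conn.in_transaction  # False: the INSERT was committed immediately
    count = conn.execute("SELECT COUNT(*) FROM sink").fetchone()[0]
print(in_tx, count)  # False 1
```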
ToKafka
- class tiny_blocks.load.to_kafka.ToKafka(*, uuid: UUID = None, name: Literal['to_kafka'] = 'to_kafka', version: str = 'v1', description: str = None, kwargs: KwargsToKafka = KwargsToKafka(consumer_timeout=1000), topic: str, group_id: str, bootstrap_servers: List[str])
Write Kafka Block. Defines the load-to-Kafka operation.
- kafka_producer() → KafkaProducer
Yields a producer.
- Parameters set on the connection are:
group_id.
bootstrap_servers: a list of server strings.
auto_offset_reset set to True.
enable_auto_commit set to True.
consumer_timeout_ms, which defaults to 1 second (1000 ms).
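The documentation does not state how ToKafka serializes rows. As an assumption, the sketch below sends one JSON-encoded message per row to the configured topic. `FakeProducer` is a hypothetical test double that mimics the `send(topic, value=...)` and `flush()` methods of kafka-python's `KafkaProducer`, so the example runs without a broker; `exhaust_to_kafka` is likewise hypothetical, and pandas is assumed to be installed.

```python
import json
from typing import Iterator, List, Optional, Tuple

import pandas as pd


class FakeProducer:
    """Hypothetical stand-in with the send()/flush() shape of kafka-python's KafkaProducer."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Optional[bytes]]] = []

    def send(self, topic: str, value: Optional[bytes] = None) -> None:
        self.sent.append((topic, value))  # record the message instead of contacting a broker

    def flush(self) -> None:
        pass  # a real producer would block here until buffered messages are sent


def exhaust_to_kafka(source: Iterator[pd.DataFrame], producer: FakeProducer, topic: str) -> None:
    """Hypothetical helper: one JSON message per row (assumed serialization)."""
    for chunk in source:
        # to_json + json.loads yields plain Python types that json.dumps accepts.
        for record in json.loads(chunk.to_json(orient="records")):
            producer.send(topic, value=json.dumps(record).encode("utf-8"))
    producer.flush()


producer = FakeProducer()
exhaust_to_kafka(iter([pd.DataFrame({"id": [1, 2]}), pd.DataFrame({"id": [3]})]), producer, "sink")
print(producer.sent[0])  # ('sink', b'{"id": 1}')
```

Calling `flush()` once after the iterator is exhausted, rather than after every message, lets a real producer batch sends while still guaranteeing delivery before `exhaust` returns.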