Extract Operations

ExtractBase

class tiny_blocks.extract.base.ExtractBase(*, uuid: UUID = None, name: str, version: str = 'v1', description: str = None)

Extract Base Block.

Each extraction block implements the get_iter method, which returns an iterator of chunked DataFrames.

get_iter() -> Iterator[DataFrame]

Return an iterator of chunked DataFrames.

The chunk size is set through the chunksize entry in the kwargs of each extraction block.
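
The chunked-iterator contract described above can be sketched without tiny_blocks itself. The class below, InMemoryExtract, is a hypothetical stand-in written for illustration only and is not part of the library; it assumes nothing beyond pandas and shows how a get_iter method can yield DataFrame chunks that pd.concat later reassembles.

```python
from typing import Iterator

import pandas as pd


class InMemoryExtract:
    """Hypothetical stand-in for an extract block (not part of tiny_blocks)."""

    def __init__(self, data: pd.DataFrame, chunksize: int = 1000) -> None:
        self.data = data
        self.chunksize = chunksize

    def get_iter(self) -> Iterator[pd.DataFrame]:
        # Yield consecutive slices of at most `chunksize` rows.
        for start in range(0, len(self.data), self.chunksize):
            yield self.data.iloc[start:start + self.chunksize]


source = pd.DataFrame({"a": range(5)})
block = InMemoryExtract(source, chunksize=2)

chunks = list(block.get_iter())
chunk_lengths = [len(chunk) for chunk in chunks]

# Concatenating the chunks rebuilds the original frame.
combined = pd.concat(chunks)
```

Because get_iter is a generator, no chunk is materialised until the consumer asks for it, which is what keeps memory use bounded for large sources.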

FromCSV

class tiny_blocks.extract.from_csv.FromCSV(*, uuid: UUID = None, name: Literal['read_csv'] = 'read_csv', version: str = 'v1', description: str = None, path: pydantic.types.FilePath | pydantic.networks.AnyUrl, kwargs: KwargsFromCSV = KwargsFromCSV(sep='|', header='infer', names=None, index_col=None, usecols=None, squeeze=False, prefix=None, mangle_dupe_cols=True, dtype=None, converters=None, engine=None, true_values=None, false_values=None, chunksize=1000, storage_options=None, skipinitialspace=False, skiprows=None, skipfooter=None, nrows=None, na_values=None, keep_default_na=True, na_filter=True, verbose=False, skip_blank_lines=True, parse_dates=None, infer_datetime_format=False, keep_date_col=False, date_parser=None, dayfirst=False, cache_dates=True, compression='infer', thousands=None, decimal='.', lineterminator=None, quotechar=None, quoting=None, doublequote=True, escapechar=None, comment=None, encoding=None, encoding_errors='strict', dialect=None, on_bad_lines='skip', delim_whitespace=False, low_memory=True, memory_map=False, float_precision=None))

FromCSV Block. Defines the read CSV operation.

Basic example:
>>> import pandas as pd
>>> from tiny_blocks.extract import FromCSV
>>>
>>> read_csv = FromCSV(path="/path/to/file.csv")
>>>
>>> generator = read_csv.get_iter()
>>> df = pd.concat(generator)

For details on the available kwargs, see: https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
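
Since the kwargs mirror the pandas.read_csv parameters, the effect of the defaults sep='|' and chunksize can be previewed with pandas alone. This is a minimal sketch that assumes the block forwards its kwargs to pandas.read_csv, which the source does not state explicitly; the inline data and the chunksize of 2 are made up for illustration.

```python
import io

import pandas as pd

# Pipe-separated sample data, matching the default sep='|'.
raw = "id|name\n1|a\n2|b\n3|c\n"

# With chunksize set, read_csv returns an iterator of DataFrames
# instead of a single DataFrame.
reader = pd.read_csv(io.StringIO(raw), sep="|", chunksize=2)

chunks = list(reader)
df = pd.concat(chunks)
```

Note that a comma-separated file would need sep="," passed explicitly, since the default separator shown in the signature is the pipe character.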

FromSQLTable

class tiny_blocks.extract.from_sql_table.FromSQLTable(*, uuid: UUID = None, name: Literal['read_sql_table'] = 'read_sql_table', version: str = 'v1', description: str = None, dsn_conn: str, table_name: str, kwargs: KwargsFromSQLTable = KwargsFromSQLTable(schma=None, index_col=None, coerce_float=True, parse_dates=None, columns=None, chunksize=1000))

Read SQL Table Block. Defines the read SQL Table Operation.

Basic example:
>>> import pandas as pd
>>> from tiny_blocks.extract import FromSQLTable
>>>
>>> str_conn = "postgresql+psycopg2://user:pass@postgres:5432/db"
>>> read_sql = FromSQLTable(dsn_conn=str_conn, table_name="test")
>>>
>>> generator = read_sql.get_iter()
>>> df = pd.concat(generator)

See info about Kwargs: https://pandas.pydata.org/docs/reference/api/pandas.read_sql_table.html

connect_db() -> Connection

Opens a DB transaction and yields a connection to the database defined in dsn_conn.

Parameters set on the connection are:
  • autocommit mode is set to True.
  • stream_results is set to True.

FromSQLQuery

class tiny_blocks.extract.from_sql_query.FromSQLQuery(*, uuid: UUID = None, name: Literal['read_sql'] = 'read_sql', version: str = 'v1', description: str = None, dsn_conn: str, sql: str, kwargs: KwargsFromSQLQuery = KwargsFromSQLQuery(index_col=None, coerce_float=True, params=None, parse_dates=None, chunksize=1000, dtype=None))

Read SQL Query Block. Defines the read SQL Query Operation.

Basic example:
>>> import pandas as pd
>>> from tiny_blocks.extract import FromSQLQuery
>>>
>>> str_conn = "postgresql+psycopg2://user:pass@postgres:5432/db"
>>> sql = "select * from test"
>>> read_sql = FromSQLQuery(dsn_conn=str_conn, sql=sql)
>>>
>>> generator = read_sql.get_iter()
>>> df = pd.concat(generator)

For details on the available kwargs, see: https://pandas.pydata.org/docs/reference/api/pandas.read_sql_query.html
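
The chunked behaviour of a SQL query can be tried locally without a PostgreSQL server. The sketch below uses pandas.read_sql_query directly with an in-memory SQLite database from the standard library; it assumes the block passes chunksize through to pandas, and the table contents and chunksize of 2 are invented for the example.

```python
import sqlite3

import pandas as pd

# In-memory SQLite database standing in for the real DSN.
conn = sqlite3.connect(":memory:")
conn.execute("create table test (id integer, name text)")
conn.executemany(
    "insert into test values (?, ?)",
    [(1, "a"), (2, "b"), (3, "c")],
)
conn.commit()

# With chunksize set, read_sql_query returns an iterator of DataFrames.
# The iterator must be consumed before the connection is closed.
chunks = list(
    pd.read_sql_query("select * from test order by id", conn, chunksize=2)
)
df = pd.concat(chunks, ignore_index=True)

conn.close()
```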

connect_db() -> Connection

Opens a DB transaction and yields a connection to the database defined in dsn_conn.

Parameters set on the connection are:
  • autocommit mode is set to True.
  • stream_results is set to True.

FromKafka

class tiny_blocks.extract.from_kafka.FromKafka(*, uuid: UUID = None, name: Literal['from_kafka'] = 'from_kafka', version: str = 'v1', description: str = None, kwargs: KwargsFromKafka = KwargsFromKafka(consumer_timeout=1000), topic: str, group_id: str, bootstrap_servers: List[str])

FromKafka Block. Defines the read Kafka operation.

kafka_consumer() -> KafkaConsumer

Yields a consumer subscribed to a Kafka topic.

Parameters set on the consumer are:
  • topic.
  • group_id.
  • bootstrap_servers. List of server strings.
  • auto_offset_reset is set to True.
  • enable_auto_commit is set to True.
  • consumer_timeout_ms defaults to 1000 ms (1 second).