pytd.writer.BulkImportWriter

class pytd.writer.BulkImportWriter[source]

A writer module that loads Python data to Treasure Data by using td-client-python’s bulk importer.

__init__() None[source]

Methods

__init__()

close()

from_string(writer, **kwargs)

write_dataframe(dataframe, table, if_exists)

Write a given DataFrame to a Treasure Data table.

Attributes

closed

write_dataframe(dataframe: DataFrame, table: Table, if_exists: Literal['error', 'overwrite', 'append', 'ignore'], fmt: Literal['csv', 'msgpack'] = 'csv', keep_list: bool = False, max_workers: int = 5, chunk_record_size: int = 10000, show_progress: bool = False, bulk_import_name: str | None = None, commit_timeout: int | None = None, perform_timeout: int | None = None, perform_wait_callback: Callable[[...], Any] | None = None) None[source]

Write a given DataFrame to a Treasure Data table.

This method internally converts a given pandas.DataFrame into a temporary CSV/msgpack file, and upload the file to Treasure Data via bulk import API.

Note

If you pass a dataframe with Int64 column, the column will be converted as varchar on Treasure Data schema due to BulkImport API restriction.

Parameters:
  • dataframe (pandas.DataFrame) – Data loaded to a target table.

  • table (pytd.table.Table) – Target table.

  • if_exists ({'error', 'overwrite', 'append', 'ignore'}) –

    What happens when a target table already exists.

    • error: raise an exception.

    • overwrite: drop it, recreate it, and insert data.

    • append: insert data. Create if does not exist.

    • ignore: do nothing.

  • fmt ({'csv', 'msgpack'}, default: 'csv') –

    Format for bulk_import.

    • csv

      Convert dataframe to temporary CSV file. Stable option but slower than msgpack option because pytd saves dataframe as temporary CSV file, then td-client converts it to msgpack. Types of columns are guessed by pandas.read_csv and it causes unintended type conversion e.g., 0-padded string "00012" into integer 12.

    • msgpack

      Convert to temporary msgpack.gz file. Fast option but there is a slight difference on type conversion compared to csv.

  • keep_list (boolean, default: False) – If this argument is True, keep list or numpy.ndarray column as list, which will be converted array<T> on Treasure Data table. Each type of element of list will be converted by numpy.array(your_list).tolist(). If True, fmt argument will be overwritten with msgpack.

show_progressboolean, default: False

If this argument is True, shows a TQDM progress bar for chunking data into msgpack format and uploading before performing a bulk import.

A dataframe containing list will be treated array<T> in TD.

>>> import pytd
>>> import numpy as np
>>> import pandas as pd
>>> df = pd.DataFrame(
...     {
...         "a": [[1, 2, 3], [2, 3, 4]],
...         "b": [[0, None, 2], [2, 3, 4]],
...         "c": [np.array([1, np.nan, 3]), [2, 3, 4]]
...     }
... )
>>> client = pytd.Client()
>>> table = pytd.table.Table(client, "mydb", "test")
>>> writer = pytd.writer.BulkImportWriter()
>>> writer.write_dataframe(df, table, if_exists="overwrite", keep_list=True)

In this case, the type of columns will be: {"a": array<int>, "b": array<string>, "c": array<string>}

If you want to set the type after ingestion, you need to run tdclient.Client.update_schema like:

>>> client.api_client.update_schema(
...     "mydb",
...     "test",
...     [
...         ["a", "array<long>", "a"],
...         ["b", "array<int>", "b"],
...         ["c", "array<int>", "c"],
...     ],
... )

Note that numpy.nan will be converted as a string value as "NaN" or "nan", so pytd will convert numpy.nan to None only when the dtype of a ndarray is float. Also, numpy converts integer array including numpy.nan into float array because numpy.nan is a Floating Point Special Value. See also: https://docs.scipy.org/doc/numpy-1.13.0/user/misc.html#ieee-754-floating-point-special-values

Or, you can use Client.load_table_from_dataframe() function as well.

>>> client.load_table_from_dataframe(df, "bulk_import", keep_list=True)
max_workersint, optional, default: 5

The maximum number of threads that can be used to execute the given calls. This is used only when fmt is msgpack.

chunk_record_sizeint, optional, default: 10_000

The number of records to be written in a single file. This is used only when fmt is msgpack.

bulk_import_namestr, optional, default: None

Custom name for the bulk import job. If not provided, a UUID-based name will be automatically generated.

commit_timeoutint, optional, default: None

Timeout in seconds for the bulk import commit operation. If None, no timeout is applied.

perform_timeoutint, optional, default: None

Timeout in seconds for the bulk import perform operation. If None, no timeout is applied.

perform_wait_callbackcallable, optional, default: None

A callable to be called on every tick of wait interval during bulk import job execution.