pytd.writer.BulkImportWriter
class pytd.writer.BulkImportWriter
A writer module that loads Python data to Treasure Data by using td-client-python’s bulk importer.
Methods
__init__()
close()
from_string(writer, **kwargs)
write_dataframe(dataframe, table, if_exists) – Write a given DataFrame to a Treasure Data table.
Attributes
closed
write_dataframe(dataframe: DataFrame, table: Table, if_exists: Literal['error', 'overwrite', 'append', 'ignore'], fmt: Literal['csv', 'msgpack'] = 'csv', keep_list: bool = False, max_workers: int = 5, chunk_record_size: int = 10000, show_progress: bool = False, bulk_import_name: str | None = None, commit_timeout: int | None = None, perform_timeout: int | None = None, perform_wait_callback: Callable[[...], Any] | None = None) -> None
Write a given DataFrame to a Treasure Data table.
This method internally converts a given pandas.DataFrame into a temporary CSV/msgpack file, and uploads the file to Treasure Data via the bulk import API.
Note
If you pass a dataframe with an Int64 column, the column will be converted to varchar in the Treasure Data schema due to a BulkImport API restriction.
Parameters:
dataframe (pandas.DataFrame) – Data loaded to a target table.
table (pytd.table.Table) – Target table.
if_exists ({'error', 'overwrite', 'append', 'ignore'}) –
What happens when a target table already exists.
error: raise an exception.
overwrite: drop it, recreate it, and insert data.
append: insert data. Create the table if it does not exist.
ignore: do nothing.
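The four modes above can be restated as a small decision function. This is only an illustrative sketch of the documented semantics for the case where the target table already exists; `steps_when_table_exists` and its step labels are hypothetical and are not part of pytd.

```python
from typing import List


def steps_when_table_exists(if_exists: str) -> List[str]:
    """Describe what each if_exists mode does when the target table exists.

    Hypothetical helper for illustration; pytd does not expose this function.
    """
    if if_exists == "error":
        # error: raise an exception.
        raise RuntimeError("target table already exists")
    if if_exists == "overwrite":
        # overwrite: drop it, recreate it, and insert data.
        return ["drop table", "create table", "insert data"]
    if if_exists == "append":
        # append: insert data into the existing table.
        return ["insert data"]
    if if_exists == "ignore":
        # ignore: do nothing.
        return []
    raise ValueError(f"invalid if_exists value: {if_exists!r}")


print(steps_when_table_exists("overwrite"))
print(steps_when_table_exists("ignore"))
```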
fmt ({'csv', 'msgpack'}, default: 'csv') –
Format for bulk_import.
- csv
Convert dataframe to a temporary CSV file. Stable option, but slower than the msgpack option because pytd saves the dataframe as a temporary CSV file, then td-client converts it to msgpack. Column types are guessed by pandas.read_csv, which can cause unintended type conversion, e.g., the 0-padded string "00012" into the integer 12.
- msgpack
Convert to a temporary msgpack.gz file. Fast option, but there is a slight difference in type conversion compared to csv.
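The type-guessing caveat described for the csv format can be reproduced with pandas alone. The sketch below uses a made-up two-row CSV and shows only how pandas.read_csv infers types; it does not call pytd or reproduce its internal conversion.

```python
import io

import pandas as pd

# Hypothetical sample data with zero-padded codes.
csv_text = "code,name\n00012,apple\n00345,banana\n"

# Default type inference: the zero-padded codes are parsed as integers.
guessed = pd.read_csv(io.StringIO(csv_text))

# Forcing the column to str keeps the padding intact.
preserved = pd.read_csv(io.StringIO(csv_text), dtype={"code": str})

print(guessed["code"].tolist())    # [12, 345]
print(preserved["code"].tolist())  # ['00012', '00345']
```

If a column must keep its leading zeros, it is worth checking the resulting column type in Treasure Data after the import.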
keep_list (boolean, default: False) – If this argument is True, keep list or numpy.ndarray columns as lists, which will be converted to array<T> on the Treasure Data table. The type of each list element will be converted by numpy.array(your_list).tolist(). If True, the fmt argument will be overwritten with msgpack.
show_progress (boolean, default: False) – If this argument is True, shows a TQDM progress bar for chunking data into msgpack format and uploading before performing a bulk import.
A dataframe containing lists will be treated as array<T> in TD.
>>> import pytd
>>> import numpy as np
>>> import pandas as pd
>>> df = pd.DataFrame(
...     {
...         "a": [[1, 2, 3], [2, 3, 4]],
...         "b": [[0, None, 2], [2, 3, 4]],
...         "c": [np.array([1, np.nan, 3]), [2, 3, 4]]
...     }
... )
>>> client = pytd.Client()
>>> table = pytd.table.Table(client, "mydb", "test")
>>> writer = pytd.writer.BulkImportWriter()
>>> writer.write_dataframe(df, table, if_exists="overwrite", keep_list=True)
In this case, the type of columns will be:
{"a": array<int>, "b": array<string>, "c": array<string>}
If you want to set the type after ingestion, you need to run tdclient.Client.update_schema like:
>>> client.api_client.update_schema(
...     "mydb",
...     "test",
...     [
...         ["a", "array<long>", "a"],
...         ["b", "array<int>", "b"],
...         ["c", "array<int>", "c"],
...     ],
... )
Note that numpy.nan will be converted to a string value such as "NaN" or "nan", so pytd converts numpy.nan to None only when the dtype of an ndarray is float. Also, numpy converts an integer array including numpy.nan into a float array because numpy.nan is a floating point special value. See also: https://docs.scipy.org/doc/numpy-1.13.0/user/misc.html#ieee-754-floating-point-special-values
Or, you can use the Client.load_table_from_dataframe() function as well.
>>> client.load_table_from_dataframe(df, "bulk_import", keep_list=True)
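The note on numpy.nan above can be checked with numpy directly. In this sketch, `nan_to_none` is a hypothetical helper that mimics the described rule (replace NaN with None only for float arrays); it is an assumption about the behavior, not pytd's actual code.

```python
import numpy as np

# An integer list that includes numpy.nan is promoted to a float array.
arr = np.array([1, np.nan, 3])
print(arr.dtype)  # float64


def nan_to_none(values):
    """Replace NaN with None, but only when the array dtype is float.

    Hypothetical helper for illustration; not part of pytd.
    """
    converted = np.asarray(values)
    if converted.dtype.kind != "f":
        # Non-float arrays are returned as plain Python lists, unchanged.
        return converted.tolist()
    return [None if np.isnan(x) else x for x in converted.tolist()]


print(nan_to_none([1, np.nan, 3]))  # [1.0, None, 3.0]
print(nan_to_none([2, 3, 4]))       # [2, 3, 4]
```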
max_workers (int, optional, default: 5) – The maximum number of threads that can be used to execute the given calls. This is used only when fmt is msgpack.
chunk_record_size (int, optional, default: 10_000) – The number of records to be written in a single file. This is used only when fmt is msgpack.
bulk_import_name (str, optional, default: None) – Custom name for the bulk import job. If not provided, a UUID-based name will be automatically generated.
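To get a feel for chunk_record_size, the arithmetic can be sketched in plain Python. This assumes each file holds at most chunk_record_size records, with the last file holding the remainder; `count_chunks` and `chunk_ranges` are hypothetical helpers, and pytd's actual splitting logic may differ.

```python
import math
from typing import List, Tuple


def count_chunks(num_records: int, chunk_record_size: int = 10_000) -> int:
    """Number of files needed if each holds at most chunk_record_size records."""
    if chunk_record_size <= 0:
        raise ValueError("chunk_record_size must be positive")
    return math.ceil(num_records / chunk_record_size)


def chunk_ranges(num_records: int, chunk_record_size: int = 10_000) -> List[Tuple[int, int]]:
    """Half-open (start, end) row ranges, one per chunk."""
    return [
        (start, min(start + chunk_record_size, num_records))
        for start in range(0, num_records, chunk_record_size)
    ]


print(count_chunks(25_000))   # 3
print(chunk_ranges(25_000))   # [(0, 10000), (10000, 20000), (20000, 25000)]
```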
commit_timeout (int, optional, default: None) – Timeout in seconds for the bulk import commit operation. If None, no timeout is applied.
perform_timeout (int, optional, default: None) – Timeout in seconds for the bulk import perform operation. If None, no timeout is applied.
perform_wait_callback (callable, optional, default: None) – A callable to be called on every tick of wait interval during bulk import job execution.
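The timeout and callback parameters could be combined as in the following sketch. `TickCounter`, the job name, and the timeout values are all hypothetical. The arguments that the library passes to the callback are not documented here, so the callable accepts anything. The write_dataframe call is left commented out because it requires valid Treasure Data credentials.

```python
import time


class TickCounter:
    """Callable that counts wait ticks and reports elapsed time."""

    def __init__(self):
        self.ticks = 0
        self.started = time.monotonic()

    def __call__(self, *args, **kwargs):
        # Accept any arguments, since the callback signature is unspecified.
        self.ticks += 1
        elapsed = time.monotonic() - self.started
        print(f"tick {self.ticks}: waited {elapsed:.1f}s so far")


on_wait = TickCounter()

# Hypothetical usage, reusing df, table, and writer from the earlier example:
# writer.write_dataframe(
#     df,
#     table,
#     if_exists="append",
#     bulk_import_name="nightly-load-example",  # made-up job name
#     commit_timeout=600,                       # seconds, arbitrary value
#     perform_timeout=3600,                     # seconds, arbitrary value
#     perform_wait_callback=on_wait,
# )
```

A callable object is used instead of a bare function so that the tick count can be inspected after the import finishes.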