pytd.writer.BulkImportWriter

class pytd.writer.BulkImportWriter

A writer module that loads Python data to Treasure Data by using td-client-python’s bulk importer.

__init__()

Methods

__init__()

close()

from_string(writer, **kwargs)

write_dataframe(dataframe, table, if_exists) – Write a given DataFrame to a Treasure Data table.

write_dataframe(dataframe, table, if_exists, fmt='csv', keep_list=False)

Write a given DataFrame to a Treasure Data table.

This method internally converts a given pandas.DataFrame into a temporary CSV or msgpack file and uploads the file to Treasure Data via the bulk import API.
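The effect of routing data through an intermediate CSV file can be reproduced with pandas alone. The following is a minimal sketch, not pytd's actual implementation: it writes a small DataFrame to an in-memory CSV buffer (pytd uses a temporary file) and reads it back, so that pandas.read_csv guesses the column types again. The column names and values are made up for illustration.

```python
import io

import pandas as pd

# A DataFrame holding zero-padded identifiers as strings (illustrative data).
df = pd.DataFrame({"code": ["00012", "00345"], "name": ["a", "b"]})

# Serialize to CSV in memory; an in-memory buffer stands in for a temporary file.
buf = io.StringIO()
df.to_csv(buf, index=False)

# Reading the CSV back lets pandas guess the column types from the text.
buf.seek(0)
guessed = pd.read_csv(buf)

# Passing an explicit dtype keeps the column as text.
buf.seek(0)
preserved = pd.read_csv(buf, dtype={"code": str})

print(guessed["code"].tolist())    # [12, 345]
print(preserved["code"].tolist())  # ['00012', '00345']
```

If leading zeros matter, checking a column with a round trip like this before uploading can reveal whether the guessed type differs from the intended one.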

Note

If you pass a dataframe with an Int64 column, the column will be converted to varchar in the Treasure Data schema due to a BulkImport API restriction.

Parameters:
  • dataframe (pandas.DataFrame) – Data loaded to a target table.

  • table (pytd.table.Table) – Target table.

  • if_exists ({'error', 'overwrite', 'append', 'ignore'}) –

    What happens when a target table already exists.

    • error: raise an exception.

    • overwrite: drop it, recreate it, and insert data.

    • append: insert data. Create the table if it does not exist.

    • ignore: do nothing.

  • fmt ({'csv', 'msgpack'}, default: 'csv') –

    Format for bulk_import.

    • csv

      Convert the dataframe to a temporary CSV file. This is the stable option, but it is slower than msgpack because pytd saves the dataframe as a temporary CSV file and td-client then converts it to msgpack. Column types are guessed by pandas.read_csv, which can cause unintended type conversions, e.g., the 0-padded string "00012" becomes the integer 12.

    • msgpack

      Convert the dataframe to a temporary msgpack.gz file. This is the fast option, but its type conversion differs slightly from that of csv.

  • keep_list (boolean, default: False) –

    If True, keep list or numpy.ndarray columns as lists, which are converted to array<T> on the Treasure Data table. The element types of each list are determined by numpy.array(your_list).tolist().

    If True, the fmt argument is overridden with msgpack.

    Examples

    A dataframe containing lists will be treated as array<T> in TD.

    >>> import pytd
    >>> import numpy as np
    >>> import pandas as pd
    >>> df = pd.DataFrame(
    ...     {
    ...         "a": [[1, 2, 3], [2, 3, 4]],
    ...         "b": [[0, None, 2], [2, 3, 4]],
    ...         "c": [np.array([1, np.nan, 3]), [2, 3, 4]]
    ...     }
    ... )
    >>> client = pytd.Client()
    >>> table = pytd.table.Table(client, "mydb", "test")
    >>> writer = pytd.writer.BulkImportWriter()
    >>> writer.write_dataframe(df, table, if_exists="overwrite", keep_list=True)
    

    In this case, the type of columns will be: {"a": array<int>, "b": array<string>, "c": array<string>}

    To set the types after ingestion, run tdclient.Client.update_schema, for example:

    >>> client.api_client.update_schema(
    ...     "mydb",
    ...     "test",
    ...     [
    ...         ["a", "array<long>", "a"],
    ...         ["b", "array<int>", "b"],
    ...         ["c", "array<int>", "c"],
    ...     ],
    ... )
    

    Note that numpy.nan would otherwise be converted to the string value "NaN" or "nan", so pytd converts numpy.nan to None, but only when the dtype of the ndarray is float. Also, numpy converts an integer array that includes numpy.nan into a float array because numpy.nan is a floating-point special value. See also: https://docs.scipy.org/doc/numpy-1.13.0/user/misc.html#ieee-754-floating-point-special-values

    Alternatively, you can use the Client.load_table_from_dataframe() function.

    >>> client.load_table_from_dataframe(df, "mydb.test", writer="bulk_import", if_exists="overwrite", keep_list=True)
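The numpy behavior behind the keep_list notes above can be checked locally without a Treasure Data connection. The sketch below uses only numpy; nan_to_none is a hypothetical helper written for this illustration and is not part of pytd, whose internal conversion may differ in detail.

```python
import math

import numpy as np

# An integer-looking list containing NaN is promoted to a float array.
with_nan = np.array([1, np.nan, 3])

# A list containing None falls back to the generic object dtype.
with_none = np.array([0, None, 2])

# A plain integer list stays an integer array.
ints = np.array([2, 3, 4])


def nan_to_none(values):
    """Hypothetical helper: list form of an array with NaN replaced by None."""
    return [
        None if isinstance(v, float) and math.isnan(v) else v
        for v in values.tolist()
    ]


print(with_nan.dtype)         # float64
print(with_none.dtype)        # object
print(ints.tolist())          # [2, 3, 4]
print(nan_to_none(with_nan))  # [1.0, None, 3.0]
```

Inspecting the dtype of each array in a column this way shows in advance which lists hold floats, which hold generic objects, and which remain integers.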