pandas-td Compatibility
Compatible Functions
- pytd.pandas_td.connect(apikey=None, endpoint=None, **kwargs)[source]
Create a connection to Treasure Data.
- Parameters:
apikey (str, optional) – Treasure Data API key. If not given, a value of environment variable TD_API_KEY is used by default.
endpoint (str, optional) – Treasure Data API server. If not given, https://api.treasuredata.com is used by default. List of available endpoints is: https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints
kwargs (dict, optional) – Optional arguments
- Return type:
pytd.Client
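Examples

A minimal sketch of opening a connection (the explicit apikey argument can be omitted when the TD_API_KEY environment variable is set; apikey below is a placeholder variable):

>>> import pytd.pandas_td as td
>>> con = td.connect(apikey=apikey, endpoint="https://api.treasuredata.com")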
- pytd.pandas_td.create_engine(url, con=None, header=True, show_progress=5.0, clear_progress=True)[source]
Create a handler for a query engine based on a URL.
The following environment variables are used for the default connection:
TD_API_KEY – API key
TD_API_SERVER – API server (default: https://api.treasuredata.com)
- Parameters:
url (str) – Engine descriptor in the form “type://apikey@host/database?params…” Use shorthand notation “type:database?params…” for the default connection. pytd: “params” will be ignored since pytd.QueryEngine does not have any extra parameters.
con (pytd.Client, optional) – Handler returned by pytd.pandas_td.connect(). If not given, the default client is used.
header (str or bool, default: True) – Prepend comment strings, in the form "-- comment", as a header of queries. Set False to disable the header.
show_progress (double or bool, default: 5.0) – Number of seconds to wait before printing progress. Set False to disable progress entirely. pytd: This argument will be ignored.
clear_progress (bool, default: True) – If True, clear progress when query completed. pytd: This argument will be ignored.
- Return type:
pytd.query_engine.QueryEngine
Examples
>>> import pytd.pandas_td as td
>>> con = td.connect(apikey=apikey, endpoint="https://api.treasuredata.com")
>>> engine = td.create_engine("presto:sample_datasets")
- pytd.pandas_td.read_td_query(query, engine, index_col=None, parse_dates=None, distributed_join=False, params=None)[source]
Read Treasure Data query into a DataFrame.
Returns a DataFrame corresponding to the result set of the query string. Optionally provide an index_col parameter to use one of the columns as the index; otherwise, a default integer index will be used.
While Presto in pytd has two options to issue a query, either via tdclient or prestodb, pytd.pandas_td#read_td_query always uses the former to stay compatible with the original pandas-td. Use pytd.Client to take advantage of the latter option.
- Parameters:
query (str) – Query string to be executed.
engine (pytd.query_engine.QueryEngine) – Handler returned by create_engine.
index_col (str, optional) – Column name to use as index for the returned DataFrame object.
parse_dates (list or dict, optional) –
List of column names to parse as dates.
Dict of {column_name: format string} where the format string is strftime compatible when parsing string times, or is one of (D, s, ns, ms, us) when parsing integer timestamps.
distributed_join (bool, default: False) – (Presto only) If True, distributed join is enabled. If False, broadcast join is used. See https://trino.io/docs/current/admin/properties-general.html
params (dict, optional) –
Parameters to pass to the execute method. pytd does not support the parameter type ('hive', 'presto'); the query type needs to be defined by engine.
Available parameters:
- db (str): use the database
- result_url (str): result output URL
- priority (int or str): priority
  - -2: "VERY LOW"
  - -1: "LOW"
  - 0: "NORMAL"
  - 1: "HIGH"
  - 2: "VERY HIGH"
- retry_limit (int): max number of automatic retries
- wait_interval (int): sleep interval until the job finishes
- wait_callback (function): called every interval against the job itself
- engine_version (str): run query with Hive 2 if this parameter is set to "stable" in HiveQueryEngine. https://docs.treasuredata.com/display/public/PD/Writing+Hive+Queries
- Returns:
Query result in a DataFrame
- Return type:
pandas.DataFrame
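Examples

A minimal sketch, assuming the default connection is configured via TD_API_KEY and the sample_datasets database is accessible:

>>> import pytd.pandas_td as td
>>> con = td.connect()
>>> engine = td.create_engine("presto:sample_datasets", con=con)
>>> df = td.read_td_query(
...     "select symbol, count(1) as cnt from nasdaq group by 1 order by 1",
...     engine,
...     index_col="symbol",
... )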
- pytd.pandas_td.read_td_job(job_id, engine, index_col=None, parse_dates=None)[source]
Read Treasure Data job result into a DataFrame.
Returns a DataFrame corresponding to the result set of the job. This method waits for job completion if the specified job is still running. Optionally provide an index_col parameter to use one of the columns as the index; otherwise, a default integer index will be used.
- Parameters:
job_id (int) – Job ID.
engine (pytd.query_engine.QueryEngine) – Handler returned by create_engine.
index_col (str, optional) – Column name to use as index for the returned DataFrame object.
parse_dates (list or dict, optional) –
List of column names to parse as dates.
Dict of {column_name: format string} where the format string is strftime compatible when parsing string times, or is one of (D, s, ns, ms, us) when parsing integer timestamps.
- Returns:
Job result in a DataFrame
- Return type:
pandas.DataFrame
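Examples

A minimal sketch, assuming an existing job whose ID you know (the job ID below is a placeholder):

>>> import pytd.pandas_td as td
>>> con = td.connect()
>>> engine = td.create_engine("presto:sample_datasets", con=con)
>>> df = td.read_td_job(451709460, engine, index_col="time")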
- pytd.pandas_td.read_td_table(table_name, engine, index_col=None, parse_dates=None, columns=None, time_range=None, limit=10000)[source]
Read Treasure Data table into a DataFrame.
The number of returned rows is limited by "limit" (default 10,000). Setting limit=None means all rows. Be careful when you set limit=None, because your table might be very large and the result may not fit into memory.
- Parameters:
table_name (str) – Name of Treasure Data table in database.
engine (pytd.query_engine.QueryEngine) – Handler returned by create_engine.
index_col (str, optional) – Column name to use as index for the returned DataFrame object.
parse_dates (list or dict, optional) –
List of column names to parse as dates.
Dict of {column_name: format string} where the format string is strftime compatible when parsing string times, or is one of (D, s, ns, ms, us) when parsing integer timestamps.
columns (list, optional) – List of column names to select from table.
time_range (tuple (start, end), optional) – Limit time range to select. “start” and “end” are one of None, integers, strings or datetime objects. “end” is exclusive, not included in the result.
limit (int, default: 10,000) – Maximum number of rows to select.
- Return type:
pandas.DataFrame
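Examples

A minimal sketch reading a slice of the nasdaq table in sample_datasets (the column list, time range, and limit are illustrative):

>>> import pytd.pandas_td as td
>>> con = td.connect()
>>> engine = td.create_engine("presto:sample_datasets", con=con)
>>> df = td.read_td_table(
...     "nasdaq",
...     engine,
...     columns=["time", "symbol", "close"],
...     time_range=("2010-01-01", "2010-02-01"),
...     limit=1000,
... )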
- pytd.pandas_td.read_td(query, engine, index_col=None, parse_dates=None, distributed_join=False, params=None)[source]
Read Treasure Data query into a DataFrame.
Returns a DataFrame corresponding to the result set of the query string. Optionally provide an index_col parameter to use one of the columns as the index; otherwise, a default integer index will be used.
While Presto in pytd has two options to issue a query, either via tdclient or prestodb, pytd.pandas_td#read_td_query always uses the former to stay compatible with the original pandas-td. Use pytd.Client to take advantage of the latter option.
- Parameters:
query (str) – Query string to be executed.
engine (pytd.query_engine.QueryEngine) – Handler returned by create_engine.
index_col (str, optional) – Column name to use as index for the returned DataFrame object.
parse_dates (list or dict, optional) –
List of column names to parse as dates.
Dict of {column_name: format string} where the format string is strftime compatible when parsing string times, or is one of (D, s, ns, ms, us) when parsing integer timestamps.
distributed_join (bool, default: False) – (Presto only) If True, distributed join is enabled. If False, broadcast join is used. See https://trino.io/docs/current/admin/properties-general.html
params (dict, optional) –
Parameters to pass to the execute method. pytd does not support the parameter type ('hive', 'presto'); the query type needs to be defined by engine.
Available parameters:
- db (str): use the database
- result_url (str): result output URL
- priority (int or str): priority
  - -2: "VERY LOW"
  - -1: "LOW"
  - 0: "NORMAL"
  - 1: "HIGH"
  - 2: "VERY HIGH"
- retry_limit (int): max number of automatic retries
- wait_interval (int): sleep interval until the job finishes
- wait_callback (function): called every interval against the job itself
- engine_version (str): run query with Hive 2 if this parameter is set to "stable" in HiveQueryEngine. https://docs.treasuredata.com/display/public/PD/Writing+Hive+Queries
- Returns:
Query result in a DataFrame
- Return type:
pandas.DataFrame
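Examples

As documented above, read_td behaves like read_td_query; a minimal sketch that also passes job parameters (the priority value is illustrative):

>>> import pytd.pandas_td as td
>>> con = td.connect()
>>> engine = td.create_engine("presto:sample_datasets", con=con)
>>> df = td.read_td("select * from nasdaq limit 5", engine, params={"priority": 0})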
- pytd.pandas_td.to_td(frame, name, con, if_exists='fail', time_col=None, time_index=None, index=True, index_label=None, chunksize=10000, date_format=None, writer='bulk_import', **kwargs)[source]
Write a DataFrame to a Treasure Data table.
This method converts the dataframe into a series of key-value pairs and sends them using the Treasure Data streaming API. The data is divided into chunks of rows (default 10,000) and uploaded separately. If an upload fails, the client retries the process for a certain amount of time (max_cumul_retry_delay; default 600 secs). This method may fail and raise an exception when the retries do not succeed, in which case the data may be partially inserted. Use the bulk import utility if you cannot accept partial inserts.
- Parameters:
frame (pandas.DataFrame) – DataFrame to be written.
name (str) – Name of table to be written, in the form ‘database.table’.
con (pytd.Client) – A client for a Treasure Data account returned by pytd.pandas_td.connect().
if_exists (str, {'error' ('fail'), 'overwrite' ('replace'), 'append', 'ignore'}, default: 'error') –
What happens when the target table already exists. For pandas-td compatibility, 'error', 'overwrite', 'append', and 'ignore' can also be given as 'fail', 'replace', 'append', and 'ignore', respectively:
fail: If table exists, raise an exception.
replace: If table exists, drop it, recreate it, and insert data.
append: If table exists, insert data. Create the table if it does not exist.
ignore: If table exists, do nothing.
time_col (str, optional) – Column name to use as “time” column for the table. Column type must be integer (unixtime), datetime, or string. If None is given (default), then the current time is used as time values.
time_index (int, optional) – Level of index to use as “time” column for the table. Set 0 for a single index. This parameter implies index=False.
index (bool, default: True) – Write DataFrame index as a column.
index_label (str or sequence, default: None) – Column label for index column(s). If None is given (default) and index is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.
chunksize (int, default: 10,000) – Number of rows to be inserted in each chunk from the dataframe. pytd: This argument will be ignored.
date_format (str, default: None) – Format string for datetime objects
writer (str, {‘bulk_import’, ‘insert_into’, ‘spark’}, or pytd.writer.Writer, default: ‘bulk_import’) – A Writer to choose the writing method to Treasure Data. If not given, or if a string value is given, a temporary Writer instance will be created.
fmt (str, {'csv', 'msgpack'}, default: 'csv') –
Format for bulk_import.
- csv
Convert the dataframe to a temporary CSV file. A stable option, but slower than the msgpack option because pytd saves the dataframe as a temporary CSV file and td-client then converts it to msgpack. Column types are guessed by pandas.read_csv, which can cause unintended type conversion, e.g., the 0-padded string "00012" into the integer 12.
- msgpack
Convert to a temporary msgpack.gz file. A fast option, but there is a slight difference in type conversion compared to csv.
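Examples

A minimal sketch writing a small DataFrame to a hypothetical mydb.sales table, replacing the table if it already exists:

>>> import pandas as pd
>>> import pytd.pandas_td as td
>>> con = td.connect()
>>> df = pd.DataFrame({"region": ["us", "jp"], "amount": [150, 230]})
>>> td.to_td(df, "mydb.sales", con, if_exists="replace", index=False)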
IPython Magics
IPython magics to access Treasure Data. Load the magics first:
In [1]: %load_ext pytd.pandas_td.ipython
- class pytd.pandas_td.ipython.TDMagics(**kwargs: Any)[source]
- __init__(shell)[source]
Create a configurable given a config config.
- Parameters:
config (Config) – If this is empty, default values are used. If config is a Config instance, it will be used to configure the instance.
parent (Configurable instance, optional) – The parent Configurable instance of this object.
Notes
Subclasses of Configurable must call the __init__() method of Configurable before doing anything else and using super():

class MyConfigurable(Configurable):
    def __init__(self, config=None):
        super(MyConfigurable, self).__init__(config=config)
        # Then any other code you need to finish initialization.
This ensures that instances will be configured properly.
- class pytd.pandas_td.ipython.DatabasesMagics(**kwargs: Any)[source]
- td_databases(pattern)
List databases in the form of pandas.DataFrame.
%td_databases [<database_name_pattern>]
- Parameters:
<database_name_pattern> (string, optional) – List databases matched to a given pattern. If not given, all existing databases will be listed.
- Return type:
pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython

In [2]: %td_databases
Out[2]:
                                   name   count     permission                created_at                updated_at
0   xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx  348124  administrator 2019-01-23 05:48:11+00:00 2019-01-23 05:48:11+00:00
1                             yyyyyyyyy       0  administrator 2017-12-14 07:52:34+00:00 2017-12-14 07:52:34+00:00
2                         zzzzzzzzzzzzz       0  administrator 2016-05-25 23:12:06+00:00 2016-05-25 23:12:06+00:00
...

In [3]: %td_databases sample
Out[3]:
               name    count     permission                created_at                updated_at
0          sampledb        2  administrator 2014-04-11 22:29:38+00:00 2014-04-11 22:29:38+00:00
1   sample_xxxxxxxx        2  administrator 2017-06-02 23:37:41+00:00 2017-06-02 23:37:41+00:00
2   sample_datasets  8812278     query_only 2014-10-04 01:13:11+00:00 2018-03-16 04:59:06+00:00
...
- magics = {'cell': {}, 'line': {'td_databases': 'td_databases'}}
- registered = True
- class pytd.pandas_td.ipython.TablesMagics(**kwargs: Any)[source]
- td_tables(pattern)
List tables in databases.
%td_tables [<table_identifier_pattern>]
- Parameters:
<table_identifier_pattern> (string, optional) – List tables matched to a given pattern. Table identifier is represented as database_name.table_name. If not given, all existing tables will be listed.
- Return type:
pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython

In [2]: %td_tables
Out[2]:
          db_name                name  count  estimated_storage_size        last_log_timestamp                created_at
0   xxxxx_demo_aa       customer_test     70                    1047 2018-02-05 06:20:32+00:00 2018-02-05 06:20:24+00:00
1   xxxxx_demo_aa           email_log      0                       0 1970-01-01 00:00:00+00:00 2018-02-05 07:19:57+00:00
2           yy_wf  topk_similar_items  10598                  134208 2018-04-16 09:23:57+00:00 2018-04-16 09:59:48+00:00
...

In [3]: %td_tables sample
Out[3]:
         db_name               name  count  estimated_storage_size        last_log_timestamp                created_at
0        xx_test    aaaaaaaa_sample      0                       0 1970-01-01 00:00:00+00:00 2015-10-20 17:37:40+00:00
1       sampledb          sampletbl      2                     843 1970-01-01 00:00:00+00:00 2014-04-11 22:30:08+00:00
2   zzzz_test_db  sample_output_tab      4                     889 2018-06-06 08:26:20+00:00 2018-06-06 08:27:12+00:00
...
- magics = {'cell': {}, 'line': {'td_tables': 'td_tables'}}
- registered = True
- class pytd.pandas_td.ipython.JobsMagics(**kwargs: Any)[source]
- td_jobs(line)
List job activities in an account.
%td_jobs
- Return type:
pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython

In [2]: %td_jobs
Out[2]:
    status     job_id    type                  start_at                                              query
0    error  448650806    hive 2019-04-12 05:33:36+00:00  with null_samples as (\n select\n id,\n ...
1  success  448646994  presto 2019-04-12 05:23:29+00:00  -- read_td_query\n-- set session distributed_j...
2  success  448646986  presto 2019-04-12 05:23:27+00:00  -- read_td_query\n-- set session distributed_j...
...
- magics = {'cell': {}, 'line': {'td_jobs': 'td_jobs'}}
- registered = True
- class pytd.pandas_td.ipython.UseMagics(**kwargs: Any)[source]
- td_use(line)
Use a specific database.
This magic pushes all table names in a specified database into the current namespace.
%td_use [<database_name>]
- Parameters:
<database_name> (string) – Database name.
Examples
In [1]: %load_ext pytd.pandas_td.ipython

In [2]: %td_use sample_datasets
INFO: import nasdaq
INFO: import www_access

In [3]: nasdaq  # describe table columns in the form of DataFrame
Out[3]: <pytd.pandas_td.ipython.MagicTable at 0x117651908>
- magics = {'cell': {}, 'line': {'td_use': 'td_use'}}
- registered = True
- class pytd.pandas_td.ipython.QueryMagics(**kwargs: Any)[source]
- td_job(line)
Get job result.
%td_job [--pivot] [--plot] [--dry-run] [--verbose] [--connection <connection>] [--dropna] [--out <out>] [--out-file <out_file>] [--quiet] [--timezone <timezone>] job_id
- Parameters:
<job_id> (integer) – Job ID.
--pivot (optional) – Run pivot_table against dimensions.
--plot (optional) – Plot the query result.
--dry-run, -n (optional) – Output translated code without running query.
--verbose, -v (optional) – Verbose output.
--connection <connection>, -c <connection> – Use specified connection.
--dropna, -d (optional) – Drop columns if all values are NA.
--out <out>, -o <out> – Store the result to variable.
--out-file <out_file>, -O <out_file> – Store the result to file.
--quiet, -q (optional) – Disable progress output.
--timezone <timezone>, -T <timezone> – Set timezone to time index.
- Return type:
pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython

In [2]: %td_job 451709460  # select * from sample_datasets.nasdaq limit 5
Out[2]:
                    symbol  open  volume     high      low    close
time
1992-08-25 16:00:00   ATRO   0.0    3900   0.7076   0.7076   0.7076
1992-08-25 16:00:00   ALOG   0.0   11200  11.0000  10.6250  11.0000
1992-08-25 16:00:00   ATAX   0.0   11400  11.3750  11.0000  11.0000
1992-08-25 16:00:00   ATRI   0.0    5400  14.3405  14.0070  14.2571
1992-08-25 16:00:00   ABMD   0.0   38800   5.7500   5.2500   5.6875
- td_hive(line, cell)
Run a Hive query.
%%td_hive [<database>] [--pivot] [--plot] [--dry-run] [--verbose] [--connection <connection>] [--dropna] [--out <out>] [--out-file <out_file>] [--quiet] [--timezone <timezone>] <query>
- Parameters:
<query> (string) – Hive query.
<database> (string, optional) – Database name.
--pivot (optional) – Run pivot_table against dimensions.
--plot (optional) – Plot the query result.
--dry-run, -n (optional) – Output translated code without running query.
--verbose, -v (optional) – Verbose output.
--connection <connection>, -c <connection> – Use specified connection.
--dropna, -d (optional) – Drop columns if all values are NA.
--out <out>, -o <out> – Store the result to variable.
--out-file <out_file>, -O <out_file> – Store the result to file.
--quiet, -q (optional) – Disable progress output.
--timezone <timezone>, -T <timezone> – Set timezone to time index.
- Return type:
pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython

In [2]: %%td_hive
   ...: select hivemall_version()
   ...:
Out[2]:
                         _c0
0  0.6.0-SNAPSHOT-201901-r01
- td_presto(line, cell)
Run a Presto query.
%%td_presto [<database>] [--pivot] [--plot] [--dry-run] [--verbose] [--connection <connection>] [--dropna] [--out <out>] [--out-file <out_file>] [--quiet] [--timezone <timezone>] <query>
- Parameters:
<query> (string) – Presto query.
<database> (string, optional) – Database name.
--pivot (optional) – Run pivot_table against dimensions.
--plot (optional) – Plot the query result.
--dry-run, -n (optional) – Output translated code without running query.
--verbose, -v (optional) – Verbose output.
--connection <connection>, -c <connection> – Use specified connection.
--dropna, -d (optional) – Drop columns if all values are NA.
--out <out>, -o <out> – Store the result to variable.
--out-file <out_file>, -O <out_file> – Store the result to file.
--quiet, -q (optional) – Disable progress output.
--timezone <timezone>, -T <timezone> – Set timezone to time index.
- Return type:
pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython

In [2]: %%td_presto
   ...: select * from sample_datasets.nasdaq limit 5
   ...:
Out[2]:
                    symbol  open  volume     high      low    close
time
1989-01-26 16:00:00   SMTC   0.0    8000   0.4532   0.4532   0.4532
1989-01-26 16:00:00   SEIC   0.0  163200   0.7077   0.6921   0.7025
1989-01-26 16:00:00   SIGI   0.0    2800   3.9610   3.8750   3.9610
1989-01-26 16:00:00   NAVG   0.0    1800  14.6740  14.1738  14.6740
1989-01-26 16:00:00   MOCO   0.0   71101   3.6722   3.5609   3.5980
- magics = {'cell': {'td_hive': 'td_hive', 'td_presto': 'td_presto'}, 'line': {'td_job': 'td_job'}}
- registered = True