pytd¶
pytd provides user-friendly interfaces to Treasure Data’s REST APIs, Presto query engine, and Plazma primary storage.
The seamless connection allows your Python code to efficiently read and write large volumes of data from and to Treasure Data, making your day-to-day data analytics work more productive.
Installation¶
pip install pytd
Usage¶
Set your API key and endpoint to the environment variables TD_API_KEY and TD_API_SERVER, respectively, and create a client instance:
import pytd
client = pytd.Client(database='sample_datasets')
# or, hard-code your API key, endpoint, and/or query engine:
# >>> pytd.Client(apikey='1/XXX', endpoint='https://api.treasuredata.com/', database='sample_datasets', default_engine='presto')
Query in Treasure Data¶
Issue a Presto query and retrieve the result:
client.query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1')
# {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ['AAME', 9252], ..., ['ZUMZ', 2364]]}
In case of Hive:
client.query('select hivemall_version()', engine='hive')
# {'columns': ['_c0'], 'data': [['0.6.0-SNAPSHOT-201901-r01']]} (as of Feb, 2019)
It is also possible to explicitly initialize pytd.Client
for Hive:
client_hive = pytd.Client(database='sample_datasets', default_engine='hive')
client_hive.query('select hivemall_version()')
Write data to Treasure Data¶
Data represented as pandas.DataFrame
can be written to Treasure Data
as follows:
import pandas as pd
df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
client.load_table_from_dataframe(df, 'takuti.foo', writer='bulk_import', if_exists='overwrite')
For the writer option, pytd supports three different ways to ingest data to Treasure Data (see the example after this list):
- Bulk Import API (bulk_import, default): Converts data into a CSV file and uploads it in batch fashion.
- Presto INSERT INTO query (insert_into): Inserts every single row of the DataFrame by issuing an INSERT INTO query through the Presto query engine. Recommended only for a small volume of data.
- td-spark (spark): A local customized Spark instance directly writes the DataFrame to Treasure Data’s primary storage system.
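As a minimal sketch of switching writers (reusing the takuti.foo table from the example above purely for illustration), the insert_into writer can be selected for small DataFrames:
import pandas as pd
import pytd

client = pytd.Client(database='sample_datasets')
df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})

# Issue Presto INSERT INTO statements row by row; suitable only for small data.
client.load_table_from_dataframe(df, 'takuti.foo', writer='insert_into', if_exists='overwrite')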
Enabling Spark Writer¶
Since td-spark gives special access to the main storage system via PySpark, follow the instructions below:
Contact support@treasuredata.com to activate the permission for your Treasure Data account.
Install pytd with the [spark] option if you use this writer:
pip install pytd[spark]
If you want to use an existing td-spark JAR file, create a SparkWriter with the td_spark_path option:
from pytd.writer import SparkWriter
writer = SparkWriter(apikey='1/XXX', endpoint='https://api.treasuredata.com/', td_spark_path='/path/to/td-spark-assembly.jar')
client.load_table_from_dataframe(df, 'mydb.bar', writer=writer, if_exists='overwrite')
How to replace pandas-td¶
pytd offers pandas-td-compatible functions that provide the same functionality more efficiently. If you are still using pandas-td, we recommend switching to pytd as follows.
First, install the package from PyPI:
pip install pytd
# or, `pip install pytd[spark]` if you wish to use `to_td`
Next, make the following modifications to the import statements.
Before:
import pandas_td as td
In [1]: %load_ext pandas_td.ipython
After:
import pytd.pandas_td as td
In [1]: %load_ext pytd.pandas_td.ipython
Consequently, all pandas_td code should keep running correctly with pytd. Report an issue here if you notice any incompatible behaviors.
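As a minimal sketch of the result (assuming TD_API_KEY and TD_API_SERVER are set and the sample_datasets database is accessible), existing pandas-td style code keeps working after the import change:
import pytd.pandas_td as td

# connect() picks up TD_API_KEY and TD_API_SERVER from the environment.
con = td.connect()
engine = td.create_engine('presto:sample_datasets', con=con)

# The same pandas-td style calls return pandas DataFrames as before.
df = td.read_td('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', engine)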
More Examples¶
Usage Guides¶
Working with DB-API¶
pytd implements Python Database API Specification v2.0 with the help of prestodb/presto-python-client.
Connect to the API first:
from pytd.dbapi import connect
conn = connect(pytd.Client(database='sample_datasets'))
# or, connect with Hive:
# >>> conn = connect(pytd.Client(database='sample_datasets', default_engine='hive'))
The Cursor defined by the specification allows us to flexibly fetch query results via a custom function:
def query(sql, connection):
    cur = connection.cursor()
    cur.execute(sql)
    rows = cur.fetchall()
    columns = [desc[0] for desc in cur.description]
    return {'data': rows, 'columns': columns}
query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn)
Below is an example of generator-based iterative retrieval, just like pandas.DataFrame.iterrows:
def iterrows(sql, connection):
    cur = connection.cursor()
    cur.execute(sql)
    index = 0
    columns = None
    while True:
        row = cur.fetchone()
        if row is None:
            break
        if columns is None:
            columns = [desc[0] for desc in cur.description]
        yield index, dict(zip(columns, row))
        index += 1
for index, row in iterrows('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn):
print(index, row)
# 0 {'cnt': 590, 'symbol': 'AAIT'}
# 1 {'cnt': 82, 'symbol': 'AAL'}
# 2 {'cnt': 9252, 'symbol': 'AAME'}
# 3 {'cnt': 253, 'symbol': 'AAOI'}
# 4 {'cnt': 5980, 'symbol': 'AAON'}
# ...
Use Existing td-spark-assembly.jar File¶
If you want to use an existing td-spark JAR file, create a SparkWriter with the td_spark_path option. You can then pass the writer to the connect() function.
import pytd
import pytd.pandas_td as td
import pandas as pd
apikey = '1/XXX'
endpoint = 'https://api.treasuredata.com/'
writer = pytd.writer.SparkWriter(apikey=apikey, endpoint=endpoint, td_spark_path='/path/to/td-spark-assembly.jar')
con = td.connect(apikey=apikey, endpoint=endpoint, writer=writer)
df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
td.to_td(df, 'mydb.buzz', con, if_exists='replace', index=False)
API Reference¶
Query Engines¶
- QueryEngine: An interface to Treasure Data query engine.
- PrestoQueryEngine: An interface to Treasure Data Presto query engine.
- HiveQueryEngine: An interface to Treasure Data Hive query engine.
Writers¶
- InsertIntoWriter: A writer module that loads Python data to Treasure Data by issuing INSERT INTO query in Presto.
- BulkImportWriter: A writer module that loads Python data to Treasure Data by using td-client-python’s bulk importer.
- SparkWriter: A writer module that loads Python data to Treasure Data.
Table¶
- Table: A table controller module.
pandas-td Compatibility¶
Compatible Functions¶
-
pytd.pandas_td.create_engine(url, con=None, header=True, show_progress=5.0, clear_progress=True)¶
Create a handler for a query engine based on a URL.
The following environment variables are used for the default connection:
- TD_API_KEY: API key
- TD_API_SERVER: API server (default: https://api.treasuredata.com)
- Parameters
- url : string
Engine descriptor in the form "type://apikey@host/database?params…". Use the shorthand notation "type:database?params…" for the default connection. pytd: "params" will be ignored since pytd.QueryEngine does not have any extra parameters.
- con : pytd.Client, optional
Handler returned by pytd.pandas_td.connect. If not given, the default client is used.
- header : string or boolean, default: True
Prepend comment strings, in the form "-- comment", as a header of queries. Set False to disable the header.
- show_progress : double or boolean, default: 5.0
Number of seconds to wait before printing progress. Set False to disable progress entirely. pytd: This argument will be ignored.
- clear_progress : boolean, default: True
If True, clear progress when the query is completed. pytd: This argument will be ignored.
- Returns
- QueryEngine
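For example, a minimal sketch using the shorthand notation for the default connection (TD_API_KEY and TD_API_SERVER assumed to be set):
import pytd.pandas_td as td

# Presto engine for the sample_datasets database
presto_engine = td.create_engine('presto:sample_datasets')

# Hive engine for the same database
hive_engine = td.create_engine('hive:sample_datasets')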
-
pytd.pandas_td.read_td_query(query, engine, index_col=None, parse_dates=None, distributed_join=False, params=None)¶
Read Treasure Data query into a DataFrame.
Returns a DataFrame corresponding to the result set of the query string. Optionally provide an index_col parameter to use one of the columns as the index; otherwise a default integer index will be used.
- Parameters
- query : string
Query string to be executed.
- engine : QueryEngine
Handler returned by create_engine.
- index_col : string, optional
Column name to use as index for the returned DataFrame object.
- parse_dates : list or dict, optional
List of column names to parse as dates, or dict of {column_name: format string} where the format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
- distributed_join : boolean, default: False
(Presto only) If True, distributed join is enabled. If False, broadcast join is used. See https://prestodb.io/docs/current/release/release-0.77.html
- params : dict, optional
Parameters to pass to the execute method. pytd does not support the parameter type ('hive', 'presto'); the query type needs to be defined by engine. Available parameters:
- db (str): use the database
- result_url (str): result output URL
- priority (int or str): priority (-2: "VERY LOW", -1: "LOW", 0: "NORMAL", 1: "HIGH", 2: "VERY HIGH")
- retry_limit (int): max number of automatic retries
- wait_interval (int): sleep interval until job finish
- wait_callback (function): called every interval against job itself
- Returns
- DataFrame
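A minimal sketch of usage, reusing the nasdaq table from the earlier examples and an engine created by create_engine:
import pytd.pandas_td as td

engine = td.create_engine('presto:sample_datasets')

# Use the symbol column as the index of the returned DataFrame.
df = td.read_td_query(
    'select symbol, count(1) as cnt from nasdaq group by 1 order by 1',
    engine,
    index_col='symbol',
)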
-
pytd.pandas_td.read_td_job(job_id, engine, index_col=None, parse_dates=None)¶
Read Treasure Data job result into a DataFrame.
Returns a DataFrame corresponding to the result set of the job. This method waits for job completion if the specified job is still running. Optionally provide an index_col parameter to use one of the columns as the index; otherwise a default integer index will be used.
- Parameters
- job_id : integer
Job ID.
- engine : QueryEngine
Handler returned by create_engine.
- index_col : string, optional
Column name to use as index for the returned DataFrame object.
- parse_dates : list or dict, optional
List of column names to parse as dates, or dict of {column_name: format string} where the format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
- Returns
- DataFrame
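A minimal sketch; the job ID below is hypothetical and shown purely for illustration:
import pytd.pandas_td as td

engine = td.create_engine('presto:sample_datasets')

# Fetch the result of an already-issued job; waits if the job is still running.
df = td.read_td_job(123456789, engine)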
-
pytd.pandas_td.read_td_table(table_name, engine, index_col=None, parse_dates=None, columns=None, time_range=None, limit=10000)¶
Read Treasure Data table into a DataFrame.
The number of returned rows is limited by "limit" (default 10,000). Setting limit=None means all rows. Be careful when you set limit=None because your table might be very large and the result might not fit into memory.
- Parameters
- table_name : string
Name of Treasure Data table in database.
- engine : QueryEngine
Handler returned by create_engine.
- index_col : string, optional
Column name to use as index for the returned DataFrame object.
- parse_dates : list or dict, optional
List of column names to parse as dates, or dict of {column_name: format string} where the format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
- columns : list, optional
List of column names to select from table.
- time_range : tuple (start, end), optional
Limit the time range to select. "start" and "end" are one of None, integers, strings or datetime objects. "end" is exclusive, not included in the result.
- limit : int, default: 10,000
Maximum number of rows to select.
- Returns
- DataFrame
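A minimal sketch that reads a slice of the nasdaq table; the column list and time range are chosen purely for illustration:
import pytd.pandas_td as td

engine = td.create_engine('presto:sample_datasets')

# "end" of time_range is exclusive.
df = td.read_td_table(
    'nasdaq',
    engine,
    columns=['symbol', 'open', 'close', 'volume'],
    time_range=('2010-01-01', '2011-01-01'),
    limit=10000,
)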
-
pytd.pandas_td.read_td(query, engine, index_col=None, parse_dates=None, distributed_join=False, params=None)¶
Read Treasure Data query into a DataFrame.
Returns a DataFrame corresponding to the result set of the query string. Optionally provide an index_col parameter to use one of the columns as the index; otherwise a default integer index will be used.
- Parameters
- query : string
Query string to be executed.
- engine : QueryEngine
Handler returned by create_engine.
- index_col : string, optional
Column name to use as index for the returned DataFrame object.
- parse_dates : list or dict, optional
List of column names to parse as dates, or dict of {column_name: format string} where the format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
- distributed_join : boolean, default: False
(Presto only) If True, distributed join is enabled. If False, broadcast join is used. See https://prestodb.io/docs/current/release/release-0.77.html
- params : dict, optional
Parameters to pass to the execute method. pytd does not support the parameter type ('hive', 'presto'); the query type needs to be defined by engine. Available parameters:
- db (str): use the database
- result_url (str): result output URL
- priority (int or str): priority (-2: "VERY LOW", -1: "LOW", 0: "NORMAL", 1: "HIGH", 2: "VERY HIGH")
- retry_limit (int): max number of automatic retries
- wait_interval (int): sleep interval until job finish
- wait_callback (function): called every interval against job itself
- Returns
- DataFrame
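Since read_td shares its signature with read_td_query, a minimal sketch looks the same:
import pytd.pandas_td as td

engine = td.create_engine('presto:sample_datasets')
df = td.read_td('select count(1) as cnt from www_access', engine)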
-
pytd.pandas_td.to_td(frame, name, con, if_exists='fail', time_col=None, time_index=None, index=True, index_label=None, chunksize=10000, date_format=None, writer='bulk_import')¶
Write a DataFrame to a Treasure Data table.
This method converts the dataframe into a series of key-value pairs and sends them using the Treasure Data streaming API. The data is divided into chunks of rows (default 10,000) and uploaded separately. If an upload fails, the client retries the process for a certain amount of time (max_cumul_retry_delay; default 600 secs). This method may fail and raise an exception when the retries do not succeed, in which case the data may be partially inserted. Use the bulk import utility if you cannot accept partial inserts.
- Parameters
- frame : DataFrame
DataFrame to be written.
- name : string
Name of table to be written, in the form 'database.table'.
- con : pytd.Client
A client for a Treasure Data account returned by pytd.pandas_td.connect.
- if_exists : {'error' ('fail'), 'overwrite' ('replace'), 'append', 'ignore'}, default: 'error'
What happens when a target table already exists. For pandas-td compatibility, 'error', 'overwrite', 'append', and 'ignore' can respectively be:
- fail: If table exists, raise an exception.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- ignore: If table exists, do nothing.
- time_col : string, optional
Column name to use as the "time" column for the table. Column type must be integer (unixtime), datetime, or string. If None is given (default), then the current time is used as time values.
- time_index : int, optional
Level of index to use as the "time" column for the table. Set 0 for a single index. This parameter implies index=False.
- index : boolean, default: True
Write DataFrame index as a column.
- index_label : string or sequence, default: None
Column label for index column(s). If None is given (default) and index is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.
- chunksize : int, default: 10,000
Number of rows to be inserted in each chunk from the dataframe. pytd: This argument will be ignored.
- date_format : string, default: None
Format string for datetime objects.
- writer : string {'bulk_import', 'insert_into', 'spark'} or pytd.writer.Writer, default: 'bulk_import'
A Writer to choose the writing method to Treasure Data. If not given or a string value, a temporary Writer instance will be created.
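A minimal sketch of writing with a time column; mydb.access_log and its access_time column are hypothetical names used purely for illustration:
import pandas as pd
import pytd.pandas_td as td

con = td.connect()  # assumes TD_API_KEY and TD_API_SERVER are set

df = pd.DataFrame({
    'access_time': pd.to_datetime(['2019-04-12 05:33:36', '2019-04-12 05:23:29']),
    'path': ['/index.html', '/about.html'],
})

# Append rows and use access_time as the table's "time" column.
td.to_td(df, 'mydb.access_log', con, if_exists='append', time_col='access_time', index=False)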
IPython Magics¶
IPython magics to access Treasure Data. Load the magics first:
In [1]: %load_ext pytd.pandas_td.ipython
-
class pytd.pandas_td.ipython.TDMagics(shell)¶
__init__(self, shell)¶
Create a configurable given a config.
- Parameters
- config : Config
If this is empty, default values are used. If config is a Config instance, it will be used to configure the instance.
- parent : Configurable instance, optional
The parent Configurable instance of this object.
Notes
Subclasses of Configurable must call the __init__() method of Configurable before doing anything else and using super():
class MyConfigurable(Configurable):
    def __init__(self, config=None):
        super(MyConfigurable, self).__init__(config=config)
        # Then any other code you need to finish initialization.
This ensures that instances will be configured properly.
-
-
class pytd.pandas_td.ipython.DatabasesMagics(shell)¶
td_databases(self, pattern)¶
List databases in the form of pandas.DataFrame.
%td_databases [<database_name_pattern>]
- Parameters
- ``<database_name_pattern>`` : string, optional
List databases matched to a given pattern. If not given, all existing databases will be listed.
- Returns
- pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython
In [2]: %td_databases sample
Out[2]:
   name  count  permission  created_at  updated_at
0  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx  348124  administrator  2019-01-23 05:48:11+00:00  2019-01-23 05:48:11+00:00
1  yyyyyyyyy  0  administrator  2017-12-14 07:52:34+00:00  2017-12-14 07:52:34+00:00
2  zzzzzzzzzzzzz  0  administrator  2016-05-25 23:12:06+00:00  2016-05-25 23:12:06+00:00
...
In [3]: %td_databases sample
Out[3]:
   name  count  permission  created_at  updated_at
0  sampledb  2  administrator  2014-04-11 22:29:38+00:00  2014-04-11 22:29:38+00:00
1  sample_xxxxxxxx  2  administrator  2017-06-02 23:37:41+00:00  2017-06-02 23:37:41+00:00
2  sample_datasets  8812278  query_only  2014-10-04 01:13:11+00:00  2018-03-16 04:59:06+00:00
...
magics = {'cell': {}, 'line': {'td_databases': 'td_databases'}}¶
registered = True¶
-
-
class pytd.pandas_td.ipython.TablesMagics(shell)¶
td_tables(self, pattern)¶
List tables in databases.
%td_tables [<table_identifier_pattern>]
- Parameters
- ``<table_identifier_pattern>`` : string, optional
List tables matched to a given pattern. Table identifier is represented as database_name.table_name. If not given, all existing tables will be listed.
- Returns
- pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython
In [2]: %td_tables
Out[2]:
   db_name  name  count  estimated_storage_size  last_log_timestamp  created_at
0  xxxxx_demo_aa  customer_test  70  1047  2018-02-05 06:20:32+00:00  2018-02-05 06:20:24+00:00
1  xxxxx_demo_aa  email_log  0  0  1970-01-01 00:00:00+00:00  2018-02-05 07:19:57+00:00
2  yy_wf  topk_similar_items  10598  134208  2018-04-16 09:23:57+00:00  2018-04-16 09:59:48+00:00
...
In [3]: %td_tables sample
Out[3]:
   db_name  name  count  estimated_storage_size  last_log_timestamp  created_at
0  xx_test  aaaaaaaa_sample  0  0  1970-01-01 00:00:00+00:00  2015-10-20 17:37:40+00:00
1  sampledb  sampletbl  2  843  1970-01-01 00:00:00+00:00  2014-04-11 22:30:08+00:00
2  zzzz_test_db  sample_output_tab  4  889  2018-06-06 08:26:20+00:00  2018-06-06 08:27:12+00:00
...
magics = {'cell': {}, 'line': {'td_tables': 'td_tables'}}¶
registered = True¶
-
-
class pytd.pandas_td.ipython.JobsMagics(shell)¶
td_jobs(self, line)¶
List job activities in an account.
%td_jobs
- Returns
- pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython
In [2]: %td_jobs
Out[2]:
   status  job_id  type  start_at  query
0  error  448650806  hive  2019-04-12 05:33:36+00:00  with null_samples as (\n select\n id,\n ...
1  success  448646994  presto  2019-04-12 05:23:29+00:00  -- read_td_query\n-- set session distributed_j...
2  success  448646986  presto  2019-04-12 05:23:27+00:00  -- read_td_query\n-- set session distributed_j...
...
magics = {'cell': {}, 'line': {'td_jobs': 'td_jobs'}}¶
registered = True¶
-
-
class pytd.pandas_td.ipython.UseMagics(shell)¶
td_use(self, line)¶
Use a specific database.
This magic pushes all table names in a specified database into the current namespace.
%td_use [<database_name>]
- Parameters
- ``<database_name>`` : string
Database name.
Examples
In [1]: %load_ext pytd.pandas_td.ipython
In [2]: %td_use sample_datasets
INFO: import nasdaq
INFO: import www_access
In [3]: nasdaq  # describe table columns in the form of DataFrame
Out[3]: <pytd.pandas_td.ipython.MagicTable at 0x117651908>
magics = {'cell': {}, 'line': {'td_use': 'td_use'}}¶
registered = True¶
-
-
class pytd.pandas_td.ipython.QueryMagics(shell)¶
td_job(self, line)¶
Get job result.
%td_job [--pivot] [--plot] [--dry-run] [--verbose] [--connection <connection>] [--dropna] [--out <out>] [--out-file <out_file>] [--quiet] [--timezone <timezone>] job_id
- Parameters
- ``<job_id>`` : integer
Job ID.
- ``--pivot`` : optional
Run pivot_table against dimensions.
- ``--plot`` : optional
Plot the query result.
- ``--dry_run``, ``-n`` : optional
Output translated code without running query.
- ``--verbose``, ``-v`` : optional
Verbose output.
- ``--connection <connection>``, ``-c <connection>`` : pytd.Client, optional
Use specified connection.
- ``--dropna``, ``-d`` : optional
Drop columns if all values are NA.
- ``--out <out>``, ``-o <out>`` : string, optional
Store the result to variable.
- ``--out-file <out_file>``, ``-O <out_file>`` : string, optional
Store the result to file.
- ``--quiet``, ``-q`` : optional
Disable progress output.
- ``--timezone <timezone>``, ``-T <timezone>`` : string, optional
Set timezone to time index.
- Returns
- pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython
In [2]: %td_job 451709460  # select * from sample_datasets.nasdaq limit 5
Out[2]:
                     symbol  open  volume     high      low    close
time
1992-08-25 16:00:00    ATRO   0.0    3900   0.7076   0.7076   0.7076
1992-08-25 16:00:00    ALOG   0.0   11200  11.0000  10.6250  11.0000
1992-08-25 16:00:00    ATAX   0.0   11400  11.3750  11.0000  11.0000
1992-08-25 16:00:00    ATRI   0.0    5400  14.3405  14.0070  14.2571
1992-08-25 16:00:00    ABMD   0.0   38800   5.7500   5.2500   5.6875
-
td_hive(self, line, cell)¶
Run a Hive query.
%%td_hive [<database>] [--pivot] [--plot] [--dry-run] [--verbose] [--connection <connection>] [--dropna] [--out <out>] [--out-file <out_file>] [--quiet] [--timezone <timezone>] <query>
- Parameters
- ``<query>`` : string
Hive query.
- ``<database>`` : string, optional
Database name.
- ``--pivot`` : optional
Run pivot_table against dimensions.
- ``--plot`` : optional
Plot the query result.
- ``--dry_run``, ``-n`` : optional
Output translated code without running query.
- ``--verbose``, ``-v`` : optional
Verbose output.
- ``--connection <connection>``, ``-c <connection>`` : pytd.Client, optional
Use specified connection.
- ``--dropna``, ``-d`` : optional
Drop columns if all values are NA.
- ``--out <out>``, ``-o <out>`` : string, optional
Store the result to variable.
- ``--out-file <out_file>``, ``-O <out_file>`` : string, optional
Store the result to file.
- ``--quiet``, ``-q`` : optional
Disable progress output.
- ``--timezone <timezone>``, ``-T <timezone>`` : string, optional
Set timezone to time index.
- Returns
- pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython
In [2]: %%td_hive
   ...: select hivemall_version()
   ...:
Out[2]:
                         _c0
0  0.6.0-SNAPSHOT-201901-r01
-
td_presto(self, line, cell)¶
Run a Presto query.
%%td_presto [<database>] [--pivot] [--plot] [--dry-run] [--verbose] [--connection <connection>] [--dropna] [--out <out>] [--out-file <out_file>] [--quiet] [--timezone <timezone>] <query>
- Parameters
- ``<query>`` : string
Presto query.
- ``<database>`` : string, optional
Database name.
- ``--pivot`` : optional
Run pivot_table against dimensions.
- ``--plot`` : optional
Plot the query result.
- ``--dry_run``, ``-n`` : optional
Output translated code without running query.
- ``--verbose``, ``-v`` : optional
Verbose output.
- ``--connection <connection>``, ``-c <connection>`` : pytd.Client, optional
Use specified connection.
- ``--dropna``, ``-d`` : optional
Drop columns if all values are NA.
- ``--out <out>``, ``-o <out>`` : string, optional
Store the result to variable.
- ``--out-file <out_file>``, ``-O <out_file>`` : string, optional
Store the result to file.
- ``--quiet``, ``-q`` : optional
Disable progress output.
- ``--timezone <timezone>``, ``-T <timezone>`` : string, optional
Set timezone to time index.
- Returns
- pandas.DataFrame
Examples
In [1]: %load_ext pytd.pandas_td.ipython
In [2]: %%td_presto
   ...: select * from sample_datasets.nasdaq limit 5
   ...:
Out[2]:
                     symbol  open  volume     high      low    close
time
1989-01-26 16:00:00    SMTC   0.0    8000   0.4532   0.4532   0.4532
1989-01-26 16:00:00    SEIC   0.0  163200   0.7077   0.6921   0.7025
1989-01-26 16:00:00    SIGI   0.0    2800   3.9610   3.8750   3.9610
1989-01-26 16:00:00    NAVG   0.0    1800  14.6740  14.1738  14.6740
1989-01-26 16:00:00    MOCO   0.0   71101   3.6722   3.5609   3.5980
-
magics = {'cell': {'td_hive': 'td_hive', 'td_presto': 'td_presto'}, 'line': {'td_job': 'td_job'}}¶
registered = True¶
-
PySpark Integration¶
-
pytd.spark.download_td_spark(spark_binary_version='2.11', version='latest', destination=None)¶
Download a td-spark jar file from S3.
- Parameters
- spark_binary_version : string, default: '2.11'
Apache Spark binary version.
- version : string, default: 'latest'
td-spark version.
- destination : string, optional
Where the downloaded jar file is to be stored.
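For example, a minimal sketch that downloads the jar to an explicit path (the destination below is just an illustration):
import pytd.spark

# Download the latest td-spark assembly built for Spark with Scala 2.11.
pytd.spark.download_td_spark(destination='/tmp/td-spark-assembly.jar')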
-
pytd.spark.fetch_td_spark_context(apikey=None, endpoint=None, td_spark_path=None, download_if_missing=True, spark_configs=None)¶
Build TDSparkContext via td-pyspark.
- Parameters
- apikey : string, optional
Treasure Data API key. If not given, a value of environment variable TD_API_KEY is used by default.
- endpoint : string, optional
Treasure Data API server. If not given, https://api.treasuredata.com is used by default. List of available endpoints is: https://support.treasuredata.com/hc/en-us/articles/360001474288-Sites-and-Endpoints
- td_spark_path : string, optional
Path to td-spark-assembly_x.xx-x.x.x.jar. If not given, seek a path TDSparkContextBuilder.default_jar_path() by default.
- download_if_missing : boolean, default: True
Download td-spark if it does not exist at the time of initialization.
- spark_configs : dict, optional
Additional Spark configurations to be set via SparkConf’s set method.
- Returns
- td_pyspark.TDSparkContext
td_pyspark.TDSparkContext¶
pytd.spark.fetch_td_spark_context() returns td_pyspark.TDSparkContext. See the documentation below and the sample usage on Google Colab for more information.
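As a minimal sketch (assuming td-pyspark is installed and TD_API_KEY/TD_API_SERVER are set), the returned context can load a Treasure Data table into a Spark DataFrame through td-pyspark's table().df() interface:
import pytd.spark

# Build a TDSparkContext; the td-spark jar is downloaded automatically if missing.
td = pytd.spark.fetch_td_spark_context()

# Read a table as a Spark DataFrame and show a few rows (td-pyspark API).
spark_df = td.table('sample_datasets.www_access').df()
spark_df.show(5)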
DB-API Reference¶
-
class pytd.dbapi.Connection(client)¶
Bases: object
The DBAPI interface to Treasure Data.
https://www.python.org/dev/peps/pep-0249/
The interface internally bundles pytd.Client. Implementation is technically based on the DBAPI interface to Treasure Data’s Presto query engine, which relies on presto-python-client: https://github.com/prestodb/presto-python-client.
- Parameters
- client : pytd.Client, optional
A client used to connect to Treasure Data.
Development¶
Contributing to pytd¶
Code Formatting and Testing¶
We use black and isort as formatters, and flake8 as a linter. Our CI checks the format with them.
Note that black requires Python 3.6+ while pytd supports 3.5+, so you need Python 3.6+ for development.
We highly recommend introducing pre-commit to ensure your commits follow the required format.
You can install pre-commit as follows:
pip install pre-commit
pre-commit install
Now black, isort, and flake8 will run each time you commit changes.
You can skip these checks with git commit --no-verify.
If you want to check the code format manually, you can install the tools as follows:
pip install black isort flake8
Then, you can run those tools manually:
black pytd
flake8 pytd
isort
You can run the formatter, linter, and tests by using nox as follows:
pip install nox  # you only need to install nox the first time
nox
Documenting¶
Install the documentation dependencies:
pip install -r doc/requirements.txt
Edit the contents in doc/:
cd doc
Build HTML files to render Sphinx documentation:
make html
The doc/ folder is monitored and automatically published by Read the Docs.