Usage Guides
Working with DB-API
pytd implements Python Database API Specification v2.0 with the help of prestodb/presto-python-client.
Connect to the API first:
from pytd.dbapi import connect
conn = connect(pytd.Client(database='sample_datasets'))
# or, connect with Hive:
# >>> conn = connect(pytd.Client(database='sample_datasets', default_engine='hive'))
Cursor
defined by the specification allows us to flexibly fetch
query results from a custom function:
def query(sql, connection):
cur = connection.cursor()
cur.execute(sql)
rows = cur.fetchall()
columns = [desc[0] for desc in cur.description]
return {'data': rows, 'columns': columns}
query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn)
Below is an example of generator-based iterative retrieval, just like pandas.DataFrame.iterrows:
def iterrows(sql, connection):
cur = connection.cursor()
cur.execute(sql)
index = 0
columns = None
while True:
row = cur.fetchone()
if row is None:
break
if columns is None:
columns = [desc[0] for desc in cur.description]
yield index, dict(zip(columns, row))
index += 1
for index, row in iterrows('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn):
print(index, row)
# 0 {'cnt': 590, 'symbol': 'AAIT'}
# 1 {'cnt': 82, 'symbol': 'AAL'}
# 2 {'cnt': 9252, 'symbol': 'AAME'}
# 3 {'cnt': 253, 'symbol': 'AAOI'}
# 4 {'cnt': 5980, 'symbol': 'AAON'}
# ...
Use Existing td-spark-assembly.jar File
If you want to use existing td-spark JAR file, creating SparkWriter
with td_spark_path
option would be helpful. You can pass a writer to
connect()
function.
import pytd
import pytd.pandas_td as td
import pandas as pd
apikey = '1/XXX'
endpoint = 'https://api.treasuredata.com/'
writer = pytd.writer.SparkWriter(td_spark_path='/path/to/td-spark-assembly.jar')
con = td.connect(apikey=apikey, endpoint=endpoint, writer=writer)
df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
td.to_td(df, 'mydb.buzz', con, if_exists='replace', index=False)