PySpark Integration
- pytd.spark.download_td_spark(spark_binary_version='3.0.1', version='latest', destination=None)[source]
Download a td-spark jar file from S3.
- Parameters
spark_binary_version (str, default: '3.0.1') – Apache Spark binary version.
version (str, default: 'latest') – td-spark version.
destination (str, optional) – Where the downloaded jar file is stored.
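- Example
A minimal usage sketch; the destination path below is illustrative, not required.
>>> import pytd.spark
>>> pytd.spark.download_td_spark()  # store the jar at the default location
>>> pytd.spark.download_td_spark(destination="/tmp/td-spark-assembly.jar")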
- pytd.spark.fetch_td_spark_context(apikey=None, endpoint=None, td_spark_path=None, download_if_missing=True, spark_configs=None)[source]
Build TDSparkContext via td-pyspark.
- Parameters
apikey (str, optional) – Treasure Data API key. If not given, the value of the environment variable TD_API_KEY is used by default.
endpoint (str, optional) – Treasure Data API server. If not given, https://api.treasuredata.com is used by default. The list of available endpoints is: https://docs.treasuredata.com/display/public/PD/Sites+and+Endpoints
td_spark_path (str, optional) – Path to td-spark-assembly-{td-spark-version}_spark{spark-version}.jar. If not given, the path TDSparkContextBuilder.default_jar_path() is used by default.
download_if_missing (bool, default: True) – Download td-spark if it does not exist at the time of initialization.
spark_configs (dict, optional) – Additional Spark configurations to be set via SparkConf’s set method.
- Returns
Connection of td-spark
- Return type
td_pyspark.TDSparkContext
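- Example
A minimal sketch, assuming the TD_API_KEY environment variable is set:
>>> import pytd.spark
>>> td = pytd.spark.fetch_td_spark_context()
>>> df = td.df("sample_datasets.www_access")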
pytd.spark.fetch_td_spark_context() returns td_pyspark.TDSparkContext. See the documentation below and the sample usage on Google Colab for more information.
- class td_pyspark.TDSparkContext(spark, td=None)
Treasure Data Spark Context
- __init__(spark, td=None)
- Parameters
spark (pyspark.sql.SparkSession) – SparkSession already connected to Spark.
td (TDSparkContext, optional) – Treasure Data Spark Context.
- df(table)
Load Treasure Data table into Spark DataFrame
- Parameters
table (str) – Table name of Treasure Data.
- Returns
Loaded table data.
- Return type
pyspark.sql.DataFrame
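- Example
A short sketch, assuming td was built as in the examples below:
>>> df = td.df("sample_datasets.www_access")
>>> df.printSchema()
>>> df.count()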
- presto(sql, database=None)
Submit Presto Query
- Parameters
sql (str) – A SQL statement to be executed.
database (str, optional) – Target database name.
- Returns
SQL result
- Return type
pyspark.sql.DataFrame
- Example
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> sql = "select code, count(*) cnt from sample_datasets.www_access group by 1"
>>> q = td.presto(sql)
>>> q.show()
2019-06-13 20:09:13.245Z  info [TDPrestoJDBCRDD]  - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500|   2|
| 404|  17|
+----+----+
- execute_presto(sql, database=None)
Run non-query statements (e.g., INSERT INTO, CREATE TABLE)
- Parameters
sql (str) – A SQL statement to be executed.
database (str, optional) – Target database name.
- Example
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")
- table(table)
Fetch a Treasure Data table
- Parameters
table (str) – Table name
- Returns
TD table data. Call the df() method to treat it as a pyspark.sql.DataFrame.
- Return type
TDTable
- Example
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("sample_datasets.www_access")
<td_pyspark.td_pyspark.TDTable object at 0x10eedf240>
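To work with the fetched table as a DataFrame, chain the df() call noted above:
>>> td.table("sample_datasets.www_access").df().show()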
- db(name)
Fetch a Treasure Data database
- Parameters
name (str) – Database name
- Returns
TD database data. Call the df() method to treat it as a pyspark.sql.DataFrame.
- Return type
TDDatabase
- Example
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.db("sample_datasets")
<td_pyspark.td_pyspark.TDDatabase object at 0x10eedfa58>
- set_log_level(log_level)
Set log level for Spark
- Parameters
log_level (str) – Log level for Spark process. {“ALL”, “DEBUG”, “ERROR”, “FATAL”, “INFO”, “OFF”, “TRACE”, “WARN”}
- Example
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.set_log_level("DEBUG")
2019-09-06T18:19:12.398-0700  info [TDSparkContext] Setting the log level of com.treasuredata.spark to DEBUG - (TDSparkContext.scala:62)
- use(name)
Change current database
- Parameters
name (str) – Database name to switch to.
- Example
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.use("mydb")
2019-09-06T18:19:49.469-0700  info [TDSparkContext] Use mydb - (TDSparkContext.scala:150)
- with_apikey(apikey)
Set an additional apikey
- Parameters
apikey (str) – API key for Treasure Data.
- Example
>>> from td_pyspark import TDSparkContext
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> td2 = td.with_apikey("key2")
- write(df, table_name, mode='error')
Write a DataFrame as a Treasure Data table
- Parameters
df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to Treasure Data.
table_name (str) – Target table name to be inserted.
mode (str, default: 'error') –
Save mode, same as Spark. {"error", "overwrite", "append", "ignore"}
error: raise an exception.
overwrite: drop the existing table, recreate it, and insert data.
append: insert data. Create the table if it does not exist.
ignore: do nothing.
- Example
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.write(df, "mydb.table1", "error")
- insert_into(df, table_name)
Insert a DataFrame into an existing Treasure Data table
- Parameters
df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.
table_name (str) – Target table name to be inserted.
- Example
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.insert_into(df, "mydb.table1")
2019-09-09T10:57:37.558-0700  info [TDWriter] Uploading data to mydb.table1 (mode: Append) - (TDWriter.scala:66)
2019-09-09T10:57:38.187-0700  info [TDWriter] [txx:8184891a] Starting a new transaction for updating mydb.table1 - (TDWriter.scala:95)
2019-09-09T10:57:42.897-0700  info [TDWriter] [txx:8184891a] Finished uploading 1 partitions (1 records, size:132B) to mydb.table1 - (TDWriter.scala:132)
- create_or_replace(df, table_name)
Create or replace a Treasure Data table with a DataFrame
- Parameters
df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.
table_name (str) – Target table name to be ingested.
- Example
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.create_or_replace(df, "mydb.table1")
2019-09-09T10:57:56.381-0700  warn [DefaultSource] Dropping mydb.table1 (Overwrite mode) - (DefaultSource.scala:94)
2019-09-09T10:57:56.923-0700  info [TDWriter] Uploading data to mydb.table1 (mode: Overwrite) - (TDWriter.scala:66)
2019-09-09T10:57:57.106-0700  info [TDWriter] [txx:a69bce97] Starting a new transaction for updating mydb.table1 - (TDWriter.scala:95)
2019-09-09T10:57:59.179-0700  info [TDWriter] [txx:a69bce97] Finished uploading 1 partitions (1 records, size:132B) to mydb.table1 - (TDWriter.scala:132)
- create_table_if_not_exists(table_name)
Create a table if it does not exist
- Parameters
table_name (str) – Target table name to be created.
- Example
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_table_if_not_exists("mydb.table1")
2019-09-09T13:43:41.142-0700  warn [TDTable] Creating table mydb.table1 if not exists - (TDTable.scala:67)
- drop_table_if_exists(table_name)
Drop a table if it exists
- Parameters
table_name (str) – Target table name to be dropped.
- Example
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_table_if_exists("mydb.table1")
- create_database_if_not_exists(db_name)
Create a database if it does not exist
- Parameters
db_name (str) – Target database name to be created.
- Example
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_database_if_not_exists("mydb")
- drop_database_if_exists(db_name)
Drop a database if it exists
- Parameters
db_name (str) – Target database name to be dropped
- Example
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_database_if_exists("mydb")
- create_udp_l(table_name, long_column_name)
Create a User-Defined Partition Table partitioned by a Long type column
User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by Long type column.
- Parameters
table_name (str) – Target table name to be created as a UDP table.
long_column_name (str) – Partition column of Long (bigint) type.
- Example
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_l("mydb.departments", "dept_id")
2019-09-09T10:43:20.913-0700  info [UDP]  - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."departments" (
  time bigint
, "dept_id" bigint
)
with (
  bucketed_on = array['dept_id'],
  bucket_count = 512
)
- create_udp_s(table_name, string_column_name)
Create a User-Defined Partition Table partitioned by a string type column
User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by string type column.
- Parameters
table_name (str) – Target table name to be created as a UDP table.
string_column_name (str) – Partition column of string type.
- Example
>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")) \
...     .jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_s("mydb.user_list", "id")
2019-09-09T10:45:27.802-0700  info [UDP]  - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."user_list" (
  time bigint
, "id" varchar
)
with (
  bucketed_on = array['id'],
  bucket_count = 512
)
- swap_tables(table1, table2)
Swap table contents within the same database.
- Parameters
table1 (str) – Name of the first table to swap.
table2 (str) – Name of the second table to swap.
- Example
>>> td.swap_tables("mydb.tbl1", "mydb.tbl2")
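A common pattern (the table names below are illustrative) is to stage new data with create_or_replace and then swap the staging table into place:
>>> td.create_or_replace(df, "mydb.tbl1_staging")
>>> td.swap_tables("mydb.tbl1", "mydb.tbl1_staging")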