PySpark Integration

pytd.spark.download_td_spark(spark_binary_version='2.11', version='latest', destination=None)[source]

Download a td-spark jar file from S3.

Parameters
  • spark_binary_version (str, default: '2.11') – Apache Spark binary version.

  • version (str, default: 'latest') – td-spark version.

  • destination (str, optional) – Where the downloaded jar file will be stored.
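
Example

A minimal sketch; the destination path is illustrative:

>>> import pytd.spark
>>> pytd.spark.download_td_spark(destination="./td-spark-assembly.jar")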

pytd.spark.fetch_td_spark_context(apikey=None, endpoint=None, td_spark_path=None, download_if_missing=True, spark_configs=None)[source]

Build TDSparkContext via td-pyspark.

Parameters
  • apikey (str, optional) – Treasure Data API key. If not given, the value of the TD_API_KEY environment variable is used by default.

  • endpoint (str, optional) – Treasure Data API server. If not given, https://api.treasuredata.com is used by default. A list of available endpoints is here: https://support.treasuredata.com/hc/en-us/articles/360001474288-Sites-and-Endpoints

  • td_spark_path (str, optional) – Path to td-spark-assembly_x.xx-x.x.x.jar. If not given, the path returned by TDSparkContextBuilder.default_jar_path() is used by default.

  • download_if_missing (bool, default: True) – Download td-spark if it does not exist at the time of initialization.

  • spark_configs (dict, optional) – Additional Spark configurations to be set via SparkConf’s set method.

Returns

Connection to td-spark

Return type

td_pyspark.TDSparkContext
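
Example

A brief sketch, assuming the TD_API_KEY environment variable is set; the spark_configs value shown is an illustrative tuning setting:

>>> import pytd.spark
>>> td = pytd.spark.fetch_td_spark_context(
...     spark_configs={"spark.executor.memory": "2g"}
... )
>>> td.df("sample_datasets.www_access").show()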

td_pyspark.TDSparkContext

pytd.spark.fetch_td_spark_context() returns a td_pyspark.TDSparkContext instance. See the documentation below and the sample usage on Google Colab for more information.

class td_pyspark.TDSparkContext(spark)

Treasure Data Spark Context

__init__(spark)
Parameters

spark (pyspark.sql.SparkSession) – SparkSession already connected to Spark.

df(table)

Load a Treasure Data table into a Spark DataFrame

Parameters

table (str) – Table name of Treasure Data.

Returns

Loaded table data.

Return type

pyspark.sql.DataFrame
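
Example

A short sketch, assuming td is a TDSparkContext built as in the later examples:

>>> df = td.df("sample_datasets.www_access")
>>> df.printSchema()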

presto(sql, database=None)

Submit a Presto query

Parameters
  • sql (str) – A SQL statement to be executed.

  • database (str, optional) – Target database name.

Returns

SQL result

Return type

pyspark.sql.DataFrame

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.master("local").getOrCreate()
>>> td = TDSparkContext(spark)
>>> sql = "select code, count(*) cnt from sample_datasets.www_access group by 1"
>>> q = td.presto(sql)
>>> q.show()
2019-06-13 20:09:13.245Z  info [TDPrestoJDBCRDD]  - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500|   2|
| 404|  17|
+----+----+
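
The optional database argument sets the default database for the query; a brief sketch reusing td from above:

>>> td.presto("select count(*) cnt from www_access", database="sample_datasets").show()
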
execute_presto(sql, database=None)

Run non-query statements (e.g., INSERT INTO, CREATE TABLE)

Parameters
  • sql (str) – A SQL statement to be executed.

  • database (str, optional) – Target database name.

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")
table(table)

Fetch a Treasure Data table

Parameters

table (str) – Table name

Returns

TD table data. The df() method should be called to treat it as a pyspark.sql.DataFrame.

Return type

TDTable

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.table("sample_datasets.www_access")
<td_pyspark.td_pyspark.TDTable object at 0x10eedf240>
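
As noted under Returns, call df() on the fetched table to obtain a pyspark.sql.DataFrame:

>>> td.table("sample_datasets.www_access").df().show()
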
db(name)

Fetch a Treasure Data database

Parameters

name (str) – Database name

Returns

TD database data. The df() method should be called to treat it as a pyspark.sql.DataFrame.

Return type

TDDatabase

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.db("sample_datasets")
<td_pyspark.td_pyspark.TDDatabase object at 0x10eedfa58>
set_log_level(log_level)

Set the log level for Spark

Parameters

log_level (str) – Log level for the Spark process. {"ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"}

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.set_log_level("DEBUG")
2019-09-06T18:19:12.398-0700  info [TDSparkContext] Setting the log level of com.treasuredata.spark to DEBUG - (TDSparkContext.scala:62)
use(name)

Change the current database

Parameters

name (str) – Target database name to switch to.

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.use("mydb")
2019-09-06T18:19:49.469-0700  info [TDSparkContext] Use mydb - (TDSparkContext.scala:150)
write(df, table_name, mode='error')

Write a DataFrame as a Treasure Data table

Parameters
  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to Treasure Data.

  • table_name (str) – Target table name to be inserted.

  • mode (str, default: 'error') – Save mode same as Spark. {"error", "overwrite", "append", "ignore"}

      • error: raise an exception.

      • overwrite: drop the existing table, recreate it, and insert data.

      • append: insert data. Create the table if it does not exist.

      • ignore: do nothing.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.write(df, "mydb.table1", "error")
insert_into(df, table_name)

Insert a DataFrame into an existing Treasure Data table

Parameters
  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.

  • table_name (str) – Target table name to be inserted.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.insert_into(df, "mydb.table1")
2019-09-09T10:57:37.558-0700  info [TDWriter] Uploading data to mydb.table1 (mode: Append) - (TDWriter.scala:66)
2019-09-09T10:57:38.187-0700  info [TDWriter] [txx:8184891a] Starting a new transaction for updating mydb.table1 - (TDWriter.scala:95)
2019-09-09T10:57:42.897-0700  info [TDWriter] [txx:8184891a] Finished uploading 1 partitions (1 records, size:132B) to mydb.table1 - (TDWriter.scala:132)
create_or_replace(df, table_name)

Create or replace a Treasure Data table with a DataFrame

Parameters
  • df (pyspark.sql.DataFrame) – Target DataFrame to be ingested to TreasureData.

  • table_name (str) – Target table name to be ingested.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import pandas as pd
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> df = td.spark.createDataFrame(pd.DataFrame({"name": ["Alice"], "age": [1]}))
>>> td.create_or_replace(df, "mydb.table1")
2019-09-09T10:57:56.381-0700  warn [DefaultSource] Dropping mydb.table1 (Overwrite mode) - (DefaultSource.scala:94)
2019-09-09T10:57:56.923-0700  info [TDWriter] Uploading data to mydb.table1 (mode: Overwrite) - (TDWriter.scala:66)
2019-09-09T10:57:57.106-0700  info [TDWriter] [txx:a69bce97] Starting a new transaction for updating mydb.table1 - (TDWriter.scala:95)
2019-09-09T10:57:59.179-0700  info [TDWriter] [txx:a69bce97] Finished uploading 1 partitions (1 records, size:132B) to mydb.table1 - (TDWriter.scala:132)
create_table_if_not_exists(table_name)

Create a table if it does not exist

Parameters

table_name (str) – Target table name to be created.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_table_if_not_exists("mydb.table1")
2019-09-09T13:43:41.142-0700  warn [TDTable] Creating table mydb.table1 if not exists - (TDTable.scala:67)
drop_table_if_exists(table_name)

Drop a table if it exists

Parameters

table_name (str) – Target table name to be dropped.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_table_if_exists("mydb.table1")
create_database_if_not_exists(db_name)

Create a database if it does not exist

Parameters

db_name (str) – Target database name to be created.

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_database_if_not_exists("mydb")
drop_database_if_exists(db_name)

Drop a database if it exists

Parameters

db_name (str) – Target database name to be dropped

Example

>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> import os
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.drop_database_if_exists("mydb")
create_udp_l(table_name, long_column_name)

Create a User-Defined Partition table partitioned by a Long type column

User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by Long type column.

Parameters
  • table_name (str) – Target table name to be created as a UDP table.

  • long_column_name (str) – Partition column with Long (bigint) type.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_l("mydb.departments", "dept_id")
2019-09-09T10:43:20.913-0700  info [UDP]  - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."departments" (
  time bigint,
  "dept_id" bigint
)
with (
  bucketed_on = array['dept_id'],
  bucket_count = 512
)
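
Once created, the UDP table can be populated with the write methods above; a brief sketch, assuming df is a DataFrame that includes a bigint dept_id column:

>>> td.insert_into(df, "mydb.departments")
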
create_udp_s(table_name, string_column_name)

Create a User-Defined Partition table partitioned by a string type column

User-defined partitioning (UDP) is useful if you know a column in the table that has unique identifiers (e.g., IDs, category values). This method is for creating a UDP table partitioned by string type column.

Parameters
  • table_name (str) – Target table name to be created as a UDP table.

  • string_column_name (str) – Partition column with string type.

Example

>>> import os
>>> from td_pyspark import TDSparkContext, TDSparkContextBuilder
>>> from pyspark.sql import SparkSession, Row
>>> builder = SparkSession.builder
>>> td = TDSparkContextBuilder(builder).apikey(os.getenv("TD_API_KEY")).\
...     jars(TDSparkContextBuilder.default_jar_path()).build()
>>> td.create_udp_s("mydb.user_list", "id")
2019-09-09T10:45:27.802-0700  info [UDP]  - (UDP.scala:41)
Preparing UDP table:
-- td-spark: UDP creation
create table if not exists "mydb"."user_list" (
  time bigint,
  "id" varchar
)
with (
  bucketed_on = array['id'],
  bucket_count = 512
)