Load Data between SynxDB Cloud and Spark
Overview
This guide provides instructions for bidirectional data integration between Apache Spark and SynxDB Cloud. It covers both data ingestion from SynxDB Cloud into Spark and data export from Spark to SynxDB Cloud, enabling seamless data movement between these two platforms.
Prerequisites
Before proceeding with the integration, ensure that the following JAR dependencies are available on the classpath of your Spark environment.
Required JAR dependencies:
spark-cloudberry_2.12-1.1.0-spark_3.5.jar
aws-java-sdk-bundle-1.11.364.jar
postgresql-42.1.4.jar
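These JARs are typically supplied when launching Spark. A minimal sketch, assuming the JARs live under /opt/spark/jars (the paths are examples; adjust them to your installation):

```shell
# Paths are assumptions; point --jars at wherever the three JARs actually live
spark-shell \
  --jars /opt/spark/jars/spark-cloudberry_2.12-1.1.0-spark_3.5.jar,/opt/spark/jars/aws-java-sdk-bundle-1.11.364.jar,/opt/spark/jars/postgresql-42.1.4.jar
```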
Configuration
Step 1. Set up SynxDB Cloud foreign server
As a superuser, configure the foreign server and establish necessary permissions in SynxDB Cloud.
-- Initializes the datalake foreign data wrapper
CREATE EXTENSION datalake_fdw;

-- Configures the foreign data wrapper for Spark integration
CREATE FOREIGN DATA WRAPPER datalake_fdw_spark
    HANDLER datalake_fdw_handler
    VALIDATOR datalake_fdw_validator
    OPTIONS ( mpp_execute 'all segments' );

-- Establishes the foreign server connection
CREATE SERVER foreign_server_spark
    FOREIGN DATA WRAPPER datalake_fdw_spark
    OPTIONS (
        host '10.13.9.167:9000',
        protocol 's3',
        ishttps 'false'
    );

-- Grants user permissions
GRANT USAGE ON FOREIGN SERVER foreign_server_spark TO sparkuser;
GRANT SELECT ON pg_authid TO sparkuser;
GRANT SELECT ON pg_user_mapping TO sparkuser;
After configuring the server and permissions, switch to a regular user and create a user mapping with authentication credentials.
-- Creates a user mapping with authentication credentials
CREATE USER MAPPING FOR sparkuser
    SERVER foreign_server_spark
    OPTIONS (
        user 'gpadmin',
        accesskey 'admin',
        secretkey 'admin'
    );

-- Configures the user environment
ALTER ROLE sparkuser SET warehouse TO w02;
ALTER ROLE sparkuser SET default_tablespace TO tbs_001;
Step 2. Configure Spark connection
Configure the Spark environment with the following connection parameters:
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, DateType}

val defaultOptions = Map(
  "url" -> "10.13.9.158:32744",            // Database endpoint
  "user" -> "sparkuser",                   // Authentication username
  "password" -> "sparkuser",               // Authentication password
  "database" -> "gpadmin",                 // Target database name
  "tempdir" -> "/dfs-stg/spark_test",      // Temporary storage location
  "server_name" -> "foreign_server_spark"  // Foreign server identifier
)
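Because defaultOptions is a plain Scala Map, an individual job can override a setting by merging a second map on top of it. A standalone sketch (the reporting_user value is a hypothetical example, not part of the guide's setup):

```scala
// Shared base options (values shortened; same shape as defaultOptions above)
val baseOptions = Map(
  "url" -> "10.13.9.158:32744",
  "user" -> "sparkuser",
  "server_name" -> "foreign_server_spark"
)

// `++` returns a new Map; on key conflicts the right-hand side wins
val readOptions = baseOptions ++ Map("user" -> "reporting_user")

println(readOptions("user"))        // reporting_user
println(readOptions("server_name")) // foreign_server_spark
```

The original map is left untouched, so the same baseOptions can be reused across reads and writes.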
Ingest data from SynxDB Cloud to Spark
Step 1. Create the source table
Create and populate the source table in SynxDB Cloud:
CREATE TABLE employees (
    id int,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    email VARCHAR(100),
    hire_date DATE
);

INSERT INTO employees (id, first_name, last_name, email, hire_date) VALUES
    (1, 'Jane', 'Smith', 'jane.smith@example.com', '2023-02-10'),
    (2, 'Alice', 'Johnson', 'alice.johnson@example.com', '2023-03-05'),
    (3, 'Bob', 'Brown', 'bob.brown@example.com', '2023-04-01');
Step 2. Load data
The following methods demonstrate different approaches to load data from SynxDB Cloud into Spark:
// Method 1: Custom SQL query
// Note: show() returns Unit, so call it separately rather than
// chaining it onto load(), which would leave the val holding Unit.
val dfQuery = spark.read
  .format("cloudberry")
  .options(defaultOptions)
  .option("query", "select * from public.employees")
  .load()
dfQuery.show(100)

// Method 2: Direct table access
val dfTable = spark.read
  .format("cloudberry")
  .options(defaultOptions)
  .option("dbtable", "public.employees")
  .load()
dfTable.show(100)

// Method 3: Temporary view creation
val dfTempView = spark.read
  .format("cloudberry")
  .options(defaultOptions)
  .option("dbtable", "public.employees")
  .load()
dfTempView.createOrReplaceTempView("spark_employees")
spark.sql("select * from spark_employees").show()
// Method 4: External table definition
// Note: comments inside the SQL string must use SQL syntax (--), not //.
val ex_tbl_hd_gpadmin_employees = """
CREATE TABLE ex_tbl_hd_gpadmin_employees (
  id int,
  first_name VARCHAR(50),
  last_name VARCHAR(50),
  email VARCHAR(100),
  hire_date DATE
)
USING cloudberry
OPTIONS (
  url '10.13.9.158:32744',               -- Database endpoint
  user 'sparkuser',                      -- Authentication username
  password 'sparkuser',                  -- Authentication password
  database 'gpadmin',                    -- Target database
  tempdir '/dfs-stg/spark_test',         -- Temporary storage
  server_name 'foreign_server_spark',    -- Foreign server
  dbtable 'public.employees',            -- Source table
  compress 'off'
)
"""
spark.sql(ex_tbl_hd_gpadmin_employees)
spark.sql("select * from ex_tbl_hd_gpadmin_employees").show()
Export data from Spark to SynxDB Cloud
If the target table already exists in SynxDB Cloud, you can use Spark to load data from a CSV file directly into it. The following steps load the CSV file path/to/your/data/employees_spark.csv, prepare the source data in Spark, and export it to the public.employees table in SynxDB Cloud. The sample CSV content:
id,firstname,lastname,email,hiredate
4,John,Doe,john.doe@example.com,2023-01-15
5,Jane,Smith,jane.smith@example.com,2023-02-20
6,Michael,Brown,michael.brown@example.com,2023-03-10
Step 1. Prepare data
Prepare the source data in Spark.
// Define the data schema
val schema = StructType(Array(
  StructField("id", IntegerType, true),
  StructField("firstname", StringType, true),
  StructField("lastname", StringType, true),
  StructField("email", StringType, true),
  StructField("hiredate", DateType, true)
))

// Load data from the CSV source
val df = spark.read
  .schema(schema)
  .option("header", "true")
  .option("dateFormat", "yyyy-MM-dd")
  .option("encoding", "UTF-8")
  .csv("path/to/your/data/employees_spark.csv")
df.show()
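Spark 3.x interprets the dateFormat option with java.time-style patterns. A quick standalone check of how the sample dates parse (no Spark session needed):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Same pattern string as the dateFormat option used for the CSV reader
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val hireDate = LocalDate.parse("2023-01-15", fmt)

println(hireDate.getYear)       // 2023
println(hireDate.getMonthValue) // 1
```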
Step 2. Export data
Execute the data export to SynxDB Cloud. The dbtable option specifies the target table in SynxDB Cloud. The optional columnmap option maps column names in the source data to column names in the target table. The save mode specifies whether to append to or overwrite the data in the target table.
df.write
  .format("cloudberry")
  .options(defaultOptions)
  .option("dbtable", "public.employees")
  .option("columnmap", Map(
    "id" -> "id",
    "firstname" -> "first_name",
    "lastname" -> "last_name",
    "email" -> "email",
    "hiredate" -> "hire_date"
  ).toString())
  .mode(SaveMode.Append)  // Append or Overwrite
  .save()
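The columnmap option above is passed as the string rendering of a Scala Map. A standalone look at what toString produces, using a two-entry example (the connector is assumed to parse the pairs order-independently, since Scala Maps with more than four entries do not preserve insertion order):

```scala
// Small immutable Maps (up to 4 entries) print in insertion order
val columnMap = Map("firstname" -> "first_name", "lastname" -> "last_name")
val serialized = columnMap.toString

println(serialized) // Map(firstname -> first_name, lastname -> last_name)
```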
Step 3. Verify data
Verify the exported data in SynxDB Cloud:
gpadmin=# select * from employees;
id | first_name | last_name | email | hire_date
----+------------+-----------+---------------------------+------------
3 | Bob | Brown | bob.brown@example.com | 2023-04-01
4 | John | Doe | john.doe@example.com | 2023-01-15
5 | Jane | Smith | jane.smith@example.com | 2023-02-20
6 | Michael | Brown | michael.brown@example.com | 2023-03-10
1 | Jane | Smith | jane.smith@example.com | 2023-02-10
2 | Alice | Johnson | alice.johnson@example.com | 2023-03-05
(6 rows)