Load Data between SynxDB Cloud and Spark

Overview

This guide provides instructions for bidirectional data integration between Apache Spark and SynxDB Cloud: ingesting data from SynxDB Cloud into Spark, and exporting data from Spark back to SynxDB Cloud.

Prerequisites

Before proceeding with the integration, ensure the following components are installed in your Spark environment.

Required JAR Dependencies:

  • spark-cloudberry_2.12-1.1.0-spark_3.5.jar

  • aws-java-sdk-bundle-1.11.364.jar

  • postgresql-42.1.4.jar
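
How you make these JARs visible to Spark depends on your deployment. As one possible sketch, an interactive session can receive them through the --jars flag (the /path/to locations below are placeholders; substitute the directory where you keep the JARs):

```shell
# Launch spark-shell with the connector and its dependencies on the classpath.
# The /path/to prefixes are placeholders for your actual JAR locations.
spark-shell \
  --jars /path/to/spark-cloudberry_2.12-1.1.0-spark_3.5.jar,\
/path/to/aws-java-sdk-bundle-1.11.364.jar,\
/path/to/postgresql-42.1.4.jar
```

For non-interactive jobs, the same --jars flag works with spark-submit.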

Configuration

Step 1. Set up SynxDB Cloud foreign server

As a superuser, configure the foreign server and establish necessary permissions in SynxDB Cloud.

-- Initializes the datalake foreign data wrapper
CREATE EXTENSION datalake_fdw;

-- Configures the foreign data wrapper for Spark integration
CREATE FOREIGN DATA WRAPPER datalake_fdw_spark
    HANDLER datalake_fdw_handler
    VALIDATOR datalake_fdw_validator
    OPTIONS ( mpp_execute 'all segments' );

-- Establishes the foreign server connection
CREATE SERVER foreign_server_spark
    FOREIGN DATA WRAPPER datalake_fdw_spark
    OPTIONS (
        host '10.13.9.167:9000',
        protocol 's3',
        ishttps 'false'
    );

-- Configures user permissions
GRANT USAGE ON FOREIGN SERVER foreign_server_spark TO sparkuser;
GRANT SELECT ON pg_authid TO sparkuser;
GRANT SELECT ON pg_user_mapping TO sparkuser;

After configuring the server and permissions, switch to a regular user to create a user mapping with authentication credentials.

-- Creates user mapping with authentication credentials
CREATE USER MAPPING FOR sparkuser
    SERVER foreign_server_spark
    OPTIONS (
        user 'gpadmin',
        accesskey 'admin',
        secretkey 'admin'
    );

-- Configures user environment
ALTER ROLE sparkuser SET warehouse TO w02;
ALTER ROLE sparkuser SET default_tablespace TO tbs_001;

Step 2. Configure Spark connection

Configure the Spark environment with the following connection parameters:

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, DateType}

val defaultOptions = Map(
   "url" -> "10.13.9.158:32744",           // Database endpoint
   "user" -> "sparkuser",                  // Authentication username
   "password" -> "sparkuser",              // Authentication password
   "database" -> "gpadmin",                // Target database name
   "tempdir" -> "/dfs-stg/spark_test",     // Temporary storage location
   "server_name" -> "foreign_server_spark" // Foreign server identifier
)

Ingest data from SynxDB Cloud into Spark

Step 1. Create the source table

Create and populate the source table in SynxDB Cloud:

CREATE TABLE employees (
    id int,
    first_name VARCHAR(50),
    last_name VARCHAR(50),
    email VARCHAR(100),
    hire_date DATE
);

INSERT INTO employees (id, first_name, last_name, email, hire_date) VALUES
(1, 'Jane', 'Smith', 'jane.smith@example.com', '2023-02-10'),
(2, 'Alice', 'Johnson', 'alice.johnson@example.com', '2023-03-05'),
(3, 'Bob', 'Brown', 'bob.brown@example.com', '2023-04-01');

Step 2. Load data

The following methods demonstrate different approaches to load data from SynxDB Cloud into Spark:

// Method 1: Custom SQL query
val dfQuery = spark.read
      .format("cloudberry")
      .options(defaultOptions)
      .option("query", "select * from public.employees")
      .load()
dfQuery.show(100)

// Method 2: Direct table access
val dfTable = spark.read
      .format("cloudberry")
      .options(defaultOptions)
      .option("dbtable", "public.employees")
      .load()
dfTable.show(100)

// Method 3: Temporary view creation
val dfTempView = spark.read
      .format("cloudberry")
      .options(defaultOptions)
      .option("dbtable", "public.employees")
      .load()
dfTempView.createOrReplaceTempView("spark_employees")
spark.sql("select * from spark_employees").show()

// Method 4: External table definition
val ex_tbl_hd_gpadmin_employees = """
    CREATE TABLE ex_tbl_hd_gpadmin_employees (
      id int,
      first_name VARCHAR(50),
      last_name VARCHAR(50),
      email VARCHAR(100),
      hire_date DATE
    )
    USING cloudberry
    OPTIONS (
      url '10.13.9.158:32744',             -- Database endpoint
      user 'sparkuser',                    -- Authentication username
      password 'sparkuser',                -- Authentication password
      database 'gpadmin',                  -- Target database
      tempdir '/dfs-stg/spark_test',       -- Temporary storage
      server_name 'foreign_server_spark',  -- Foreign server
      dbtable 'public.employees',          -- Source table
      compress 'off'
    )
"""
spark.sql(ex_tbl_hd_gpadmin_employees)
spark.sql("select * from ex_tbl_hd_gpadmin_employees").show

Export data from Spark to SynxDB Cloud

If the target table already exists in SynxDB Cloud, you can use Spark to load data from a CSV file into it directly. The following steps load a CSV file (path/to/your/data/employees_spark.csv in this example), prepare the source data in Spark, and export it to the public.employees table in SynxDB Cloud. The sample file contains:

id,firstname,lastname,email,hiredate
4,John,Doe,john.doe@example.com,2023-01-15
5,Jane,Smith,jane.smith@example.com,2023-02-20
6,Michael,Brown,michael.brown@example.com,2023-03-10
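
To follow along, you can create the sample file yourself. A minimal sketch, using the placeholder path from this guide:

```shell
# Write the sample CSV to the placeholder path used in this guide
mkdir -p path/to/your/data
cat > path/to/your/data/employees_spark.csv <<'EOF'
id,firstname,lastname,email,hiredate
4,John,Doe,john.doe@example.com,2023-01-15
5,Jane,Smith,jane.smith@example.com,2023-02-20
6,Michael,Brown,michael.brown@example.com,2023-03-10
EOF
```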

Step 1. Prepare data

Prepare the source data in Spark.

// Define the data schema
val schema = StructType(Array(
    StructField("id", IntegerType, true),
    StructField("firstname", StringType, true),
    StructField("lastname", StringType, true),
    StructField("email", StringType, true),
    StructField("hiredate", DateType, true)
))

// Load data from CSV source
val df = spark.read
      .schema(schema)
      .option("header", "true")
      .option("dateFormat", "yyyy-MM-dd")
      .option("encoding", "UTF-8")
      .csv("path/to/your/data/employees_spark.csv")
df.show()

Step 2. Export data

Execute the data export to SynxDB Cloud. The dbtable option specifies the target table in SynxDB Cloud. The optional columnmap option maps column names in the source data to column names in the target table. The save mode (SaveMode.Append or SaveMode.Overwrite) controls whether the exported rows are appended to or overwrite the existing data in the target table.

df.write
   .format("cloudberry")
   .options(defaultOptions)
   .option("dbtable", "public.employees")
   .option("columnmap", Map(
       "id" -> "id",
       "firstname" -> "first_name",
       "lastname" -> "last_name",
       "email" -> "email",
       "hiredate" -> "hire_date"
   ).toString())
   .mode(SaveMode.Append) // Append or Overwrite
   .save()

Step 3. Verify data

Verify the exported data in SynxDB Cloud:

gpadmin=# select * from employees;
 id | first_name | last_name |           email           | hire_date
----+------------+-----------+---------------------------+------------
  3 | Bob        | Brown     | bob.brown@example.com     | 2023-04-01
  4 | John       | Doe       | john.doe@example.com      | 2023-01-15
  5 | Jane       | Smith     | jane.smith@example.com    | 2023-02-20
  6 | Michael    | Brown     | michael.brown@example.com | 2023-03-10
  1 | Jane       | Smith     | jane.smith@example.com    | 2023-02-10
  2 | Alice      | Johnson   | alice.johnson@example.com | 2023-03-05
(6 rows)