Apache Spark

Spark is a unified analytics engine for large-scale data processing. Hologres is deeply integrated with Spark (both the Community Edition and the EMR Spark Edition) and provides a Spark Connector, so data can be written from Spark into Hologres in streaming or batch mode, helping enterprises build data warehouses quickly.

Supported Spark Versions

2.4.3 and later (2.4.3+)

1. JDBC Connector

Hologres is compatible with PostgreSQL and provides JDBC/ODBC drivers, so Spark data can also be written to Hologres through JDBC. JDBC supports both batch and streaming writes, which the two scenarios below describe.

Before you start, download the PostgreSQL JDBC driver.
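
For example, the driver JAR used in the commands below can be downloaded from Maven Central (any recent 42.x release should also work):

wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.14/postgresql-42.2.14.jar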

Scenario 1: Batch Writing

Start the Spark shell with the driver JAR loaded:

./bin/spark-shell --jars /path/to/postgresql-42.2.14.jar

Prepare the data and configure the batch write as follows:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val data = Seq(
    Row(1L, "test"),
    Row(2L, "test2"),
    Row(3L, "test2"),
    Row(4L, "test2"),
    Row(5L, "test2"),
    Row(6L, "test2")
)
val schema = StructType(Array(
  StructField("a", LongType),
  StructField("b", StringType)
))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// Write the DataFrame to Hologres in batch mode over JDBC
df.write.format("jdbc")
  .option("url", "jdbc:postgresql://hgcn-xxx.yyy.zzz:80/db001")
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "tb001")
  .option("user", "Lxxxxxxxx")
  .option("password", "Dyyyyyyyyyyyyyyyyyyyy")
  .save()
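
To verify the batch write, you can read the table back over the same JDBC connection. A minimal sketch, reusing the placeholder connection values from the write above:

// Read tb001 back over JDBC and print the rows
val readBack = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://hgcn-xxx.yyy.zzz:80/db001")
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "tb001")
  .option("user", "Lxxxxxxxx")
  .option("password", "Dyyyyyyyyyyyyyyyyyyyy")
  .load()

readBack.show()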

Scenario 2: Streaming Writing

1. Generate the JDBC JAR

The JDBC driver from Community Spark works; so does EMR's JDBC driver, which adds micro-batch processing to improve write performance.

Load both JARs in the Spark shell like this:

./bin/spark-shell --jars /path_to/emr-jdbc_2.11-2.1.0-SNAPSHOT.jar,/path/to/postgresql-42.2.14.jar --driver-class-path /path_to/emr-jdbc_2.11-2.1.0-SNAPSHOT.jar,/path/to/postgresql-42.2.14.jar

2. Data writing: prepare the data and table in Spark, then configure a streaming write to load data into Hologres in real time.

Note that the format is set to "jdbc2", which selects the EMR JDBC driver. The option ("batchsize", 100) sets the micro-batch size.

// Example streaming source (an assumption in this sketch): text lines from a local socket
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Running word count over the stream (spark.implicits._ is in scope in the Spark shell)
val wordCounts = lines.as[String].flatMap(_.split(" ")).groupBy("value").count()

// Connection placeholders, mirroring the batch example; adjust to your instance
val url = "jdbc:postgresql://hgcn-xxx.yyy.zzz:80/db001"
val table = "tb001"
val user = "Lxxxxxxxx"
val pwd = "Dyyyyyyyyyyyyyyyyyyyy"
val checkpointLocation = "/tmp/spark-checkpoint"

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("jdbc2")
  .option("url", url)
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", table)
  .option("user", user)
  .option("password", pwd)
  .option("batchsize", 100)
  .option("checkpointLocation", checkpointLocation)
  .start()

query.awaitTermination()
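
If you keep the socket source from the sketch above, you can feed it test input from another terminal with netcat:

nc -lk 9999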

2. Hologres Spark Connector

Write Spark data to Hologres through the Hologres built-in Spark connector:

JAR Usage

1. Run the command below to load the JAR in the Spark shell.

EMR Spark Edition

spark-shell --jars emr-datahub_shaded_2.11-2.0.0-SNAPSHOT.jar --driver-class-path emr-datahub_shaded_2.11-2.0.0-SNAPSHOT.jar

Community Edition

spark-shell --jars emr-datahub_shaded_2.11-2.0.0-SNAPSHOT.jar

2. Acquire the Hologres Endpoint

To acquire the Hologres real-time API endpoint used by the connector, run the following command:

show hg_datahub_endpoints;

3. Spark Configuration

Configure the following options in Spark:

df.write
  .format("datahub")
  .option("access.key.id", "yourAccessKeyId")
  .option("access.key.secret", "yourAccessKeySecret")
  .option("endpoint", "datahubEndpoint")
  .option("project", "yourDatabase")
  .option("topic, "yourTable")
  .option("batch.size", "100")
  .save()

Notes:

batch.size: controls micro-batching to improve write throughput. The default value 1 means no micro-batching, which suits streaming mode; for batch writes, replace it with a larger, suitable value.

format: set to datahub; data is written to the Hologres instance through the holohub interface.

For NUMERIC columns, also add decimal.precision and decimal.scale to the df.write chain above:

df.option("decimal.precision", 38)
  .option("decimal.scale", 18)

Demo of Writing Data through Connector

1. Create a table in Hologres to receive the data; column types must follow the data type mapping below.

BEGIN;
CREATE TABLE tb001 (
  product_id BIGINT,
  product_name TEXT,
  price NUMERIC(38, 18),
  out_of_stock bool,
  weight double precision,
  ts timestamptz
);
COMMIT;

2. Scala code: run the command below in a terminal to open the interactive Spark shell, where you can execute Spark code and observe the results.

spark-shell --jars emr-datahub_shaded_2.11-2.0.0-SNAPSHOT.jar

3. Run the following Scala demo code in the Spark shell:

import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data = Seq(
  Row(1L, "iphone", BigDecimal("1234.56"), false, 123.45, Timestamp.valueOf("2020-05-21 00:00:00")),
  Row(2L, "Huawei", BigDecimal("1234.56"), true, 123.45, Timestamp.valueOf("2020-05-21 00:00:00"))
)

// supported data types (DecimalType precision/scale match the NUMERIC(38, 18) column)
val schema = StructType(Array(
  StructField("product_id", LongType),
  StructField("product_name", StringType),
  StructField("price", DecimalType(38, 18)),
  StructField("out_of_stock", BooleanType),
  StructField("weight", DoubleType),
  StructField("ts", TimestampType)
))

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
)

df.write
  .format("datahub")
  .option(DatahubSourceProvider.OPTION_KEY_ACCESS_ID, "my_accessKeyId")
  .option(DatahubSourceProvider.OPTION_KEY_ACCESS_KEY, "my_accessKeySecret")
  .option(DatahubSourceProvider.OPTION_KEY_ENDPOINT, "hgxxxx.xxxx.xxx:80")
  .option(DatahubSourceProvider.OPTION_KEY_PROJECT, "testdb")
  .option(DatahubSourceProvider.OPTION_KEY_TOPIC, "tb001")
  .option("decimal.precision", 38)
  .option("decimal.scale", 18)
  .option("batch.size", "100")
  .save()
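
After the write completes, you can verify the result on the Hologres side, for example:

SELECT * FROM tb001;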

Data Types Mapping

Spark              Hologres
-----------------  ----------------
LongType           BIGINT
StringType         TEXT
DecimalType(P, S)  NUMERIC(P, S)
BooleanType        BOOL
DoubleType         DOUBLE PRECISION
TimestampType      TIMESTAMPTZ