Spark Core Module
This module adds support for HDFS and the file system (FS), using the CSV and JSON formats. Users can write data as CSV or JSON to a file on HDFS or on the file system, read that data back, and run SQL queries over it.
To use it, add the following dependency to your pom.xml:
<dependency>
<groupId>com.impetus.kundera.client</groupId>
<artifactId>kundera-spark</artifactId>
<version>${kundera.version}</version>
</dependency>
Sample persistence.xml:
<persistence-unit name="spark_hdfs_pu">
<provider>com.impetus.kundera.KunderaPersistence</provider>
<properties>
<property name="kundera.nodes" value="localhost" />
<property name="kundera.port" value="7077" />
<property name="kundera.keyspace" value="sparktest" />
<property name="kundera.dialect" value="spark" />
<property name="kundera.client" value="hdfs" />
<property name="kundera.client.lookup.class" value="com.impetus.spark.client.SparkClientFactory" />
<property name="kundera.client.property" value="KunderaSparkTest.xml" />
</properties>
</persistence-unit>
Spark-related properties are supplied through a separate XML file, referenced by the kundera.client.property property. For example, the persistence.xml above points to KunderaSparkTest.xml.
Sample Property File:
<?xml version="1.0" encoding="UTF-8"?>
<clientProperties>
<datastores>
<dataStore>
<name>hdfs</name>
<connection>
<properties>
<property name="spark.master" value="local" />
<property name="spark.app.name" value="sparkhdfs" />
<property name="spark.executor.memory" value="1g" />
<property name="spark.driver.allowMultipleContexts" value="true" />
</properties>
</connection>
</dataStore>
</datastores>
</clientProperties>
Here, the "spark.master" and "spark.app.name" properties are mandatory. Users can add more [Spark properties](http://spark.apache.org/docs/latest/configuration.html#available-properties) as needed.
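Kundera loads this file through kundera.client.property. Purely as a sketch of the file's structure, the hypothetical helper below (SparkClientProperties is not part of Kundera) uses the JDK's DOM parser to pull the property entries of one dataStore out of such a file and fails fast if either mandatory property is missing.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/**
 * Hypothetical helper, not part of Kundera: reads the property entries of one
 * dataStore from a client property file such as KunderaSparkTest.xml and checks
 * that the mandatory Spark properties are present.
 */
final class SparkClientProperties {

    /** Properties this page lists as mandatory. */
    static final List<String> MANDATORY = List.of("spark.master", "spark.app.name");

    private SparkClientProperties() {}

    /** Returns name -> value for every property of the dataStore named {@code dataStoreName}. */
    static Map<String, String> read(InputStream xml, String dataStoreName) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xml);
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Cannot parse client property file", e);
        }
        Map<String, String> props = new LinkedHashMap<>();
        NodeList stores = doc.getElementsByTagName("dataStore");
        for (int i = 0; i < stores.getLength(); i++) {
            Element store = (Element) stores.item(i);
            NodeList names = store.getElementsByTagName("name");
            // The first <name> element identifies the dataStore (e.g. "hdfs").
            if (names.getLength() == 0
                    || !dataStoreName.equals(names.item(0).getTextContent().trim())) {
                continue;
            }
            NodeList entries = store.getElementsByTagName("property");
            for (int j = 0; j < entries.getLength(); j++) {
                Element p = (Element) entries.item(j);
                props.put(p.getAttribute("name"), p.getAttribute("value"));
            }
        }
        for (String key : MANDATORY) {
            if (!props.containsKey(key)) {
                throw new IllegalArgumentException("Missing mandatory property: " + key);
            }
        }
        return props;
    }
}
```

Any extra entries, such as spark.executor.memory, are simply passed through in the returned map.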
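Sample Entity Class:
import java.io.Serializable;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity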
@Table(name = "spark_person")
public class Person implements Serializable
{
/** The Constant serialVersionUID. */
private static final long serialVersionUID = 1L;
/** The person id. */
@Id
private String personId;
/** The person name. */
private String personName;
/** The age. */
private int age;
/** The salary. */
private Double salary;
// setters and getters.
}
Users need to set the FS/HDFS file paths used for reading and writing data, as well as the data format. These parameters are set as entity-manager-level properties, as shown below:
For HDFS:
em.setProperty("kundera.hdfs.inputfile.path", "hdfs://localhost:9000/sparkInputTest/input");
em.setProperty("kundera.hdfs.outputfile.path", "hdfs://localhost:9000/sparkOutputTest/output");
For FS:
em.setProperty("kundera.fs.inputfile.path", "src/test/resources/csv_input/");
em.setProperty("kundera.fs.outputfile.path", "src/test/resources/csv_output/");
Format:
em.setProperty("format", "json");
Note: Currently, only the CSV and JSON formats are supported.
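Because the HDFS and FS keys differ only in their prefix, they can be assembled in one place. The sketch below is a hypothetical helper (SparkIoProperties is not a Kundera API) that builds the property map for either source and rejects unsupported formats; it assumes lowercase format values, as in em.setProperty("format", "json").

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper, not part of Kundera: collects the entity-manager-level
 * properties described above for either HDFS ("hdfs") or the file system ("fs").
 */
final class SparkIoProperties {

    private static final Set<String> SOURCES = Set.of("hdfs", "fs");
    // Only CSV and JSON are supported; values are assumed to be lowercase.
    private static final Set<String> FORMATS = Set.of("csv", "json");

    private SparkIoProperties() {}

    static Map<String, String> of(String source, String inputPath, String outputPath, String format) {
        String src = source.toLowerCase(Locale.ROOT);
        String fmt = format.toLowerCase(Locale.ROOT);
        if (!SOURCES.contains(src)) {
            throw new IllegalArgumentException("source must be hdfs or fs: " + source);
        }
        if (!FORMATS.contains(fmt)) {
            throw new IllegalArgumentException("format must be csv or json: " + format);
        }
        Map<String, String> props = new LinkedHashMap<>();
        // Key pattern used above: kundera.<source>.inputfile.path / kundera.<source>.outputfile.path
        props.put("kundera." + src + ".inputfile.path", inputPath);
        props.put("kundera." + src + ".outputfile.path", outputPath);
        props.put("format", fmt);
        return props;
    }
}
```

With a JPA 2.0 EntityManager, the resulting map can be applied in one line, for example SparkIoProperties.of("fs", "src/test/resources/csv_input/", "src/test/resources/csv_output/", "csv").forEach(em::setProperty);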
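Persist and find example:
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

EntityManagerFactory emf = Persistence.createEntityManagerFactory("spark_hdfs_pu");
EntityManager em = emf.createEntityManager();
// Set the path and format properties described above before persisting.
Person person = new Person();
person.setAge(23);
person.setPersonId("1");
person.setPersonName("Dev");
person.setSalary(100000.0);
// save data
em.persist(person);
// clear the persistence context so that find() goes to the data store, not the cache
em.clear();
Person personFound = em.find(Person.class, "1");
em.close();
emf.close();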
Native SQL queries are run through em.createNativeQuery while the EntityManager is still open:
Select all:
String query = "select * from spark_person";
List results = em.createNativeQuery(query).getResultList();
Select with WHERE:
String query = "select * from spark_person where salary > 35000";
List results = em.createNativeQuery(query).getResultList();
Select with LIKE:
String query = "select * from spark_person where personName like 'kp%'";
List results = em.createNativeQuery(query).getResultList();
Sum (aggregation):
String query = "select sum(salary) from spark_person";
List results = em.createNativeQuery(query).getResultList();
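As a plain-Java illustration of what the WHERE, LIKE and SUM queries above compute, the sketch below applies the same filters to an in-memory list. It uses no Spark or Kundera API; the Row class is a hypothetical stand-in for a spark_person row, and the sketch shows query semantics only, not the form in which Kundera returns results.

```java
import java.util.List;
import java.util.stream.Collectors;

/**
 * Plain-Java illustration (no Spark or Kundera involved) of the sample queries.
 * Assumes no null names or salaries, unlike SQL, which would skip NULLs.
 */
final class PersonQueryDemo {

    /** Hypothetical stand-in for one spark_person row. */
    static final class Row {
        final String personId;
        final String personName;
        final int age;
        final double salary;

        Row(String personId, String personName, int age, double salary) {
            this.personId = personId;
            this.personName = personName;
            this.age = age;
            this.salary = salary;
        }
    }

    private PersonQueryDemo() {}

    // select * from spark_person where salary > 35000
    static List<Row> salaryAbove(List<Row> rows, double threshold) {
        return rows.stream().filter(r -> r.salary > threshold).collect(Collectors.toList());
    }

    // select * from spark_person where personName like 'kp%'  ('%' matches any suffix)
    static List<Row> nameStartsWith(List<Row> rows, String prefix) {
        return rows.stream().filter(r -> r.personName.startsWith(prefix)).collect(Collectors.toList());
    }

    // select sum(salary) from spark_person
    static double sumSalary(List<Row> rows) {
        return rows.stream().mapToDouble(r -> r.salary).sum();
    }
}
```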
Users can save query results to HDFS/FS (as CSV or JSON) or to Cassandra.
General Format:
INSERT INTO <source>.<path to table/file> [AS <fileType>] FROM <SQL query>
- source: To save to a file system, use FS or HDFS; to save to a database, use cassandra (currently, only Cassandra is supported).
- path to table/file: For FS/HDFS, the path to the output directory; for a database, dbname.tablename.
- fileType: Required only for FS/HDFS. Its value can be CSV or JSON.
- SQL query: The result of this SQL query is saved according to the parameters above.
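The grammar above can be sketched as a small string builder. InsertIntoBuilder is hypothetical, not a Kundera API. It assumes that FS/HDFS paths are written in square brackets and the query in parentheses, as in the example that follows, and that a Cassandra target takes no AS clause, which the rules above imply but this page does not show.

```java
import java.util.Locale;
import java.util.Objects;

/**
 * Hypothetical helper, not part of Kundera: assembles
 *   INSERT INTO <source>.<path to table/file> [AS <fileType>] FROM <SQL query>
 */
final class InsertIntoBuilder {

    private InsertIntoBuilder() {}

    static String build(String source, String target, String fileType, String sqlQuery) {
        Objects.requireNonNull(target, "target");
        Objects.requireNonNull(sqlQuery, "sqlQuery");
        String src = source.toLowerCase(Locale.ROOT);
        switch (src) {
            case "fs":
            case "hdfs": {
                // fileType is required for file targets and must be CSV or JSON.
                if (fileType == null) {
                    throw new IllegalArgumentException("fileType is required for " + src);
                }
                String type = fileType.toUpperCase(Locale.ROOT);
                if (!type.equals("CSV") && !type.equals("JSON")) {
                    throw new IllegalArgumentException("fileType must be CSV or JSON: " + fileType);
                }
                // Directory path in [...], matching the fs.[...] form used on this page.
                return "INSERT INTO " + src + ".[" + target + "] AS " + type + " FROM (" + sqlQuery + ")";
            }
            case "cassandra":
                // Assumed form: target is dbname.tablename and no AS clause is used.
                return "INSERT INTO cassandra." + target + " FROM (" + sqlQuery + ")";
            default:
                throw new IllegalArgumentException("unsupported source: " + source);
        }
    }
}
```

The resulting string is passed to em.createNativeQuery(...) and run with executeUpdate(), as in the example.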
Example:
String query = "INSERT INTO fs.[src/test/resources/testspark_csv] AS CSV FROM (select * from spark_person)";
Query q = em.createNativeQuery(query, Person.class);
q.executeUpdate();
query = "INSERT INTO fs.[src/test/resources/testspark_json] AS JSON FROM (select * from spark_person)";
q = em.createNativeQuery(query, Person.class);
q.executeUpdate();
For more details, see this test case.