Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add CLI Support for Catalyst #337

Open
wants to merge 21 commits into
base: sparkSql
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,20 @@ resultSet.next()
println(resultSet.getInt(1))
```

## Running Shark CLI
* Configure the shark_home/conf/shark-env.sh
* Configure the shark_home/conf/hive-site.xml
* Start the Shark CLI
```
$ bin/shark
catalyst> show tables;
catalyst> set shark.exec.mode=hive;
hive>show tables;
```
But there is a bug, which require show tables before doing anything else.

## Known Missing Features
* Shark CLI
* Restoring cached tables upon restart
* Invalidation of cached tables when data is INSERTed
* Off-heap storage using Tachyon
* TGFs
* ...
* ...
12 changes: 9 additions & 3 deletions project/SharkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ object SharkBuild extends Build {
val excludeXerces = ExclusionRule(organization = "xerces")
val excludeHive = ExclusionRule(organization = "org.apache.hive")


/** Extra artifacts not included in Spark SQL's Hive support. */
val hiveArtifacts = Seq("hive-cli", "hive-jdbc")
val hiveArtifacts = Seq("hive-cli", "hive-jdbc", "hive-exec", "hive-service")

val hiveDependencies = hiveArtifacts.map ( artifactId =>
"org.spark-project.hive" % artifactId % "0.12.0" excludeAll(
excludeGuava, excludeLog4j, excludeAsm, excludeNetty, excludeXerces, excludeServlet)
Expand All @@ -101,15 +101,21 @@ object SharkBuild extends Build {

libraryDependencies ++= hiveDependencies ++ yarnDependency,
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-hive" % SPARK_VERSION,
"org.apache.spark" %% "spark-hive" % SPARK_VERSION excludeAll(excludeHive, excludeServlet) force(),
"org.apache.spark" %% "spark-repl" % SPARK_VERSION,
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) force(),
"org.mortbay.jetty" % "jetty" % "6.1.26" exclude ("org.mortbay.jetty", "servlet-api") force(),
"org.eclipse.jetty.orbit" % "javax.servlet" % "3.0.0.v201112011016" artifacts ( Artifact("javax.servlet", "jar", "jar") ),
"com.typesafe" %% "scalalogging-slf4j" % "1.0.1",
"org.scalatest" %% "scalatest" % "1.9.1" % "test"
),

// Download managed jars into lib_managed.
retrieveManaged := true,
resolvers ++= Seq(
"Maven Repository" at "http://repo.maven.apache.org/maven2",
"Apache Repository" at "https://repository.apache.org/content/repositories/releases",
"JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/",
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
"Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository"
Expand Down
139 changes: 139 additions & 0 deletions src/main/scala/shark/CatalystContext.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql
package hive

import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._

import org.apache.hive.service.cli.TableSchema
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.cli.CliDriver
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.processors.CommandProcessor
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.hadoop.hive.ql.Driver

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.catalyst.plans.logical.ExplainCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecutionException

import shark.LogHelper

//TODO work around for HiveContext, need to update that in Spark project (sql/hive), not here.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at least some of the issues that necessitate this class's existence have been fixed (e.g. EXPLAIN throwing exceptions). I'm fine with leaving these other fixes here for now, but can you file some JIRAs for the ones that aren't fixed in Spark?

case class CatalystContext(sc: SparkContext) extends HiveContext(sc) with LogHelper {
@transient protected[hive] override lazy val hiveconf = sessionState.getConf()
@transient protected[hive] override lazy val sessionState = SessionState.get()

class HiveQLQueryExecution(hql: String) extends QueryExecution {
override def logical: LogicalPlan = HiveQl.parseSql(hql)
override def toString = hql + "\n" + super.toString

/**
* Query Result (errcode, result, exception if any)
* If error code equals 0 means got the result, otherwise failed due to some reason / exception
*/
def result(): (Int, Seq[String], Throwable) = analyzed match {
case NativeCommand(cmd) => runOnHive(cmd)
case ExplainCommand(plan) =>
(0, new QueryExecution { val logical = plan }.toString.split("\n"), null)
case query =>
try{
val result: Seq[Seq[Any]] = toRdd.collect().toSeq
// We need the types so we can output struct field names
val types = analyzed.output.map(_.dataType)
// Reformat to match hive tab delimited output.
(0, result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t")).toSeq, null)
} catch {
case e: Throwable => {
logError("Error:\n $cmd\n", e)
(-1, Seq[String](), e)
}
}
}

/**
* Get the result set table schema
*/
def getResultSetSchema: TableSchema = {
logger.warn(s"Result Schema: ${analyzed.output}")
if (analyzed.output.size == 0) {
new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
} else {
val schema = analyzed.output.map { attr =>
new FieldSchema(attr.name,
org.apache.spark.sql.hive.HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
new TableSchema(schema)
}
}
}

def runOnHive(cmd: String, maxRows: Int = 1000): (Int, Seq[String], Throwable) = {
try {
val cmd_trimmed: String = cmd.trim()
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hiveconf)

proc match {
case driver: Driver =>
driver.init()

val results = new JArrayList[String]
val response: CommandProcessorResponse = driver.run(cmd)
// Throw an exception if there is an error in query processing.
if (response.getResponseCode != 0) {
driver.destroy()
(response.getResponseCode, Seq[String](response.getErrorMessage()), new Exception(cmd))
} else {
driver.setMaxRows(maxRows)
driver.getResults(results)
driver.destroy()
(0, results, null)
}
case _ =>
SessionState.get().out.println(tokens(0) + " " + cmd_1)
val res = proc.run(cmd_1)
if(res.getResponseCode == 0) {
(0, Seq[String](), null)
} else {
(res.getResponseCode, Seq[String](res.getErrorMessage()), new Exception(cmd_1))
}
}
} catch {
case e: Throwable =>
logger.error(
s"""
|======================
|HIVE FAILURE OUTPUT
|======================
|${outputBuffer.toString}
|======================
|END HIVE FAILURE OUTPUT
|======================
""".stripMargin)
(-2, Seq[String](), e)
}
}
}
93 changes: 93 additions & 0 deletions src/main/scala/shark/CatalystDriver.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (C) 2012 The Regents of The University California.
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package shark

import java.util.ArrayList

import scala.collection.JavaConversions._

import org.apache.commons.lang.exception.ExceptionUtils

import org.apache.hive.service.cli.TableSchema

import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.metastore.api.Schema
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse

import org.apache.spark.sql.hive.CatalystContext

class CatalystDriver extends Driver with LogHelper {
private val context: CatalystContext = CatalystEnv.cc
private var tschema: TableSchema = _
private var result: (Int, Seq[String], Throwable) = _

override def init(): Unit = {
}

override def run(command: String): CommandProcessorResponse = {
val execution = new context.HiveQLQueryExecution(command)

// TODO unify the error code
try {
result = execution.result
tschema = execution.getResultSetSchema

if(result._1 != 0) {
logError(s"Failed in [$command]", result._3)
new CommandProcessorResponse(result._1, ExceptionUtils.getFullStackTrace(result._3), null)
} else {
new CommandProcessorResponse(result._1)
}
} catch {
case t: Throwable =>
logError(s"Failed in [$command]", t)
new CommandProcessorResponse(-3, ExceptionUtils.getFullStackTrace(t), null)
}
}

override def close(): Int = {
result = null
tschema = null

0
}

/**
* Get the result schema, currently CatalystDriver doesn't support it yet.
* TODO: the TableSchema (org.apache.hive.service.cli.TableSchema) is returned by Catalyst,
* however, the Driver requires the Schema (org.apache.hadoop.hive.metastore.api.Schema)
* Need to figure out how to convert the previous to later.
*/
override def getSchema(): Schema = throw new UnsupportedOperationException("for getSchema")
def getTableSchema = tschema

override def getResults(res: ArrayList[String]): Boolean = {
if(result == null) {
false
} else {
res.addAll(result._2)
result = null
true
}
}

override def destroy() {
result = null
tschema = null
}
}
Loading