Skip to content

Commit

Permalink
feat: support set spark_config and use to request taskmanager (4parad…
Browse files Browse the repository at this point in the history
…igm#3613)

* Support set spark_config and use to request taskmanager

* Format cpp code

* Format cpp string
  • Loading branch information
tobegit3hub authored Nov 29, 2023
1 parent b005cd1 commit 71cf61e
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/en/reference/sql/ddl/SET_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ The following format is also equivalent.
| @@session.enable_trace|@@enable_trace | When the value is `true`, an error message stack will be printed when the SQL statement has a syntax error or an error occurs during the plan generation process. <br />When the value is `false`, only the basic error message will be printed if there is a SQL syntax error or an error occurs during the plan generation process. | `true`, <br /> `false` | `false` |
| @@session.sync_job|@@sync_job | When the value is `true`, the offline command will be executed synchronously, waiting for the final result of the execution.<br />When the value is `false`, the offline command returns immediately. If you need to check the execution, please use `SHOW JOB` command. | `true`, <br /> `false` | `false` |
| @@session.sync_timeout|@@sync_timeout | When `sync_job=true`, you can configure the waiting time for synchronization commands. The timeout will return immediately. After the timeout returns, you can still view the command execution through `SHOW JOB`. | Int | 20000 |
| @@session.spark_config|@@spark_config | Set the Spark configuration for offline jobs, configure like 'spark.executor.memory=2g;spark.executor.cores=2'. Notice that the priority of this Spark configuration is higer than TaskManager Spark configuration but lower than CLI Spark configuration file. | String | "" |

## Example

Expand Down
2 changes: 1 addition & 1 deletion docs/zh/openmldb_sql/ddl/SET_STATEMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ sessionVariableName ::= '@@'Identifier | '@@session.'Identifier | '@@global.'Ide
| @@session.enable_trace|@@enable_trace | 当该变量值为 `true`,SQL语句有语法错误或者在计划生成过程发生错误时,会打印错误信息栈。<br />当该变量值为 `false`,SQL语句有语法错误或者在计划生成过程发生错误时,仅打印基本错误信息。 | "true" \| "false" | "false" |
| @@session.sync_job|@@sync_job | 当该变量值为 `true`,离线的命令将变为同步,等待执行的最终结果。<br />当该变量值为 `false`,离线的命令即时返回,若要查看命令的执行情况,请使用`SHOW JOB`| "true" \| "false" | "false" |
| @@session.job_timeout|@@job_timeout | 可配置离线异步命令或离线管理命令的等待时间(以*毫秒*为单位),将立即返回。离线异步命令返回后仍可通过`SHOW JOB`查看命令执行情况。 | Int | "20000" |

| @@session.spark_config|@@spark_config | 设置离线任务的 Spark 参数,配置项参考 'spark.executor.memory=2g;spark.executor.cores=2'。注意此 Spark 配置优先级高于 TaskManager 默认 Spark 配置,低于命令行的 Spark 配置文件。 | String | "" |
## Example

### 设置和显示会话系统变量
Expand Down
64 changes: 63 additions & 1 deletion src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
#include <algorithm>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <sstream>
#include <unordered_map>
#include <utility>

Expand Down Expand Up @@ -319,6 +321,7 @@ bool SQLClusterRouter::Init() {
session_variables_.emplace("enable_trace", "false");
session_variables_.emplace("sync_job", "false");
session_variables_.emplace("job_timeout", "60000"); // rpc request timeout for taskmanager
session_variables_.emplace("spark_config", "");
}
return true;
}
Expand Down Expand Up @@ -2980,7 +2983,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteOfflineQuery(
bool is_sync_job, int job_timeout,
::hybridse::sdk::Status* status) {
RET_IF_NULL_AND_WARN(status, "output status is nullptr");
std::map<std::string, std::string> config;
std::map<std::string, std::string> config = ParseSparkConfigString(GetSparkConfig());
ReadSparkConfFromFile(std::dynamic_pointer_cast<SQLRouterOptions>(options_)->spark_conf_path, &config);

if (is_sync_job) {
Expand Down Expand Up @@ -3049,6 +3052,16 @@ int SQLClusterRouter::GetJobTimeout() {
return 60000;
}

std::string SQLClusterRouter::GetSparkConfig() {
std::lock_guard<::openmldb::base::SpinMutex> lock(mu_);
auto it = session_variables_.find("spark_config");
if (it != session_variables_.end()) {
return it->second;
}

return "";
}

::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNode* node) {
std::string key = node->Key();
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
Expand Down Expand Up @@ -3083,13 +3096,34 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod
if (!absl::SimpleAtoi(value, &new_timeout)) {
return {StatusCode::kCmdError, "Fail to parse value, can't set the request timeout"};
}
} else if (key == "spark_config") {
if (!CheckSparkConfigString(value)) {
return {
StatusCode::kCmdError,
"Fail to parse spark config, set like 'spark.executor.memory=2g;spark.executor.cores=2'"
};
}
} else {
return {};
}
session_variables_[key] = value;
return {};
}

bool SQLClusterRouter::CheckSparkConfigString(const std::string& input) {
std::istringstream iss(input);
std::string keyValue;

while (std::getline(iss, keyValue, ';')) {
// Check if the substring starts with "spark."
if (keyValue.find("spark.") != 0) {
return false;
}
}

return true;
}

::hybridse::sdk::Status SQLClusterRouter::ParseNamesFromArgs(const std::string& db,
const std::vector<std::string>& args, std::string* db_name, std::string* name) {
if (args.size() == 1) {
Expand Down Expand Up @@ -4523,6 +4557,34 @@ bool SQLClusterRouter::CheckTableStatus(const std::string& db, const std::string
return check_succeed;
}

std::map<std::string, std::string> SQLClusterRouter::ParseSparkConfigString(const std::string& input) {
std::map<std::string, std::string> configMap;

std::istringstream iss(input);
std::string keyValue;

while (std::getline(iss, keyValue, ';')) {
// Split the key-value pair
size_t equalPos = keyValue.find('=');
if (equalPos != std::string::npos) {
std::string key = keyValue.substr(0, equalPos);
std::string value = keyValue.substr(equalPos + 1);

// Check if the key starts with "spark."
if (key.find("spark.") == 0) {
// Add to the map
configMap[key] = value;
} else {
std::cerr << "Error: Key does not start with 'spark.' - " << key << std::endl;
}
} else {
std::cerr << "Error: Invalid key-value pair - " << keyValue << std::endl;
}
}

return configMap;
}

void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::map<std::string, std::string>* config) {
if (!conf_file_path.empty()) {
boost::property_tree::ptree pt;
Expand Down
6 changes: 6 additions & 0 deletions src/sdk/sql_cluster_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ class SQLClusterRouter : public SQLRouter {
// get job timeout from the session variables, we will use the timeout when sending requests to the taskmanager
int GetJobTimeout();

std::string GetSparkConfig();

std::map<std::string, std::string> ParseSparkConfigString(const std::string& input);

bool CheckSparkConfigString(const std::string& input);

::openmldb::base::Status ExecuteOfflineQueryAsync(const std::string& sql,
const std::map<std::string, std::string>& config,
const std::string& default_db, int job_timeout,
Expand Down

0 comments on commit 71cf61e

Please sign in to comment.