Skip to content

Commit

Permalink
fixed python notebooks
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Aralihalli <[email protected]>
  • Loading branch information
SurajAralihalli committed Jul 20, 2022
1 parent af6e966 commit 1c04ffb
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
"source": [
"# The input path of dataset\n",
"# dataRoot = os.getenv(\"DATA_ROOT\", \"/data\")\n",
"dataRoot = os.getenv(\"DATA_ROOT\", \"/data\")\n",
"orig_raw_path = dataRoot + \"/mortgage/\""
"dataRoot = os.getenv(\"DATA_ROOT\", \"/mortgage\")\n",
"orig_raw_path = dataRoot + \"/input/\""
]
},
{
Expand All @@ -77,11 +77,11 @@
"\n",
"is_save_dataset=True\n",
"# the path to save the train dataset\n",
"output_path_train=dataRoot + \"/mortgage/output/train/\"\n",
"output_path_train=dataRoot + \"/output/train/\"\n",
"# the path to save the test dataset\n",
"output_path_test=dataRoot + \"/mortgage/output/test/\"\n",
"output_path_test=dataRoot + \"/output/test/\"\n",
"# the path to save the xgboost model\n",
"output_path_model=dataRoot + \"/mortgage/new-model-path\""
"output_path_model=dataRoot + \"/output/model/\""
]
},
{
Expand Down Expand Up @@ -109,7 +109,7 @@
" StructField(\"master_servicer\", StringType()),\n",
" StructField(\"orig_interest_rate\", DoubleType()),\n",
" StructField(\"interest_rate\", DoubleType()),\n",
" StructField(\"orig_upb\", IntegerType()),\n",
" StructField(\"orig_upb\", DoubleType()),\n",
" StructField(\"upb_at_issuance\", StringType()),\n",
" StructField(\"current_actual_upb\", DoubleType()),\n",
" StructField(\"orig_loan_term\", IntegerType()),\n",
Expand Down Expand Up @@ -384,59 +384,13 @@
"def read_raw_csv(spark, path):\n",
" return spark.read.format('csv') \\\n",
" .option('nullValue', '') \\\n",
" .option('header', 'false') \\\n",
" .option('header', False) \\\n",
" .option('delimiter', '|') \\\n",
" .schema(_csv_raw_schema) \\\n",
" .load(path) \\\n",
" .withColumn('quarter', _get_quarter_from_csv_file_name())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* Define function to read Performance CSV data file"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"def read_perf_csv(spark, path):\n",
" return spark.read.format(\"csv\") \\\n",
" .option(\"nullValue\", \"\") \\\n",
" .option(\"header\", \"false\") \\\n",
" .option(\"delimiter\", \"|\") \\\n",
" .schema(_csv_perf_schema) \\\n",
" .load(path) \\\n",
" .withColumn(\"quarter\", _get_quarter_from_csv_file_name())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* Define function to read Acquisition CSV file"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"def read_acq_csv(spark, path):\n",
" return spark.read.format(\"csv\") \\\n",
" .option(\"nullValue\", \"\") \\\n",
" .option(\"header\", \"false\") \\\n",
" .option(\"delimiter\", \"|\") \\\n",
" .schema(_csv_acq_schema) \\\n",
" .load(path) \\\n",
" .withColumn(\"quarter\", _get_quarter_from_csv_file_name())"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@
"def read_raw_csv(spark, path):\n",
" return spark.read.format('csv') \\\n",
" .option('nullValue', '') \\\n",
" .option('header', 'false') \\\n",
" .option('header', False) \\\n",
" .option('delimiter', '|') \\\n",
" .schema(_csv_raw_schema) \\\n",
" .load(path) \\\n",
Expand Down Expand Up @@ -734,25 +734,8 @@
"outputs": [],
"source": [
"# You need to update them to your real paths!\n",
"dataRoot = os.getenv(\"DATA_ROOT\", \"/data\")\n",
"orig_raw_path = dataRoot + '/input'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* Define temporary folder path "
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [],
"source": [
"tmp_perf_path=dataRoot + '/mortgage/perf/'\n",
"tmp_acq_path=dataRoot + '/mortgage/acq/'"
"dataRoot = os.getenv(\"DATA_ROOT\", \"/mortgage\")\n",
"orig_raw_path = dataRoot + '/input/'"
]
},
{
Expand All @@ -768,10 +751,10 @@
"metadata": {},
"outputs": [],
"source": [
"output_path = dataRoot + '/mortgage/output/'\n",
"output_path_train = dataRoot + '/mortgage/train/'\n",
"output_path_test = dataRoot + '/mortgage/test/'\n",
"save_train_test_dataset = True"
"output_path = dataRoot + '/output/data/'\n",
"output_path_train = dataRoot + '/output/train/'\n",
"output_path_eval = dataRoot + '/output/eval/'\n",
"save_train_eval_dataset = True"
]
},
{
Expand Down Expand Up @@ -803,33 +786,18 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read Raw File and Transcode to Parquet"
"### Read Raw File"
]
},
{
"cell_type": "code",
"execution_count": 59,
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"185.46884560585022\n"
]
}
],
"outputs": [],
"source": [
"start = time.time()\n",
"# read data and transcode to parquet\n",
"\n",
"rawDf = read_raw_csv(spark, orig_raw_path)\n",
"acq = extract_acq_columns(rawDf)\n",
"acq.repartition(12).write.parquet(tmp_acq_path, mode='overwrite')\n",
"perf = extract_perf_columns(rawDf)\n",
"perf.coalesce(96).write.parquet(tmp_perf_path, mode='overwrite')\n",
"end = time.time()\n",
"print(end - start)"
"perf = extract_perf_columns(rawDf)"
]
},
{
Expand Down Expand Up @@ -1748,9 +1716,6 @@
],
"source": [
"start = time.time()\n",
"# read parquet\n",
"perf = spark.read.parquet(tmp_perf_path)\n",
"acq = spark.read.parquet(tmp_acq_path)\n",
"\n",
"# run main function to process data\n",
"out = run_mortgage(spark, perf, acq)\n",
Expand All @@ -1762,9 +1727,9 @@
"splits = out.randomSplit([0.8, 0.2])\n",
"\n",
"# save processed data\n",
"if save_train_test_dataset:\n",
" splits[0].coalesce(1).write.format('csv').save(output_path_train, header = 'true', mode=\"overwrite\")\n",
" splits[1].coalesce(1).write.format('csv').save(output_path_test, header = 'true', mode=\"overwrite\")\n",
"if save_train_eval_dataset:\n",
" splits[0].write.format('parquet').save(output_path_train, mode=\"overwrite\")\n",
" splits[1].write.format('parquet').save(output_path_eval, mode=\"overwrite\")\n",
"\n",
"# print explain and time\n",
"print(out.explain())\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
"from pyspark.ml.tuning import ParamGridBuilder\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import FloatType, IntegerType, StructField, StructType\n",
"from pyspark.sql.types import FloatType, IntegerType, StructField, StructType, DoubleType\n",
"from time import time\n",
"import os"
]
Expand Down Expand Up @@ -92,7 +92,7 @@
" StructField('seller_name', FloatType()),\n",
" StructField('mod_flag', FloatType()),\n",
" StructField('orig_interest_rate', FloatType()),\n",
" StructField('orig_upb', IntegerType()),\n",
" StructField('orig_upb', DoubleType()),\n",
" StructField('orig_loan_term', IntegerType()),\n",
" StructField('orig_ltv', FloatType()),\n",
" StructField('orig_cltv', FloatType()),\n",
Expand All @@ -113,9 +113,12 @@
"features = [ x.name for x in schema if x.name != label ]\n",
"\n",
"# You need to update them to your real paths!\n",
"dataRoot = os.getenv(\"DATA_ROOT\", \"/data\")\n",
"train_data = spark.read.parquet(dataRoot + '/mortgage/parquet/train')\n",
"trans_data = spark.read.parquet(dataRoot + '/mortgage/parquet/eval')"
"dataRoot = os.getenv(\"DATA_ROOT\", \"/mortgage\")\n",
"train_path = dataRoot + \"/train\"\n",
"eval_path = dataRoot + \"/eval\"\n",
"\n",
"train_data = spark.read.parquet(train_path)\n",
"trans_data = spark.read.parquet(eval_path)"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"from ml.dmlc.xgboost4j.scala.spark import XGBoostClassificationModel, XGBoostClassifier\n",
"from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import FloatType, IntegerType, StructField, StructType\n",
"from pyspark.sql.types import FloatType, IntegerType, StructField, StructType, DoubleType\n",
"from time import time\n",
"import os"
]
Expand Down Expand Up @@ -96,7 +96,7 @@
" StructField('seller_name', FloatType()),\n",
" StructField('mod_flag', FloatType()),\n",
" StructField('orig_interest_rate', FloatType()),\n",
" StructField('orig_upb', IntegerType()),\n",
" StructField('orig_upb', DoubleType()),\n",
" StructField('orig_loan_term', IntegerType()),\n",
" StructField('orig_ltv', FloatType()),\n",
" StructField('orig_cltv', FloatType()),\n",
Expand All @@ -117,9 +117,12 @@
"features = [ x.name for x in schema if x.name != label ]\n",
"\n",
"# You need to update them to your real paths!\n",
"dataRoot = os.getenv(\"DATA_ROOT\", \"/data\")\n",
"train_data = reader.schema(schema).option('header', True).csv(dataRoot + '/mortgage/csv/train')\n",
"trans_data = reader.schema(schema).option('header', True).csv(dataRoot + '/mortgage/csv/test')"
"dataRoot = os.getenv(\"DATA_ROOT\", \"/mortgage\")\n",
"train_path = dataRoot + \"/train\"\n",
"eval_path = dataRoot + \"/eval\"\n",
"\n",
"train_data = reader.parquet(train_path)\n",
"trans_data = reader.parquet(eval_path)"
]
},
{
Expand Down Expand Up @@ -232,8 +235,8 @@
"metadata": {},
"outputs": [],
"source": [
"model.write().overwrite().save(dataRoot + '/new-model-path')\n",
"loaded_model = XGBoostClassificationModel().load(dataRoot + '/new-model-path')"
"model.write().overwrite().save(dataRoot + '/model')\n",
"loaded_model = XGBoostClassificationModel().load(dataRoot + '/model')"
]
},
{
Expand Down

0 comments on commit 1c04ffb

Please sign in to comment.