diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs
index fd6d04b6778b..39ef12b0bf40 100644
--- a/src/common/error/src/ext.rs
+++ b/src/common/error/src/ext.rs
@@ -39,17 +39,25 @@ pub trait ErrorExt: StackError {
     where
         Self: Sized,
     {
-        let error = self.last();
-        if let Some(external_error) = error.source() {
-            let external_root = external_error.sources().last().unwrap();
-
-            if error.to_string().is_empty() {
-                format!("{external_root}")
-            } else {
-                format!("{error}: {external_root}")
+        match self.status_code() {
+            StatusCode::Unknown | StatusCode::Unexpected | StatusCode::Internal => {
+                // masks internal error from end user
+                format!("Internal error: {}", self.status_code() as u32)
+            }
+            _ => {
+                let error = self.last();
+                if let Some(external_error) = error.source() {
+                    let external_root = external_error.sources().last().unwrap();
+
+                    if error.to_string().is_empty() {
+                        format!("{external_root}")
+                    } else {
+                        format!("{error}: {external_root}")
+                    }
+                } else {
+                    format!("{error}")
+                }
             }
-        } else {
-            format!("{error}")
         }
     }
 }
diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs
index 0b50f4497f03..e58639b18ed8 100644
--- a/src/common/procedure/src/local/runner.rs
+++ b/src/common/procedure/src/local/runner.rs
@@ -1047,6 +1047,6 @@ mod tests {
         // Run the runner and execute the procedure.
         runner.run().await;
         let err = meta.state().error().unwrap().output_msg();
-        assert!(err.contains("subprocedure failed"), "{err}");
+        assert!(err.contains("Internal error"), "{err}");
     }
 }
diff --git a/src/query/src/error.rs b/src/query/src/error.rs
index ff2842f20820..7999b4b49871 100644
--- a/src/query/src/error.rs
+++ b/src/query/src/error.rs
@@ -316,9 +316,13 @@ impl ErrorExt for Error {
             ParseSql { source, .. } => source.status_code(),
             CreateRecordBatch { source, .. } => source.status_code(),
             QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
-            DataFusion { .. } | MissingTimestampColumn { .. } | RoutePartition { .. } => {
-                StatusCode::Internal
-            }
+            DataFusion { error, .. } => match error {
+                DataFusionError::Internal(_) => StatusCode::Internal,
+                DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
+                DataFusionError::Plan(_) => StatusCode::PlanQuery,
+                _ => StatusCode::EngineExecuteQuery,
+            },
+            MissingTimestampColumn { .. } | RoutePartition { .. } => StatusCode::EngineExecuteQuery,
             Sql { source, .. } => source.status_code(),
             PlanSql { .. } => StatusCode::PlanQuery,
             ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.status_code(),
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 3efc2d3c65fa..db9724aaec51 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -392,7 +392,6 @@ impl ErrorExt for Error {
             Internal { .. }
             | InternalIo { .. }
             | TokioIo { .. }
-            | CollectRecordbatch { .. }
             | StartHttp { .. }
             | StartGrpc { .. }
             | AlreadyStarted { .. }
@@ -403,6 +402,8 @@ impl ErrorExt for Error {
             | GrpcReflectionService { .. }
             | BuildHttpResponse { .. } => StatusCode::Internal,
 
+            CollectRecordbatch { .. } => StatusCode::EngineExecuteQuery,
+
             InsertScript { source, .. }
             | ExecuteScript { source, .. }
             | ExecuteQuery { source, .. }
diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs
index c260ccf1749f..a12d54db614a 100644
--- a/src/servers/src/opentsdb/handler.rs
+++ b/src/servers/src/opentsdb/handler.rs
@@ -169,7 +169,7 @@ mod tests {
             .await
             .unwrap();
         let resp = client.read_line().await.unwrap();
-        assert_eq!(resp, Some("Internal error: expected".to_string()));
+        assert_eq!(resp, Some("Internal error: 1003".to_string()));
 
         client.write_line("get".to_string()).await.unwrap();
         let resp = client.read_line().await.unwrap();
diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs
index 552ef6ecb940..388e8b6c0e44 100644
--- a/src/servers/tests/http/opentsdb_test.rs
+++ b/src/servers/tests/http/opentsdb_test.rs
@@ -164,10 +164,7 @@ async fn test_opentsdb_put() {
         .send()
         .await;
     assert_eq!(result.status(), 500);
-    assert_eq!(
-        result.text().await,
-        "{\"error\":\"Internal error: Internal error: expected\"}"
-    );
+    assert_eq!(result.text().await, "{\"error\":\"Internal error: 1003\"}");
 
     let mut metrics = vec![];
     while let Ok(s) = rx.try_recv() {
@@ -206,7 +203,7 @@ async fn test_opentsdb_debug_put() {
         .send()
         .await;
     assert_eq!(result.status(), 200);
-    assert_eq!(result.text().await, "{\"success\":0,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: expected\"}]}");
+    assert_eq!(result.text().await, "{\"success\":0,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: 1003\"}]}");
 
     // multiple data point summary debug put
     let result = client
@@ -231,7 +228,7 @@ async fn test_opentsdb_debug_put() {
         .send()
         .await;
     assert_eq!(result.status(), 200);
-    assert_eq!(result.text().await, "{\"success\":1,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: expected\"}]}");
+    assert_eq!(result.text().await, "{\"success\":1,\"failed\":1,\"errors\":[{\"datapoint\":{\"metric\":\"should_failed\",\"timestamp\":1000,\"value\":1.0,\"tags\":{\"host\":\"web01\"}},\"error\":\"Internal error: 1003\"}]}");
 
     let mut metrics = vec![];
     while let Ok(s) = rx.try_recv() {
diff --git a/tests/cases/distributed/optimizer/filter_push_down.result b/tests/cases/distributed/optimizer/filter_push_down.result
index e6954bc84807..2b2d7af35550 100644
--- a/tests/cases/distributed/optimizer/filter_push_down.result
+++ b/tests/cases/distributed/optimizer/filter_push_down.result
@@ -237,7 +237,7 @@ SELECT i FROM (SELECT * FROM integers i1 UNION SELECT * FROM integers i2) a WHER
 -- SELECT * FROM (SELECT i1.i AS a, i2.i AS b, row_number() OVER (ORDER BY i1.i, i2.i) FROM integers i1, integers i2 WHERE i1.i IS NOT NULL AND i2.i IS NOT NULL) a1 WHERE a=b ORDER BY 1;
 SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2) a1 WHERE cond ORDER BY 1;
 
-Error: 1003(Internal), Invalid argument error: must either specify a row count or at least one column
+Error: 3001(EngineExecuteQuery), Invalid argument error: must either specify a row count or at least one column
 
 SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2 GROUP BY 1) a1 WHERE cond ORDER BY 1;
 
diff --git a/tests/cases/standalone/common/order/order_by_exceptions.result b/tests/cases/standalone/common/order/order_by_exceptions.result
index bd3bfe545a44..f5d049f0f495 100644
--- a/tests/cases/standalone/common/order/order_by_exceptions.result
+++ b/tests/cases/standalone/common/order/order_by_exceptions.result
@@ -13,7 +13,7 @@ Error: 3000(PlanQuery), Error during planning: Order by column out of bounds, sp
 -- Not work in greptimedb
 SELECT a FROM test ORDER BY 'hello', a;
 
-Error: 1003(Internal), Error during planning: Sort operation is not applicable to scalar value hello
+Error: 3001(EngineExecuteQuery), Error during planning: Sort operation is not applicable to scalar value hello
 
 -- Ambiguous reference in union alias, give and error in duckdb, but works in greptimedb
 SELECT a AS k, b FROM test UNION SELECT a, b AS k FROM test ORDER BY k;
@@ -54,7 +54,7 @@ Error: 3000(PlanQuery), Error during planning: Order by column out of bounds, sp
 
 SELECT a % 2, b FROM test UNION SELECT a % 2 AS k, b FROM test ORDER BY -1;
 
-Error: 1003(Internal), Error during planning: Sort operation is not applicable to scalar value -1
+Error: 3001(EngineExecuteQuery), Error during planning: Sort operation is not applicable to scalar value -1
 
 SELECT a % 2, b FROM test UNION SELECT a % 2 AS k FROM test ORDER BY -1;
 
diff --git a/tests/cases/standalone/common/range/error.result b/tests/cases/standalone/common/range/error.result
index 508cad0adff3..e673800d952e 100644
--- a/tests/cases/standalone/common/range/error.result
+++ b/tests/cases/standalone/common/range/error.result
@@ -37,7 +37,7 @@ Error: 2000(InvalidSyntax), sql parser error: Illegal Range select, no RANGE key
 
 SELECT min(val) RANGE '10s', max(val) FROM host ALIGN '5s';
 
-Error: 1003(Internal), No field named "MAX(host.val)". Valid fields are "MIN(host.val) RANGE 10s FILL NULL", host.ts, host.host.
+Error: 3001(EngineExecuteQuery), No field named "MAX(host.val)". Valid fields are "MIN(host.val) RANGE 10s FILL NULL", host.ts, host.host.
 
 SELECT min(val) * 2 RANGE '10s' FROM host ALIGN '5s';
 
@@ -50,12 +50,12 @@ Error: 2000(InvalidSyntax), sql parser error: Can't use the RANGE keyword in Exp
 -- 2.2 no align param
 SELECT min(val) RANGE '5s' FROM host;
 
-Error: 1003(Internal), Error during planning: Missing argument in range select query
+Error: 3000(PlanQuery), Error during planning: Missing argument in range select query
 
 -- 2.3 type mismatch
 SELECT covar(ceil(val), floor(val)) RANGE '20s' FROM host ALIGN '10s';
 
-Error: 1003(Internal), Internal error: Unsupported data type Int64 for function ceil. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
+Error: 3001(EngineExecuteQuery), Internal error: Unsupported data type Int64 for function ceil. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
 
 -- 2.4 nest query
 SELECT min(max(val) RANGE '20s') RANGE '20s' FROM host ALIGN '10s';
@@ -70,11 +70,11 @@ Error: 2000(InvalidSyntax), Range Query: Window functions is not allowed in Rang
 -- 2.6 invalid fill
 SELECT min(val) RANGE '5s', min(val) RANGE '5s' FILL NULL FROM host ALIGN '5s';
 
-Error: 1003(Internal), Schema contains duplicate unqualified field name "MIN(host.val) RANGE 5s FILL NULL"
+Error: 3001(EngineExecuteQuery), Schema contains duplicate unqualified field name "MIN(host.val) RANGE 5s FILL NULL"
 
 SELECT min(val) RANGE '5s' FROM host ALIGN '5s' FILL 3.0;
 
-Error: 1003(Internal), Error during planning: 3.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '3.0' to value of Int64 type }
+Error: 3000(PlanQuery), Error during planning: 3.0 is not a valid fill option, fail to convert to a const value. { Arrow error: Cast error: Cannot cast string '3.0' to value of Int64 type }
 
 DROP TABLE host;
 
diff --git a/tests/cases/standalone/common/types/interval/interval.result b/tests/cases/standalone/common/types/interval/interval.result
index 1b9f3a64d1f7..2d9e4ddbe6d2 100644
--- a/tests/cases/standalone/common/types/interval/interval.result
+++ b/tests/cases/standalone/common/types/interval/interval.result
@@ -249,11 +249,11 @@ SELECT TIMESTAMP '1992-09-20 11:30:00.123456' - interval_value as new_value from
 -- Interval type does not support aggregation functions.
 SELECT MIN(interval_value) from intervals;
 
-Error: 1003(Internal), Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
+Error: 3001(EngineExecuteQuery), Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
 
 SELECT MAX(interval_value) from intervals;
 
-Error: 1003(Internal), Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
+Error: 3001(EngineExecuteQuery), Internal error: Min/Max accumulator not implemented for type Interval(MonthDayNano). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
 
 SELECT SUM(interval_value) from intervals;
 
diff --git a/tests/cases/standalone/common/types/timestamp/timestamp.result b/tests/cases/standalone/common/types/timestamp/timestamp.result
index 70063acfab7c..9de8db65c8e2 100644
--- a/tests/cases/standalone/common/types/timestamp/timestamp.result
+++ b/tests/cases/standalone/common/types/timestamp/timestamp.result
@@ -83,19 +83,19 @@ Error: 3000(PlanQuery), Error during planning: The function Avg does not support
 
 SELECT t+t FROM timestamp;
 
-Error: 1003(Internal), Cast error: Cannot perform arithmetic operation between array of type Timestamp(Millisecond, None) and array of type Timestamp(Millisecond, None)
+Error: 3001(EngineExecuteQuery), Cast error: Cannot perform arithmetic operation between array of type Timestamp(Millisecond, None) and array of type Timestamp(Millisecond, None)
 
 SELECT t*t FROM timestamp;
 
-Error: 1003(Internal), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
+Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
 
 SELECT t/t FROM timestamp;
 
-Error: 1003(Internal), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
+Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
 
 SELECT t%t FROM timestamp;
 
-Error: 1003(Internal), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
+Error: 3001(EngineExecuteQuery), Invalid argument error: column types must match schema types, expected Interval(DayTime) but found Timestamp(Millisecond, None) at column index 0
 
 -- TODO(dennis): It can't run on distributed mode, uncomment it when the issue is fixed: https://github.com/GreptimeTeam/greptimedb/issues/2071 --
 -- SELECT t-t FROM timestamp; --
diff --git a/tests/cases/standalone/common/types/timestamp/timestamp_precision.result b/tests/cases/standalone/common/types/timestamp/timestamp_precision.result
index 6864cca4b716..03126f280608 100644
--- a/tests/cases/standalone/common/types/timestamp/timestamp_precision.result
+++ b/tests/cases/standalone/common/types/timestamp/timestamp_precision.result
@@ -23,7 +23,7 @@ SELECT CAST(sec AS VARCHAR), CAST(msec AS VARCHAR), CAST(micros AS VARCHAR), CAS
 
 SELECT EXTRACT(MICROSECONDS FROM sec), EXTRACT(MICROSECONDS FROM msec), EXTRACT(MICROSECONDS FROM micros), EXTRACT(MICROSECONDS FROM nanos) FROM ts_precision;
 
-Error: 1003(Internal), Execution error: Date part 'MICROSECONDS' not supported
+Error: 3001(EngineExecuteQuery), Execution error: Date part 'MICROSECONDS' not supported
 
 -- we only support precisions 0, 3, 6, and 9
 -- any other precision is rounded up (e.g. 1/2 -> 3, 4/5 -> 6, 7/8 -> 9)