From b2466302cb2cc1b8a3dba98701429b3fb838e179 Mon Sep 17 00:00:00 2001 From: Simba Peng <1531315@qq.com> Date: Wed, 15 Mar 2023 15:23:46 +0800 Subject: [PATCH 1/3] When a SQL error occurs, clean up all prepared statement. --- storage/mysql/log_storage.go | 1 + storage/mysql/tree_storage.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/storage/mysql/log_storage.go b/storage/mysql/log_storage.go index 2d57c7d4fd..4e7ecaaf8e 100644 --- a/storage/mysql/log_storage.go +++ b/storage/mysql/log_storage.go @@ -741,6 +741,7 @@ func (t *logTreeTX) getLeavesByHashInternal(ctx context.Context, leafHashes [][] args = append(args, t.treeID) rows, err := stx.QueryContext(ctx, args...) if err != nil { + t.ls.cleanAllStmt() klog.Warningf("Query() %s hash = %v", desc, err) return nil, err } diff --git a/storage/mysql/tree_storage.go b/storage/mysql/tree_storage.go index ffb0159cd6..64a778c44b 100644 --- a/storage/mysql/tree_storage.go +++ b/storage/mysql/tree_storage.go @@ -104,6 +104,20 @@ func expandPlaceholderSQL(sql string, num int, first, rest string) string { return strings.Replace(sql, placeholderSQL, parameters, 1) } +// clearAllStmt clean up all sql.Stmt in cache +func (m *mySQLTreeStorage) cleanAllStmt() { + m.statementMutex.Lock() + defer m.statementMutex.Unlock() + + for _, ns := range m.statements { + for _, s := range ns { + s.Close() + } + } + + m.statements = make(map[string]map[int]*sql.Stmt) +} + // getStmt creates and caches sql.Stmt structs based on the passed in statement // and number of bound arguments. // TODO(al,martin): consider pulling this all out as a separate unit for reuse @@ -200,6 +214,7 @@ func (t *treeTX) getSubtrees(ctx context.Context, treeRevision int64, ids [][]by rows, err := stx.QueryContext(ctx, args...) if err != nil { + t.ts.cleanAllStmt() klog.Warningf("Failed to get merkle subtrees: %s", err) return nil, err } @@ -296,6 +311,7 @@ func (t *treeTX) storeSubtrees(ctx context.Context, subtrees []*storagepb.Subtre r, err := stx.ExecContext(ctx, args...) if err != nil { + t.ts.cleanAllStmt() klog.Warningf("Failed to set merkle subtrees: %s", err) return err } From 237ad825d75c7f00b6c4559437e8e127914f8175 Mon Sep 17 00:00:00 2001 From: Simba Peng <1531315@qq.com> Date: Wed, 15 Mar 2023 18:46:35 +0800 Subject: [PATCH 2/3] Add logs and metric --- storage/mysql/log_storage.go | 10 ++++++---- storage/mysql/tree_storage.go | 9 ++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/storage/mysql/log_storage.go b/storage/mysql/log_storage.go index 4e7ecaaf8e..0c3f6cdcd6 100644 --- a/storage/mysql/log_storage.go +++ b/storage/mysql/log_storage.go @@ -82,10 +82,11 @@ const ( ) var ( - once sync.Once - queuedCounter monitoring.Counter - queuedDupCounter monitoring.Counter - dequeuedCounter monitoring.Counter + once sync.Once + queuedCounter monitoring.Counter + queuedDupCounter monitoring.Counter + dequeuedCounter monitoring.Counter + clearedAllStmtCounter monitoring.Counter queueLatency monitoring.Histogram queueInsertLatency monitoring.Histogram @@ -101,6 +102,7 @@ func createMetrics(mf monitoring.MetricFactory) { queuedCounter = mf.NewCounter("mysql_queued_leaves", "Number of leaves queued", logIDLabel) queuedDupCounter = mf.NewCounter("mysql_queued_dup_leaves", "Number of duplicate leaves queued", logIDLabel) dequeuedCounter = mf.NewCounter("mysql_dequeued_leaves", "Number of leaves dequeued", logIDLabel) + clearedAllStmtCounter = mf.NewCounter("mysql_cleared_all_prepared_statements", "Number of all prepared statements cleared") queueLatency = mf.NewHistogram("mysql_queue_leaves_latency", "Latency of queue leaves operation in seconds", logIDLabel) queueInsertLatency = mf.NewHistogram("mysql_queue_leaves_latency_insert", "Latency of insertion part of queue leaves operation in seconds", logIDLabel) diff --git a/storage/mysql/tree_storage.go b/storage/mysql/tree_storage.go index 64a778c44b..93c2f9224b 100644 --- a/storage/mysql/tree_storage.go +++ b/storage/mysql/tree_storage.go @@ -109,13 +109,18 @@ func (m *mySQLTreeStorage) cleanAllStmt() { m.statementMutex.Lock() defer m.statementMutex.Unlock() + klog.Info("Clearing all prepared statements") + for _, ns := range m.statements { for _, s := range ns { - s.Close() + if err := s.Close(); err != nil { + klog.Warningf("Failed to close stmt: %s", err) + } } } m.statements = make(map[string]map[int]*sql.Stmt) + clearedAllStmtCounter.Inc() } // getStmt creates and caches sql.Stmt structs based on the passed in statement @@ -128,8 +133,6 @@ func (m *mySQLTreeStorage) getStmt(ctx context.Context, statement string, num in if m.statements[statement] != nil { if m.statements[statement][num] != nil { - // TODO(al,martin): we'll possibly need to expire Stmts from the cache, - // e.g. when DB connections break etc. return m.statements[statement][num], nil } } else { From 96a0965d5f375e5715e7ea2075f4230c2a6bbee9 Mon Sep 17 00:00:00 2001 From: Simba Peng <1531315@qq.com> Date: Wed, 15 Mar 2023 20:29:32 +0800 Subject: [PATCH 3/3] Improve readability --- storage/mysql/log_storage.go | 14 +++++++------- storage/mysql/tree_storage.go | 16 ++++++++-------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/storage/mysql/log_storage.go b/storage/mysql/log_storage.go index 0c3f6cdcd6..045b803b07 100644 --- a/storage/mysql/log_storage.go +++ b/storage/mysql/log_storage.go @@ -82,11 +82,11 @@ const ( ) var ( - once sync.Once - queuedCounter monitoring.Counter - queuedDupCounter monitoring.Counter - dequeuedCounter monitoring.Counter - clearedAllStmtCounter monitoring.Counter + once sync.Once + queuedCounter monitoring.Counter + queuedDupCounter monitoring.Counter + dequeuedCounter monitoring.Counter + clearedStmtCacheCounter monitoring.Counter queueLatency monitoring.Histogram queueInsertLatency monitoring.Histogram @@ -102,7 +102,7 @@ func createMetrics(mf monitoring.MetricFactory) { queuedCounter = mf.NewCounter("mysql_queued_leaves", "Number of leaves queued", logIDLabel) queuedDupCounter = mf.NewCounter("mysql_queued_dup_leaves", "Number of duplicate leaves queued", logIDLabel) dequeuedCounter = mf.NewCounter("mysql_dequeued_leaves", "Number of leaves dequeued", logIDLabel) - clearedAllStmtCounter = mf.NewCounter("mysql_cleared_all_prepared_statements", "Number of all prepared statements cleared") + clearedStmtCacheCounter = mf.NewCounter("mysql_cleared_prepared-statement_caches", "Number of times the prepared-statement cache has been cleared") queueLatency = mf.NewHistogram("mysql_queue_leaves_latency", "Latency of queue leaves operation in seconds", logIDLabel) queueInsertLatency = mf.NewHistogram("mysql_queue_leaves_latency_insert", "Latency of insertion part of queue leaves operation in seconds", logIDLabel) @@ -743,7 +743,7 @@ func (t *logTreeTX) getLeavesByHashInternal(ctx context.Context, leafHashes [][] args = append(args, t.treeID) rows, err := stx.QueryContext(ctx, args...) if err != nil { - t.ls.cleanAllStmt() + t.ls.clearStmtCache() klog.Warningf("Query() %s hash = %v", desc, err) return nil, err } diff --git a/storage/mysql/tree_storage.go b/storage/mysql/tree_storage.go index 93c2f9224b..6cfda3ea5a 100644 --- a/storage/mysql/tree_storage.go +++ b/storage/mysql/tree_storage.go @@ -47,9 +47,9 @@ const ( n.TreeId = ? AND n.SubtreeRevision <= ? GROUP BY n.TreeId, n.SubtreeId ) AS x - INNER JOIN Subtree - ON Subtree.SubtreeId = x.SubtreeId - AND Subtree.SubtreeRevision = x.MaxRevision + INNER JOIN Subtree + ON Subtree.SubtreeId = x.SubtreeId + AND Subtree.SubtreeRevision = x.MaxRevision AND Subtree.TreeId = x.TreeId AND Subtree.TreeId = ?` placeholderSQL = "" @@ -104,8 +104,8 @@ func expandPlaceholderSQL(sql string, num int, first, rest string) string { return strings.Replace(sql, placeholderSQL, parameters, 1) } -// clearAllStmt clean up all sql.Stmt in cache -func (m *mySQLTreeStorage) cleanAllStmt() { +// clearStmtCache clear up all sql.Stmt in cache +func (m *mySQLTreeStorage) clearStmtCache() { m.statementMutex.Lock() defer m.statementMutex.Unlock() @@ -120,7 +120,7 @@ func (m *mySQLTreeStorage) cleanAllStmt() { } m.statements = make(map[string]map[int]*sql.Stmt) - clearedAllStmtCounter.Inc() + clearedStmtCacheCounter.Inc() } // getStmt creates and caches sql.Stmt structs based on the passed in statement @@ -217,7 +217,7 @@ func (t *treeTX) getSubtrees(ctx context.Context, treeRevision int64, ids [][]by rows, err := stx.QueryContext(ctx, args...) if err != nil { - t.ts.cleanAllStmt() + t.ts.clearStmtCache() klog.Warningf("Failed to get merkle subtrees: %s", err) return nil, err } @@ -314,7 +314,7 @@ func (t *treeTX) storeSubtrees(ctx context.Context, subtrees []*storagepb.Subtre r, err := stx.ExecContext(ctx, args...) if err != nil { - t.ts.cleanAllStmt() + t.ts.clearStmtCache() klog.Warningf("Failed to set merkle subtrees: %s", err) return err }