diff --git a/PgSqlStorageManager.cs b/PgSqlStorageManager.cs new file mode 100644 index 0000000..8897c65 --- /dev/null +++ b/PgSqlStorageManager.cs @@ -0,0 +1,390 @@ +/* +DotNetMQ - A Complete Message Broker For .NET +Copyright (C) 2011 Halil ibrahim KALKAN + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License as published by the Free Software Foundation; either +version 2.1 of the License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +using System; +using System.Collections.Generic; +using System.Data; +using MDS.Communication.Messages; +using MDS.Exceptions; +using MDS.Serialization; +using Npgsql; + +namespace MDS.Storage.PgSqlStorage +{ + /// + /// This class is used to perform database operations on PostgresSQL database engine using PostgresSQL .Net Provider. + /// + public class PgSqlStorageManager : IStorageManager + { + #region Private fields + + /// + /// Connection string to connect database. + /// + public string ConnectionString + { + get { return _connectionString; } + set { _connectionString = value; } + } + private string _connectionString = "Server=127.0.0.1;Port=5432;Database=mds;User Id=roberto;Password=3zin;"; + + #endregion + + #region Public methods + + #region Unimplemented methods + + public void Start() + { + //No action + } + + public void Stop(bool waitToStop) + { + //No action + } + + public void WaitToStop() + { + //No action + } + + #endregion + + /// + /// Saves a MDSMessageRecord. + /// + /// MDSMessageRecord object to save + /// Auto Increment Id of saved message + public int StoreMessage(MDSMessageRecord messageRecord) + { + var bytesOfMessage = MDSSerializationHelper.SerializeToByteArray(messageRecord.Message); + + var id = InsertAndGetLastId( + "INSERT INTO messages(MessageId, DestServer, NextServer, DestApplication, MessageData, MessageDataLength, RecordDate) "+ + "VALUES(:MessageId, :DestServer, :NextServer, :DestApplication, :MessageData, :MessageDataLength, 'now') returning id", + new NpgsqlParameter("MessageId", messageRecord.Id), + new NpgsqlParameter("DestServer", messageRecord.DestServer), + new NpgsqlParameter("NextServer", messageRecord.NextServer), + new NpgsqlParameter("DestApplication", messageRecord.DestApplication), + new NpgsqlParameter("MessageData", bytesOfMessage) , + new NpgsqlParameter("MessageDataLength", bytesOfMessage.Length) + ); + + messageRecord.Id = id; + return id; + + } + + /// + /// Gets waiting messages for an application. + /// + /// Next server name + /// Destination application name + /// Minimum Id (as start Id) + /// Max record count to get + /// Records gotten from database. + public List GetWaitingMessagesOfApplication(string nextServer, string destApplication, int minId, int maxCount) + { + var recordsTable = GetTable( + "SELECT * FROM messages WHERE NextServer = :NextServer AND DestApplication = :DestApplication AND Id >= :Id ORDER BY Id ASC Limit :LimitValue", + new NpgsqlParameter("NextServer", nextServer), + new NpgsqlParameter("DestApplication", destApplication), + new NpgsqlParameter("Id", minId), + new NpgsqlParameter("LimitValue", maxCount) + ); + + var recordsList = new List(recordsTable.Rows.Count); + foreach (DataRow recordRow in recordsTable.Rows) + { + recordsList.Add( + new MDSMessageRecord + { + Id = Convert.ToInt32(recordRow["Id"]), + DestApplication = recordRow["DestApplication"] as string, + DestServer = recordRow["DestServer"] as string, + NextServer = recordRow["NextServer"] as string, + Message = MDSSerializationHelper.DeserializeFromByteArray(() => new MDSDataTransferMessage(), (byte[])recordRow["MessageData"]), + MessageId = recordRow["MessageId"] as string, + RecordDate = (DateTime)recordRow["RecordDate"] + } + ); + } + + return recordsList; + } + + /// + /// Gets last (biggest) Id of waiting messages for an application. + /// + /// Next server name + /// Destination application name + /// last (biggest) Id of waiting messages + public int GetMaxWaitingMessageIdOfApplication(string nextServer, string destApplication) + { + return GetScalarField( + "SELECT Id FROM messages WHERE NextServer = :NextServer AND DestApplication = :DestApplication ORDER BY Id DESC Limit 1", + new NpgsqlParameter("NextServer", nextServer), + new NpgsqlParameter("DestApplication", destApplication) + ); + } + + /// + /// Gets waiting messages for an MDS server. + /// + /// Next server name + /// Minimum Id (as start Id) + /// Max record count to get + /// Records gotten from database. + public List GetWaitingMessagesOfServer(string nextServer, int minId, int maxCount) + { + var recordsTable = GetTable( + "SELECT * FROM messages WHERE NextServer = :NextServer AND Id >= :Id ORDER BY Id ASC Limit :LimitValue", + new NpgsqlParameter("NextServer", nextServer), + new NpgsqlParameter("Id", minId), + new NpgsqlParameter("LimitValue", maxCount) + ); + + var recordsList = new List(recordsTable.Rows.Count); + foreach (DataRow recordRow in recordsTable.Rows) + { + recordsList.Add( + new MDSMessageRecord + { + Id = Convert.ToInt32(recordRow["Id"]), + DestApplication = recordRow["DestApplication"] as string, + DestServer = recordRow["DestServer"] as string, + NextServer = recordRow["NextServer"] as string, + Message = MDSSerializationHelper.DeserializeFromByteArray(() => new MDSDataTransferMessage(), (byte[])recordRow["MessageData"]), + MessageId = recordRow["MessageId"] as string, + RecordDate = (DateTime)recordRow["RecordDate"] + } + ); + } + + return recordsList; + } + + /// + /// Gets last (biggest) Id of waiting messages for an MDS server. + /// + /// Next server name + /// last (biggest) Id of waiting messages + public int GetMaxWaitingMessageIdOfServer(string nextServer) + { + return GetScalarField( + "SELECT Id FROM messages WHERE NextServer = :NextServer ORDER BY Id DESC Limit 1", + new NpgsqlParameter("NextServer", nextServer) + ); + } + + /// + /// Removes a message. + /// + /// id of message to remove + /// Effected rows count + public int RemoveMessage(int id) + { + return ExecuteNonQuery( + "DELETE FROM messages WHERE Id = :Id", + new NpgsqlParameter("Id", id) + ); + } + + /// + /// This method is used to set Next Server for a Destination Server. + /// It is used to update database records when Server Graph changed. + /// + /// Destination server of messages + /// Next server of messages for destServer + public void UpdateNextServer(string destServer, string nextServer) + { + ExecuteNonQuery( + "UPDATE messages SET NextServer = :NextServer WHERE DestServer = :DestServer", + new NpgsqlParameter("NextServer", nextServer), + new NpgsqlParameter("DestServer", destServer) + ); + } + + #endregion + + #region Helper Methods + + /// + /// Executes a query and returns effected rows count. + /// + /// Query to execute + /// Parameters + /// Effected rows count + private int ExecuteNonQuery(string query, params NpgsqlParameter[] parameters) + { + using (var connection = new NpgsqlConnection(ConnectionString)) + { + connection.Open(); + using (var command = new NpgsqlCommand(query, connection)) + { + foreach (var parameter in parameters) + { + command.Parameters.Add(parameter); + } + return command.ExecuteNonQuery(); + } + } + } + + /// + /// This method is used to run an insert query and get inserted row's auto increment column's value. + /// + /// Insert query to be executed + /// Parameters + /// Auto increment column's value of inserted row + private int InsertAndGetLastId(string query, params NpgsqlParameter[] parameters) + { + using (var connection = new NpgsqlConnection(ConnectionString)) + { + connection.Open(); + using (var command = new NpgsqlCommand(query, connection)) + { + + foreach (var parameter in parameters) + { + command.Parameters.Add(parameter); + } + + // Prepare the command. + // command.Prepare(); + + using (var reader = command.ExecuteReader()) + { + if (reader.Read()) + { + return Convert.ToInt32(reader[0]); + } + + return 0; + } + } + } + } + + /// + /// Runs a query and returns a DataTable. + /// + /// Query to execute + /// Parameters + /// Selected table + private DataTable GetTable(string query, params NpgsqlParameter[] parameters) + { + var table = new DataTable(); + using (var connection = new NpgsqlConnection(ConnectionString)) + { + using (var command = new NpgsqlCommand(query, connection)) + { + foreach (var parameter in parameters) + { + command.Parameters.Add(parameter); + } + + // command.Prepare(); + + using (var adapter = new NpgsqlDataAdapter(command)) + { + adapter.Fill(table); + return table; + } + } + } + } + + /// + /// Gets a record from a table. + /// + /// Select query + /// Select parameters + /// Returns found row as TableRecord object. If there is no row returns null + public TableRecord GetTableRecord(string query, params NpgsqlParameter[] parameters) + { + using (var connection = new NpgsqlConnection(ConnectionString)) + { + connection.Open(); + using (var command = new NpgsqlCommand(query, connection)) + { + foreach (var parameter in parameters) + { + command.Parameters.Add(parameter); + } + + // command.Prepare(); + + using (var reader = command.ExecuteReader()) + { + if (reader.Read()) + { + var record = new TableRecord(); + for (var i = 0; i < reader.FieldCount; i++) + { + record[reader.GetName(i)] = reader[i]; + } + + return record; + } + + return null; + } + } + } + } + + /// + /// Executes a query and gets a Integer result. + /// If query returns no data, method returns 0. + /// + /// Query to execute + /// Parameters + /// Query result or 0 + public int GetScalarField(string query, params NpgsqlParameter[] parameters) + { + using (var connection = new NpgsqlConnection(ConnectionString)) + { + connection.Open(); + using (var command = new NpgsqlCommand(query, connection)) + { + foreach (var parameter in parameters) + { + command.Parameters.Add(parameter); + } + + // command.Prepare(); + + using (var reader = command.ExecuteReader()) + { + if (reader.Read()) + { + return Convert.ToInt32(reader[0]); + } + + return 0; + } + } + } + } + + #endregion + } +}