diff --git a/PgSqlStorageManager.cs b/PgSqlStorageManager.cs
new file mode 100644
index 0000000..8897c65
--- /dev/null
+++ b/PgSqlStorageManager.cs
@@ -0,0 +1,390 @@
+/*
+DotNetMQ - A Complete Message Broker For .NET
+Copyright (C) 2011 Halil ibrahim KALKAN
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Data;
+using MDS.Communication.Messages;
+using MDS.Exceptions;
+using MDS.Serialization;
+using Npgsql;
+
+namespace MDS.Storage.PgSqlStorage
+{
+ ///
+ /// This class is used to perform database operations on PostgresSQL database engine using PostgresSQL .Net Provider.
+ ///
+ public class PgSqlStorageManager : IStorageManager
+ {
+ #region Private fields
+
+ ///
+ /// Connection string to connect database.
+ ///
+ public string ConnectionString
+ {
+ get { return _connectionString; }
+ set { _connectionString = value; }
+ }
+ private string _connectionString = "Server=127.0.0.1;Port=5432;Database=mds;User Id=roberto;Password=3zin;";
+
+ #endregion
+
+ #region Public methods
+
+ #region Unimplemented methods
+
+ public void Start()
+ {
+ //No action
+ }
+
+ public void Stop(bool waitToStop)
+ {
+ //No action
+ }
+
+ public void WaitToStop()
+ {
+ //No action
+ }
+
+ #endregion
+
+ ///
+ /// Saves a MDSMessageRecord.
+ ///
+ /// MDSMessageRecord object to save
+ /// Auto Increment Id of saved message
+ public int StoreMessage(MDSMessageRecord messageRecord)
+ {
+ var bytesOfMessage = MDSSerializationHelper.SerializeToByteArray(messageRecord.Message);
+
+ var id = InsertAndGetLastId(
+ "INSERT INTO messages(MessageId, DestServer, NextServer, DestApplication, MessageData, MessageDataLength, RecordDate) "+
+ "VALUES(:MessageId, :DestServer, :NextServer, :DestApplication, :MessageData, :MessageDataLength, 'now') returning id",
+ new NpgsqlParameter("MessageId", messageRecord.Id),
+ new NpgsqlParameter("DestServer", messageRecord.DestServer),
+ new NpgsqlParameter("NextServer", messageRecord.NextServer),
+ new NpgsqlParameter("DestApplication", messageRecord.DestApplication),
+ new NpgsqlParameter("MessageData", bytesOfMessage) ,
+ new NpgsqlParameter("MessageDataLength", bytesOfMessage.Length)
+ );
+
+ messageRecord.Id = id;
+ return id;
+
+ }
+
+ ///
+ /// Gets waiting messages for an application.
+ ///
+ /// Next server name
+ /// Destination application name
+ /// Minimum Id (as start Id)
+ /// Max record count to get
+ /// Records gotten from database.
+ public List GetWaitingMessagesOfApplication(string nextServer, string destApplication, int minId, int maxCount)
+ {
+ var recordsTable = GetTable(
+ "SELECT * FROM messages WHERE NextServer = :NextServer AND DestApplication = :DestApplication AND Id >= :Id ORDER BY Id ASC Limit :LimitValue",
+ new NpgsqlParameter("NextServer", nextServer),
+ new NpgsqlParameter("DestApplication", destApplication),
+ new NpgsqlParameter("Id", minId),
+ new NpgsqlParameter("LimitValue", maxCount)
+ );
+
+ var recordsList = new List(recordsTable.Rows.Count);
+ foreach (DataRow recordRow in recordsTable.Rows)
+ {
+ recordsList.Add(
+ new MDSMessageRecord
+ {
+ Id = Convert.ToInt32(recordRow["Id"]),
+ DestApplication = recordRow["DestApplication"] as string,
+ DestServer = recordRow["DestServer"] as string,
+ NextServer = recordRow["NextServer"] as string,
+ Message = MDSSerializationHelper.DeserializeFromByteArray(() => new MDSDataTransferMessage(), (byte[])recordRow["MessageData"]),
+ MessageId = recordRow["MessageId"] as string,
+ RecordDate = (DateTime)recordRow["RecordDate"]
+ }
+ );
+ }
+
+ return recordsList;
+ }
+
+ ///
+ /// Gets last (biggest) Id of waiting messages for an application.
+ ///
+ /// Next server name
+ /// Destination application name
+ /// last (biggest) Id of waiting messages
+ public int GetMaxWaitingMessageIdOfApplication(string nextServer, string destApplication)
+ {
+ return GetScalarField(
+ "SELECT Id FROM messages WHERE NextServer = :NextServer AND DestApplication = :DestApplication ORDER BY Id DESC Limit 1",
+ new NpgsqlParameter("NextServer", nextServer),
+ new NpgsqlParameter("DestApplication", destApplication)
+ );
+ }
+
+ ///
+ /// Gets waiting messages for an MDS server.
+ ///
+ /// Next server name
+ /// Minimum Id (as start Id)
+ /// Max record count to get
+ /// Records gotten from database.
+ public List GetWaitingMessagesOfServer(string nextServer, int minId, int maxCount)
+ {
+ var recordsTable = GetTable(
+ "SELECT * FROM messages WHERE NextServer = :NextServer AND Id >= :Id ORDER BY Id ASC Limit :LimitValue",
+ new NpgsqlParameter("NextServer", nextServer),
+ new NpgsqlParameter("Id", minId),
+ new NpgsqlParameter("LimitValue", maxCount)
+ );
+
+ var recordsList = new List(recordsTable.Rows.Count);
+ foreach (DataRow recordRow in recordsTable.Rows)
+ {
+ recordsList.Add(
+ new MDSMessageRecord
+ {
+ Id = Convert.ToInt32(recordRow["Id"]),
+ DestApplication = recordRow["DestApplication"] as string,
+ DestServer = recordRow["DestServer"] as string,
+ NextServer = recordRow["NextServer"] as string,
+ Message = MDSSerializationHelper.DeserializeFromByteArray(() => new MDSDataTransferMessage(), (byte[])recordRow["MessageData"]),
+ MessageId = recordRow["MessageId"] as string,
+ RecordDate = (DateTime)recordRow["RecordDate"]
+ }
+ );
+ }
+
+ return recordsList;
+ }
+
+ ///
+ /// Gets last (biggest) Id of waiting messages for an MDS server.
+ ///
+ /// Next server name
+ /// last (biggest) Id of waiting messages
+ public int GetMaxWaitingMessageIdOfServer(string nextServer)
+ {
+ return GetScalarField(
+ "SELECT Id FROM messages WHERE NextServer = :NextServer ORDER BY Id DESC Limit 1",
+ new NpgsqlParameter("NextServer", nextServer)
+ );
+ }
+
+ ///
+ /// Removes a message.
+ ///
+ /// id of message to remove
+ /// Effected rows count
+ public int RemoveMessage(int id)
+ {
+ return ExecuteNonQuery(
+ "DELETE FROM messages WHERE Id = :Id",
+ new NpgsqlParameter("Id", id)
+ );
+ }
+
+ ///
+ /// This method is used to set Next Server for a Destination Server.
+ /// It is used to update database records when Server Graph changed.
+ ///
+ /// Destination server of messages
+ /// Next server of messages for destServer
+ public void UpdateNextServer(string destServer, string nextServer)
+ {
+ ExecuteNonQuery(
+ "UPDATE messages SET NextServer = :NextServer WHERE DestServer = :DestServer",
+ new NpgsqlParameter("NextServer", nextServer),
+ new NpgsqlParameter("DestServer", destServer)
+ );
+ }
+
+ #endregion
+
+ #region Helper Methods
+
+ ///
+ /// Executes a query and returns effected rows count.
+ ///
+ /// Query to execute
+ /// Parameters
+ /// Effected rows count
+ private int ExecuteNonQuery(string query, params NpgsqlParameter[] parameters)
+ {
+ using (var connection = new NpgsqlConnection(ConnectionString))
+ {
+ connection.Open();
+ using (var command = new NpgsqlCommand(query, connection))
+ {
+ foreach (var parameter in parameters)
+ {
+ command.Parameters.Add(parameter);
+ }
+ return command.ExecuteNonQuery();
+ }
+ }
+ }
+
+ ///
+ /// This method is used to run an insert query and get inserted row's auto increment column's value.
+ ///
+ /// Insert query to be executed
+ /// Parameters
+ /// Auto increment column's value of inserted row
+ private int InsertAndGetLastId(string query, params NpgsqlParameter[] parameters)
+ {
+ using (var connection = new NpgsqlConnection(ConnectionString))
+ {
+ connection.Open();
+ using (var command = new NpgsqlCommand(query, connection))
+ {
+
+ foreach (var parameter in parameters)
+ {
+ command.Parameters.Add(parameter);
+ }
+
+ // Prepare the command.
+ // command.Prepare();
+
+ using (var reader = command.ExecuteReader())
+ {
+ if (reader.Read())
+ {
+ return Convert.ToInt32(reader[0]);
+ }
+
+ return 0;
+ }
+ }
+ }
+ }
+
+ ///
+ /// Runs a query and returns a DataTable.
+ ///
+ /// Query to execute
+ /// Parameters
+ /// Selected table
+ private DataTable GetTable(string query, params NpgsqlParameter[] parameters)
+ {
+ var table = new DataTable();
+ using (var connection = new NpgsqlConnection(ConnectionString))
+ {
+ using (var command = new NpgsqlCommand(query, connection))
+ {
+ foreach (var parameter in parameters)
+ {
+ command.Parameters.Add(parameter);
+ }
+
+ // command.Prepare();
+
+ using (var adapter = new NpgsqlDataAdapter(command))
+ {
+ adapter.Fill(table);
+ return table;
+ }
+ }
+ }
+ }
+
+ ///
+ /// Gets a record from a table.
+ ///
+ /// Select query
+ /// Select parameters
+ /// Returns found row as TableRecord object. If there is no row returns null
+ public TableRecord GetTableRecord(string query, params NpgsqlParameter[] parameters)
+ {
+ using (var connection = new NpgsqlConnection(ConnectionString))
+ {
+ connection.Open();
+ using (var command = new NpgsqlCommand(query, connection))
+ {
+ foreach (var parameter in parameters)
+ {
+ command.Parameters.Add(parameter);
+ }
+
+ // command.Prepare();
+
+ using (var reader = command.ExecuteReader())
+ {
+ if (reader.Read())
+ {
+ var record = new TableRecord();
+ for (var i = 0; i < reader.FieldCount; i++)
+ {
+ record[reader.GetName(i)] = reader[i];
+ }
+
+ return record;
+ }
+
+ return null;
+ }
+ }
+ }
+ }
+
+ ///
+ /// Executes a query and gets a Integer result.
+ /// If query returns no data, method returns 0.
+ ///
+ /// Query to execute
+ /// Parameters
+ /// Query result or 0
+ public int GetScalarField(string query, params NpgsqlParameter[] parameters)
+ {
+ using (var connection = new NpgsqlConnection(ConnectionString))
+ {
+ connection.Open();
+ using (var command = new NpgsqlCommand(query, connection))
+ {
+ foreach (var parameter in parameters)
+ {
+ command.Parameters.Add(parameter);
+ }
+
+ // command.Prepare();
+
+ using (var reader = command.ExecuteReader())
+ {
+ if (reader.Read())
+ {
+ return Convert.ToInt32(reader[0]);
+ }
+
+ return 0;
+ }
+ }
+ }
+ }
+
+ #endregion
+ }
+}