Skip to content

Commit

Permalink
Improve handling of disconnections
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin-Molinero committed Jul 19, 2024
1 parent d3af65e commit b02ca6d
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public class AlpacaBrokerageAdditionalTests
[OneTimeSetUp]
public void OneTimeSetUp()
{
var (apiKey, apiKeySecret, isPaperTrading) = AlpacaBrokerageTestHelpers.GetConfigParameters();
var (apiKey, apiKeySecret, isPaperTrading, accessToken) = AlpacaBrokerageTestHelpers.GetConfigParameters();

var algorithmMock = new Mock<IAlgorithm>();
_alpacaBrokerage = new TestAlpacaBrokerage(apiKey, apiKeySecret, isPaperTrading, algorithmMock.Object);
_alpacaBrokerage = new TestAlpacaBrokerage(apiKey, apiKeySecret, isPaperTrading, algorithmMock.Object, accessToken);
}

[Test]
Expand Down Expand Up @@ -72,13 +72,13 @@ public void GetLatestQuote(Symbol symbol)

internal class TestAlpacaBrokerage : AlpacaBrokerage
{
public TestAlpacaBrokerage(string apiKey, string apiKeySecret, bool isPaperTrading, IAlgorithm algorithm)
: base(apiKey, apiKeySecret, null, isPaperTrading, algorithm)
public TestAlpacaBrokerage(string apiKey, string apiKeySecret, bool isPaperTrading, IAlgorithm algorithm, string accessToken)
: base(apiKey, apiKeySecret, accessToken, isPaperTrading, algorithm)
{
}

public TestAlpacaBrokerage(string apiKey, string apiKeySecret, bool isPaperTrading, IOrderProvider orderProvider, ISecurityProvider securityProvider)
: base(apiKey, apiKeySecret, null, isPaperTrading, orderProvider, securityProvider)
public TestAlpacaBrokerage(string apiKey, string apiKeySecret, bool isPaperTrading, IOrderProvider orderProvider, ISecurityProvider securityProvider, string accessToken)
: base(apiKey, apiKeySecret, accessToken, isPaperTrading, orderProvider, securityProvider)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class AlpacaBrokerageHistoryProviderTests
[OneTimeSetUp]
public void OneTimeSetUp()
{
var (apiKey, apiKeySecret, isPaperTrading) = AlpacaBrokerageTestHelpers.GetConfigParameters();
_alpacaBrokerage = new AlpacaBrokerage(apiKey, apiKeySecret, null, isPaperTrading, new Mock<IAlgorithm>().Object);
var (apiKey, apiKeySecret, isPaperTrading, accessToken) = AlpacaBrokerageTestHelpers.GetConfigParameters();
_alpacaBrokerage = new AlpacaBrokerage(apiKey, apiKeySecret, accessToken, isPaperTrading, new Mock<IAlgorithm>().Object);
}

private static IEnumerable<TestCaseData> TestParameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
using System;
using Alpaca.Markets;
using NUnit.Framework;
using QuantConnect.Configuration;

namespace QuantConnect.Brokerages.Alpaca.Tests
{
Expand All @@ -29,7 +28,7 @@ public class AlpacaBrokerageSymbolMapperTests
[OneTimeSetUp]
public void OneTimeSetUp()
{
var (apiKey, apiKeySecret, _) = AlpacaBrokerageTestHelpers.GetConfigParameters();
var (apiKey, apiKeySecret, _, _) = AlpacaBrokerageTestHelpers.GetConfigParameters();

var secretKey = new SecretKey(apiKey, apiKeySecret);
var alpacaTradingClient = Environments.Paper.GetAlpacaTradingClient(secretKey);
Expand Down
10 changes: 5 additions & 5 deletions QuantConnect.AlpacaBrokerage.Tests/AlpacaBrokerageTestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,21 @@ public static class AlpacaBrokerageTestHelpers
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="isValidateOnEmpty"/> is <c>true</c> and any configuration parameter is null or empty.
/// </exception>
public static (string ApiKey, string ApiKeySecret, bool IsPapperTrading) GetConfigParameters(bool isValidateOnEmpty = true)
public static (string ApiKey, string ApiKeySecret, bool IsPapperTrading, string accessToken) GetConfigParameters(bool isValidateOnEmpty = true)
{
var (apiKey, apiKeySecret, isPaperTrading) = (Config.Get("alpaca-api-key"), Config.Get("alpaca-api-secret"), Config.GetBool("alpaca-paper-trading"));
var (apiKey, apiKeySecret, isPaperTrading, accessToken) = (Config.Get("alpaca-api-key"), Config.Get("alpaca-api-secret"), Config.GetBool("alpaca-paper-trading"), Config.Get("alpaca-access-token"));

if (!isValidateOnEmpty)
{
return (apiKey, apiKeySecret, isPaperTrading);
return (apiKey, apiKeySecret, isPaperTrading, accessToken);
}

if (string.IsNullOrEmpty(apiKey) || string.IsNullOrEmpty(apiKeySecret))
if (string.IsNullOrEmpty(accessToken) && (string.IsNullOrEmpty(apiKey) || string.IsNullOrEmpty(apiKeySecret)))
{
throw new ArgumentNullException("'API Key' or 'Secret Key' or 'Data Feed Provider' cannot be null or empty. Please check your configuration.");
}

return (apiKey, apiKeySecret, isPaperTrading);
return (apiKey, apiKeySecret, isPaperTrading, accessToken);

}
}
4 changes: 2 additions & 2 deletions QuantConnect.AlpacaBrokerage.Tests/AlpacaBrokerageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public partial class AlpacaBrokerageTests : BrokerageTests

protected override IBrokerage CreateBrokerage(IOrderProvider orderProvider, ISecurityProvider securityProvider)
{
var (apiKey, apiKeySecret, isPaperTrading) = AlpacaBrokerageTestHelpers.GetConfigParameters();
var (apiKey, apiKeySecret, isPaperTrading, accessToken) = AlpacaBrokerageTestHelpers.GetConfigParameters();

return new TestAlpacaBrokerage(apiKey, apiKeySecret, isPaperTrading, orderProvider, securityProvider);
return new TestAlpacaBrokerage(apiKey, apiKeySecret, isPaperTrading, orderProvider, securityProvider, accessToken);
}
protected override bool IsAsync() => false;
protected override decimal GetAskPrice(Symbol symbol)
Expand Down
16 changes: 9 additions & 7 deletions QuantConnect.AlpacaBrokerage/AlpacaBrokerage.Messaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,9 @@ private bool Subscribe(IEnumerable<Symbol> symbols)
var streamingClient = GetStreamingDataClient(symbol);
var tradeSubscription = streamingClient.GetTradeSubscription(brokerageSymbol);
tradeSubscription.Received += HandleTradeReceived;
tradeSubscription.OnSubscribedChanged += () => SubscriptionOnSubscribedChanged(symbol, "trade");

var quoteSubscription = streamingClient.GetQuoteSubscription(brokerageSymbol);
quoteSubscription.Received += HandleQuoteReceived;
quoteSubscription.OnSubscribedChanged += () => SubscriptionOnSubscribedChanged(symbol, "quote");

streamingClient.SubscribeAsync(tradeSubscription).ConfigureAwait(false).GetAwaiter().GetResult();
streamingClient.SubscribeAsync(quoteSubscription).ConfigureAwait(false).GetAwaiter().GetResult();
Expand All @@ -59,11 +57,6 @@ private bool Subscribe(IEnumerable<Symbol> symbols)
return true;
}

private void SubscriptionOnSubscribedChanged(Symbol symbol, string flavor)
{
Log.Trace($"{nameof(SubscriptionOnSubscribedChanged)}({symbol.ID}): {flavor}");
}

/// <summary>
/// Removes the specified symbols to the subscription
/// </summary>
Expand All @@ -77,8 +70,17 @@ private bool Unsubscribe(IEnumerable<Symbol> symbols)
var streamingClient = GetStreamingDataClient(symbol);
foreach (var subscription in subscriptions)
{
if (subscription is IAlpacaDataSubscription<IQuote> quoteSubscription)
{
quoteSubscription.Received -= HandleQuoteReceived;
}
else if(subscription is IAlpacaDataSubscription<ITrade> tradeSubscription)
{
tradeSubscription.Received -= HandleTradeReceived;
}
streamingClient.UnsubscribeAsync(subscription).ConfigureAwait(false).GetAwaiter().GetResult();
}
_dataSubscriptionByBrokerageSymbol.TryRemove(_symbolMapper.GetBrokerageSymbol(symbol), out _);
}
}
return true;
Expand Down
37 changes: 37 additions & 0 deletions QuantConnect.AlpacaBrokerage/AlpacaBrokerage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.Net;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using QuantConnect.Configuration;
using QuantConnect.Brokerages.CrossZero;

Expand Down Expand Up @@ -203,6 +204,13 @@ private void StreamingClient_OnError(IStreamingClient client, Exception obj)
private void StreamingClient_SocketClosed(IStreamingClient client)
{
Log.Trace($"{nameof(StreamingClient_SocketClosed)}({client.GetType().Name}): SocketClosed");
if (_connected)
{
_connected = false;
// let consumers know, we will try to reconnect internally, if we can't lean will kill us
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Disconnect, "Disconnected", "Brokerage Disconnected"));
RunReconnectionLogic(5);
}
}

private void StreamingClient_SocketOpened(IStreamingClient client)
Expand Down Expand Up @@ -559,6 +567,35 @@ public override void Connect()
_connected = true;
}

private void RunReconnectionLogic(int secondsDelay)
{
Task.Delay(TimeSpan.FromSeconds(secondsDelay)).ContinueWith(_ =>
{
try
{
Connect();
}
catch (Exception ex)
{
Log.Error(ex);
}
if (!IsConnected)
{
RunReconnectionLogic(60);
}
else
{
// resubscribe
var symbols = _subscriptionManager.GetSubscribedSymbols();
Unsubscribe(symbols);
Subscribe(symbols);
// let consumers know we are reconnected, avoid lean killing us
OnMessage(new BrokerageMessageEvent(BrokerageMessageType.Reconnect, "Reconnected", "Brokerage Reconnected"));
}
});
}

/// <summary>
/// Disconnects the client from the broker's remote servers
/// </summary>
Expand Down

0 comments on commit b02ca6d

Please sign in to comment.