Skip to content

Commit

Permalink
Merge pull request #58 from NicolasConstant/develop
Browse files Browse the repository at this point in the history
0.9.0 PR
  • Loading branch information
NicolasConstant authored Jan 18, 2021
2 parents be13b6c + 05fa5df commit cd4d30b
Show file tree
Hide file tree
Showing 18 changed files with 157 additions and 93 deletions.
19 changes: 0 additions & 19 deletions src/BirdsiteLive.Cryptography/MagicKey.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,6 @@ public static MagicKey Generate()
return new MagicKey(JsonConvert.SerializeObject(RSAKeyParms.From(rsa.ExportParameters(true))));
}

//public static async Task<MagicKey> KeyForAuthor(ASObject obj)
//{
// var authorId = (string)obj["email"].FirstOrDefault()?.Primitive;
// if (authorId == null)
// {
// authorId = obj["name"].FirstOrDefault()?.Primitive + "@" + new Uri(obj.Id).Host;
// }

// var domain = authorId.Split('@')[1];
// var hc = new HttpClient();
// var wf = JsonConvert.DeserializeObject<WebfingerResult>(await hc.GetStringAsync($"https://{domain}/.well-known/webfinger?resource=acct:{Uri.EscapeDataString(authorId)}"));
// var link = wf.links.FirstOrDefault(a => a.rel == "magic-public-key");
// if (link == null) return null;

// if (!link.href.StartsWith("data:")) return null; // does this happen?

// return new MagicKey(link.href.Split(new char[] { ',' }, 2)[1]);
//}

public byte[] BuildSignedData(string data, string dataType, string encoding, string algorithm)
{
var sig = data + "." + _encodeBase64Url(Encoding.UTF8.GetBytes(dataType)) + "." + _encodeBase64Url(Encoding.UTF8.GetBytes(encoding)) + "." + _encodeBase64Url(Encoding.UTF8.GetBytes(algorithm));
Expand Down
29 changes: 14 additions & 15 deletions src/BirdsiteLive.Domain/ActivityPubService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,42 @@ public interface IActivityPubService
{
Task<Actor> GetUser(string objectId);
Task<HttpStatusCode> PostDataAsync<T>(T data, string targetHost, string actorUrl, string inbox = null);
Task<HttpStatusCode> PostNewNoteActivity(Note note, string username, string noteId, string targetHost,
Task PostNewNoteActivity(Note note, string username, string noteId, string targetHost,
string targetInbox);
}

public class ActivityPubService : IActivityPubService
{
private readonly InstanceSettings _instanceSettings;
private readonly IHttpClientFactory _httpClientFactory;
private readonly ICryptoService _cryptoService;

#region Ctor
public ActivityPubService(ICryptoService cryptoService, InstanceSettings instanceSettings)
public ActivityPubService(ICryptoService cryptoService, InstanceSettings instanceSettings, IHttpClientFactory httpClientFactory)
{
_cryptoService = cryptoService;
_instanceSettings = instanceSettings;
_httpClientFactory = httpClientFactory;
}
#endregion

public async Task<Actor> GetUser(string objectId)
{
using (var httpClient = new HttpClient())
{
httpClient.DefaultRequestHeaders.Add("Accept", "application/json");
var result = await httpClient.GetAsync(objectId);
var content = await result.Content.ReadAsStringAsync();
return JsonConvert.DeserializeObject<Actor>(content);
}
var httpClient = _httpClientFactory.CreateClient();
httpClient.DefaultRequestHeaders.Add("Accept", "application/json");
var result = await httpClient.GetAsync(objectId);
var content = await result.Content.ReadAsStringAsync();
return JsonConvert.DeserializeObject<Actor>(content);
}

public async Task<HttpStatusCode> PostNewNoteActivity(Note note, string username, string noteId, string targetHost, string targetInbox)
public async Task PostNewNoteActivity(Note note, string username, string noteId, string targetHost, string targetInbox)
{
var actor = UrlFactory.GetActorUrl(_instanceSettings.Domain, username);
var noteUri = UrlFactory.GetNoteUrl(_instanceSettings.Domain, username, noteId);

var now = DateTime.UtcNow;
var nowString = now.ToString("s") + "Z";

var noteActivity = new ActivityCreateNote()
{
context = "https://www.w3.org/ns/activitystreams",
Expand All @@ -67,7 +67,7 @@ public async Task<HttpStatusCode> PostNewNoteActivity(Note note, string username
apObject = note
};

return await PostDataAsync(noteActivity, targetHost, actor, targetInbox);
await PostDataAsync(noteActivity, targetHost, actor, targetInbox);
}

public async Task<HttpStatusCode> PostDataAsync<T>(T data, string targetHost, string actorUrl, string inbox = null)
Expand All @@ -85,7 +85,7 @@ public async Task<HttpStatusCode> PostDataAsync<T>(T data, string targetHost, st

var signature = _cryptoService.SignAndGetSignatureHeader(date, actorUrl, targetHost, digest, usedInbox);

var client = new HttpClient();
var client = _httpClientFactory.CreateClient();
var httpRequestMessage = new HttpRequestMessage
{
Method = HttpMethod.Post,
Expand All @@ -101,9 +101,8 @@ public async Task<HttpStatusCode> PostDataAsync<T>(T data, string targetHost, st
};

var response = await client.SendAsync(httpRequestMessage);
response.EnsureSuccessStatusCode();
return response.StatusCode;
}


}
}
4 changes: 4 additions & 0 deletions src/BirdsiteLive.Domain/BirdsiteLive.Domain.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Http" Version="5.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\BirdsiteLive.ActivityPub\BirdsiteLive.ActivityPub.csproj" />
<ProjectReference Include="..\BirdsiteLive.Cryptography\BirdsiteLive.Cryptography.csproj" />
Expand Down
4 changes: 2 additions & 2 deletions src/BirdsiteLive.Domain/UserService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public async Task<bool> FollowRequestedAsync(string signature, string method, st
}
};
var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject);
return result == HttpStatusCode.Accepted || result == HttpStatusCode.OK;
return result == HttpStatusCode.Accepted || result == HttpStatusCode.OK; //TODO: revamp this for better error handling
}

private string OnlyKeepRoute(string inbox, string host)
Expand Down Expand Up @@ -188,7 +188,7 @@ public async Task<bool> UndoFollowRequestedAsync(string signature, string method
}
};
var result = await _activityPubService.PostDataAsync(acceptFollow, followerHost, activity.apObject.apObject);
return result == HttpStatusCode.Accepted || result == HttpStatusCode.OK;
return result == HttpStatusCode.Accepted || result == HttpStatusCode.OK; //TODO: revamp this for better error handling
}

private async Task<SignatureValidationResult> ValidateSignature(string actor, string rawSig, string method, string path, string queryString, Dictionary<string, string> requestHeaders, string body)
Expand Down
3 changes: 2 additions & 1 deletion src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
using Microsoft.Extensions.Logging;

namespace BirdsiteLive.Pipeline.Processors
{
public class RetrieveTwitterUsersProcessor : IRetrieveTwitterUsersProcessor
{
private readonly ITwitterUserDal _twitterUserDal;
private readonly ILogger<RetrieveTwitterUsersProcessor> _logger;
private const int SyncPeriod = 15; //in minutes

#region Ctor
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal)
public RetrieveTwitterUsersProcessor(ITwitterUserDal twitterUserDal, ILogger<RetrieveTwitterUsersProcessor> logger)
{
_twitterUserDal = twitterUserDal;
_logger = logger;
}
#endregion

Expand All @@ -35,8 +38,7 @@ public async Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUse
}
catch (Exception e)
{
Console.WriteLine(e);
//TODO handle error
_logger.LogError(e, "Failing retrieving Twitter Users.");
}

await Task.Delay(SyncPeriod * 1000 * 60, ct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using BirdsiteLive.Pipeline.Processors.SubTasks;
using BirdsiteLive.Twitter;
using BirdsiteLive.Twitter.Models;
using Microsoft.Extensions.Logging;
using Tweetinvi.Models;

namespace BirdsiteLive.Pipeline.Processors
Expand All @@ -21,12 +22,14 @@ public class SendTweetsToFollowersProcessor : ISendTweetsToFollowersProcessor
{
private readonly ISendTweetsToInboxTask _sendTweetsToInboxTask;
private readonly ISendTweetsToSharedInboxTask _sendTweetsToSharedInbox;
private readonly ILogger<SendTweetsToFollowersProcessor> _logger;

#region Ctor
public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox)
public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, ILogger<SendTweetsToFollowersProcessor> logger)
{
_sendTweetsToInboxTask = sendTweetsToInboxTask;
_sendTweetsToSharedInbox = sendTweetsToSharedInbox;
_logger = logger;
}
#endregion

Expand Down Expand Up @@ -61,8 +64,8 @@ private async Task ProcessFollowersWithSharedInbox(ExtractedTweet[] tweets, List
}
catch (Exception e)
{
Console.WriteLine(e);
//TODO handle error
var follower = followersPerInstance.First();
_logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.SharedInboxRoute);
}
}
}
Expand All @@ -77,8 +80,7 @@ private async Task ProcessFollowersWithInbox(ExtractedTweet[] tweets, List<Follo
}
catch (Exception e)
{
Console.WriteLine(e);
//TODO handle error
_logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.InboxRoute);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,8 @@ public async Task ExecuteAsync(IEnumerable<ExtractedTweet> tweets, Follower foll
foreach (var tweet in tweetsToSend)
{
var note = _statusService.GetStatus(user.Acct, tweet);
var result = await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox);

if (result == HttpStatusCode.Accepted || result == HttpStatusCode.OK)
syncStatus = tweet.Id;
else
throw new Exception("Posting new note activity failed");
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), follower.Host, inbox);
syncStatus = tweet.Id;
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class SendTweetsToSharedInboxTask : ISendTweetsToSharedInboxTask
private readonly IStatusService _statusService;
private readonly IActivityPubService _activityPubService;
private readonly IFollowersDal _followersDal;

#region Ctor
public SendTweetsToSharedInboxTask(IActivityPubService activityPubService, IStatusService statusService, IFollowersDal followersDal)
{
Expand Down Expand Up @@ -48,13 +48,8 @@ public async Task ExecuteAsync(ExtractedTweet[] tweets, SyncTwitterUser user, st
foreach (var tweet in tweetsToSend)
{
var note = _statusService.GetStatus(user.Acct, tweet);
var result =
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox);

if (result == HttpStatusCode.Accepted || result == HttpStatusCode.OK)
syncStatus = tweet.Id;
else
throw new Exception("Posting new note activity failed");
await _activityPubService.PostNewNoteActivity(note, user.Acct, tweet.Id.ToString(), host, inbox);
syncStatus = tweet.Id;
}
}
finally
Expand Down
21 changes: 11 additions & 10 deletions src/BirdsiteLive.Pipeline/StatusPublicationPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
using Microsoft.Extensions.Logging;

namespace BirdsiteLive.Pipeline
{
Expand All @@ -19,29 +20,31 @@ public class StatusPublicationPipeline : IStatusPublicationPipeline
private readonly IRetrieveTweetsProcessor _retrieveTweetsProcessor;
private readonly IRetrieveFollowersProcessor _retrieveFollowersProcessor;
private readonly ISendTweetsToFollowersProcessor _sendTweetsToFollowersProcessor;

private readonly ILogger<StatusPublicationPipeline> _logger;

#region Ctor
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor)
public StatusPublicationPipeline(IRetrieveTweetsProcessor retrieveTweetsProcessor, IRetrieveTwitterUsersProcessor retrieveTwitterAccountsProcessor, IRetrieveFollowersProcessor retrieveFollowersProcessor, ISendTweetsToFollowersProcessor sendTweetsToFollowersProcessor, ILogger<StatusPublicationPipeline> logger)
{
_retrieveTweetsProcessor = retrieveTweetsProcessor;
_retrieveTwitterAccountsProcessor = retrieveTwitterAccountsProcessor;
_retrieveFollowersProcessor = retrieveFollowersProcessor;
_sendTweetsToFollowersProcessor = sendTweetsToFollowersProcessor;
_logger = logger;
}
#endregion

public async Task ExecuteAsync(CancellationToken ct)
{
// Create blocks
var twitterUsersBufferBlock = new BufferBlock<SyncTwitterUser[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct});
var twitterUsersBufferBlock = new BufferBlock<SyncTwitterUser[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveTweetsBlock = new TransformBlock<SyncTwitterUser[], UserWithTweetsToSync[]>(async x => await _retrieveTweetsProcessor.ProcessAsync(x, ct));
var retrieveTweetsBufferBlock = new BufferBlock<UserWithTweetsToSync[]>(new DataflowBlockOptions { BoundedCapacity = 1, CancellationToken = ct });
var retrieveFollowersBlock = new TransformManyBlock<UserWithTweetsToSync[], UserWithTweetsToSync>(async x => await _retrieveFollowersProcessor.ProcessAsync(x, ct));
var retrieveFollowersBufferBlock = new BufferBlock<UserWithTweetsToSync>(new DataflowBlockOptions { BoundedCapacity = 20, CancellationToken = ct });
var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct});
var sendTweetsToFollowersBlock = new ActionBlock<UserWithTweetsToSync>(async x => await _sendTweetsToFollowersProcessor.ProcessAsync(x, ct), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, CancellationToken = ct });

// Link pipeline
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions {PropagateCompletion = true});
twitterUsersBufferBlock.LinkTo(retrieveTweetsBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBlock.LinkTo(retrieveTweetsBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveTweetsBufferBlock.LinkTo(retrieveFollowersBlock, new DataflowLinkOptions { PropagateCompletion = true });
retrieveFollowersBlock.LinkTo(retrieveFollowersBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
Expand All @@ -51,12 +54,10 @@ public async Task ExecuteAsync(CancellationToken ct)
var retrieveTwitterAccountsTask = _retrieveTwitterAccountsProcessor.GetTwitterUsersAsync(twitterUsersBufferBlock, ct);

// Wait
await Task.WhenAny(new []{ retrieveTwitterAccountsTask , sendTweetsToFollowersBlock.Completion});
await Task.WhenAny(new[] { retrieveTwitterAccountsTask, sendTweetsToFollowersBlock.Completion });

var foreground = Console.ForegroundColor;
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("An error occured, pipeline stopped");
Console.ForegroundColor = foreground;
var ex = retrieveTwitterAccountsTask.IsFaulted ? retrieveTwitterAccountsTask.Exception : sendTweetsToFollowersBlock.Completion.Exception;
_logger.LogCritical(ex, "An error occurred, pipeline stopped");
}
}
}
1 change: 1 addition & 0 deletions src/BirdsiteLive.Twitter/BirdsiteLive.Twitter.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="5.0.0" />
<PackageReference Include="TweetinviAPI" Version="4.0.3" />
</ItemGroup>

Expand Down
52 changes: 52 additions & 0 deletions src/BirdsiteLive.Twitter/CachedTwitterService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using BirdsiteLive.Twitter.Models;
using Microsoft.Extensions.Caching.Memory;

namespace BirdsiteLive.Twitter
{
public class CachedTwitterService : ITwitterService
{
private readonly ITwitterService _twitterService;

private MemoryCache _userCache = new MemoryCache(new MemoryCacheOptions()
{
SizeLimit = 5000
});
private MemoryCacheEntryOptions _cacheEntryOptions = new MemoryCacheEntryOptions()
.SetSize(1)//Size amount
//Priority on removing when reaching size limit (memory pressure)
.SetPriority(CacheItemPriority.High)
// Keep in cache for this time, reset time if accessed.
.SetSlidingExpiration(TimeSpan.FromHours(24))
// Remove from cache after this time, regardless of sliding expiration
.SetAbsoluteExpiration(TimeSpan.FromDays(30));

#region Ctor
public CachedTwitterService(ITwitterService twitterService)
{
_twitterService = twitterService;
}
#endregion

public TwitterUser GetUser(string username)
{
if (!_userCache.TryGetValue(username, out TwitterUser user))
{
user = _twitterService.GetUser(username);
_userCache.Set(username, user, _cacheEntryOptions);
}

return user;
}

public ExtractedTweet GetTweet(long statusId)
{
return _twitterService.GetTweet(statusId);
}

public ExtractedTweet[] GetTimeline(string username, int nberTweets, long fromTweetId = -1)
{
return _twitterService.GetTimeline(username, nberTweets, fromTweetId);
}
}
}
Loading

0 comments on commit cd4d30b

Please sign in to comment.