Skip to content
This repository has been archived by the owner on Mar 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #11 from JoseTiagoCarvalho/features/FileExceptionFix
Browse files Browse the repository at this point in the history
PortableTimer: the class uses internaly a timer to execute the Ticks.…
  • Loading branch information
MiguelAlho authored Feb 10, 2017
2 parents 7a69dca + ae820d4 commit c664e28
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 102 deletions.
186 changes: 91 additions & 95 deletions src/Serilog.Sinks.Loggly/Sinks/Loggly/HttpLogShipper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ namespace Serilog.Sinks.Loggly
class HttpLogShipper : IDisposable
{
readonly JsonSerializer _serializer = JsonSerializer.Create();
static readonly TimeSpan RequiredLevelCheckInterval = TimeSpan.FromMinutes(2);

readonly int _batchPostingLimit;
readonly long? _eventBodyLimitBytes;
Expand All @@ -47,8 +46,7 @@ class HttpLogShipper : IDisposable
readonly object _stateLock = new object();
readonly PortableTimer _timer;

ControlledLevelSwitch _controlledSwitch;
DateTime _nextRequiredLevelCheckUtc = DateTime.UtcNow.Add(RequiredLevelCheckInterval);
readonly ControlledLevelSwitch _controlledSwitch;
volatile bool _unloading;

readonly LogglyClient _logglyClient;
Expand All @@ -66,9 +64,9 @@ public HttpLogShipper(
_controlledSwitch = new ControlledLevelSwitch(levelControlSwitch);
_connectionSchedule = new ExponentialBackoffConnectionSchedule(period);
_retainedInvalidPayloadsLimitBytes = retainedInvalidPayloadsLimitBytes;

_logglyClient = new LogglyClient(); //we'll use the loggly client instead of HTTP directly

_bookmarkFilename = Path.GetFullPath(bufferBaseFilename + ".bookmark");
_logFolder = Path.GetDirectoryName(_bookmarkFilename);
_candidateSearchPath = Path.GetFileName(bufferBaseFilename) + "*.json";
Expand Down Expand Up @@ -115,89 +113,91 @@ async Task OnTick()

try
{
int count;
do
// Locking the bookmark ensures that though there may be multiple instances of this
// class running, only one will ship logs at a time.
using (var bookmark = IOFile.Open(_bookmarkFilename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read))
{
count = 0;

// Locking the bookmark ensures that though there may be multiple instances of this
// class running, only one will ship logs at a time.

using (var bookmark = IOFile.Open(_bookmarkFilename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read))
using (var bookmarkStreamReader = new StreamReader(bookmark, Encoding.UTF8, false, 128))
{
long nextLineBeginsAtOffset;
string currentFile;

TryReadBookmark(bookmark, out nextLineBeginsAtOffset, out currentFile);

var fileSet = GetFileSet();

if (currentFile == null || !IOFile.Exists(currentFile))
using (var bookmarkStreamWriter = new StreamWriter(bookmark))
{
nextLineBeginsAtOffset = 0;
currentFile = fileSet.FirstOrDefault();
}

if (currentFile == null)
continue;

//grab the list of pending LogglyEvents from the file
var payload = GetListOfEvents(currentFile, ref nextLineBeginsAtOffset, ref count);

if (count > 0 || _controlledSwitch.IsActive && _nextRequiredLevelCheckUtc < DateTime.UtcNow)
{
lock (_stateLock)
{
_nextRequiredLevelCheckUtc = DateTime.UtcNow.Add(RequiredLevelCheckInterval);
}

//sen the loggly events through the bulk API
var result = await _logglyClient.Log(payload).ConfigureAwait(false);
if (result.Code == ResponseCode.Success)
{
_connectionSchedule.MarkSuccess();
WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFile);
}
else if (result.Code == ResponseCode.Error)
int count;
do
{
// The connection attempt was successful - the payload we sent was the problem.
_connectionSchedule.MarkSuccess();

await DumpInvalidPayload(result, payload).ConfigureAwait(false);
WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFile);
}
else
{
_connectionSchedule.MarkFailure();
SelfLog.WriteLine("Received failed HTTP shipping result {0}: {1}", result.Code, result.Message);
break;
}
count = 0;

long nextLineBeginsAtOffset;
string currentFile;

TryReadBookmark(bookmark, bookmarkStreamReader, out nextLineBeginsAtOffset, out currentFile);

var fileSet = GetFileSet();

if (currentFile == null || !IOFile.Exists(currentFile))
{
nextLineBeginsAtOffset = 0;
currentFile = fileSet.FirstOrDefault();
}

if (currentFile == null)
continue;

//grab the list of pending LogglyEvents from the file
var payload = GetListOfEvents(currentFile, ref nextLineBeginsAtOffset, ref count);

if (count > 0)
{
//sen the loggly events through the bulk API
var result = await _logglyClient.Log(payload).ConfigureAwait(false);
if (result.Code == ResponseCode.Success)
{
_connectionSchedule.MarkSuccess();
WriteBookmark(bookmarkStreamWriter, nextLineBeginsAtOffset, currentFile);
}
else if (result.Code == ResponseCode.Error)
{
// The connection attempt was successful - the payload we sent was the problem.
_connectionSchedule.MarkSuccess();

DumpInvalidPayload(result, payload);
WriteBookmark(bookmarkStreamWriter, nextLineBeginsAtOffset, currentFile);
}
else
{
_connectionSchedule.MarkFailure();
SelfLog.WriteLine("Received failed HTTP shipping result {0}: {1}", result.Code,
result.Message);

break;
}
}
else
{
// For whatever reason, there's nothing waiting to send. This means we should try connecting again at the
// regular interval, so mark the attempt as successful.
_connectionSchedule.MarkSuccess();

// Only advance the bookmark if no other process has the
// current file locked, and its length is as we found it.
if (fileSet.Length == 2 && fileSet.First() == currentFile &&
IsUnlockedAtLength(currentFile, nextLineBeginsAtOffset))
{
WriteBookmark(bookmarkStreamWriter, 0, fileSet[1]);
}

if (fileSet.Length > 2)
{
// Once there's a third file waiting to ship, we do our
// best to move on, though a lock on the current file
// will delay this.

IOFile.Delete(fileSet[0]);
}
}
} while (count == _batchPostingLimit);
}
else
{
// For whatever reason, there's nothing waiting to send. This means we should try connecting again at the
// regular interval, so mark the attempt as successful.
_connectionSchedule.MarkSuccess();

// Only advance the bookmark if no other process has the
// current file locked, and its length is as we found it.
if (fileSet.Length == 2 && fileSet.First() == currentFile && IsUnlockedAtLength(currentFile, nextLineBeginsAtOffset))
{
WriteBookmark(bookmark, 0, fileSet[1]);
}

if (fileSet.Length > 2)
{
// Once there's a third file waiting to ship, we do our
// best to move on, though a lock on the current file
// will delay this.

IOFile.Delete(fileSet[0]);
}
}
}
}
}
while (count == _batchPostingLimit);
}
catch (Exception ex)
{
Expand All @@ -217,7 +217,7 @@ async Task OnTick()
}

const string InvalidPayloadFilePrefix = "invalid-";
async Task DumpInvalidPayload(LogResponse result, IEnumerable<LogglyEvent> payload)
void DumpInvalidPayload(LogResponse result, IEnumerable<LogglyEvent> payload)
{
var invalidPayloadFilename = $"{InvalidPayloadFilePrefix}{result.Code}-{Guid.NewGuid():n}.json";
var invalidPayloadFile = Path.Combine(_logFolder, invalidPayloadFilename);
Expand Down Expand Up @@ -282,9 +282,9 @@ orderby candidateFileInfo.LastAccessTimeUtc descending
}
}

IEnumerable<LogglyEvent> GetListOfEvents(string currentFile, ref long nextLineBeginsAtOffset, ref int count)
List<LogglyEvent> GetListOfEvents(string currentFile, ref long nextLineBeginsAtOffset, ref int count)
{
List<LogglyEvent> events = new List<LogglyEvent>();
var events = new List<LogglyEvent>();

using (var current = IOFile.Open(currentFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
Expand Down Expand Up @@ -360,12 +360,10 @@ static bool IsUnlockedAtLength(string file, long maxLen)
return false;
}

static void WriteBookmark(FileStream bookmark, long nextLineBeginsAtOffset, string currentFile)
static void WriteBookmark(StreamWriter bookmarkStreamWriter, long nextLineBeginsAtOffset, string currentFile)
{
using (var writer = new StreamWriter(bookmark))
{
writer.WriteLine("{0}:::{1}", nextLineBeginsAtOffset, currentFile);
}
bookmarkStreamWriter.WriteLine("{0}:::{1}", nextLineBeginsAtOffset, currentFile);
bookmarkStreamWriter.Flush();
}

// It would be ideal to chomp whitespace here, but not required.
Expand Down Expand Up @@ -395,16 +393,15 @@ static bool TryReadLine(Stream current, ref long nextStart, out string nextLine)
return true;
}

static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, out string currentFile)
static void TryReadBookmark(Stream bookmark, StreamReader bookmarkStreamReader, out long nextLineBeginsAtOffset, out string currentFile)
{
nextLineBeginsAtOffset = 0;
currentFile = null;

if (bookmark.Length != 0)
{
// Important not to dispose this StreamReader as the stream must remain open.
var reader = new StreamReader(bookmark, Encoding.UTF8, false, 128);
var current = reader.ReadLine();
bookmarkStreamReader.BaseStream.Position = 0;
var current = bookmarkStreamReader.ReadLine();

if (current != null)
{
Expand All @@ -416,7 +413,6 @@ static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, ou
currentFile = parts[1];
}
}

}
}

Expand Down
7 changes: 0 additions & 7 deletions src/Serilog.Sinks.Loggly/Sinks/Loggly/PortableTimer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ public void Start(TimeSpan interval)
throw new ObjectDisposedException(nameof(PortableTimer));

_timer.Change(interval, Timeout.InfiniteTimeSpan);

Task.Delay(interval, _cancel.Token)
.ContinueWith(
_ => OnTick(),
CancellationToken.None,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default);
}
}

Expand Down

0 comments on commit c664e28

Please sign in to comment.