diff --git a/src/Serilog.Sinks.Loggly/Sinks/Loggly/HttpLogShipper.cs b/src/Serilog.Sinks.Loggly/Sinks/Loggly/HttpLogShipper.cs index 7860442..371c598 100644 --- a/src/Serilog.Sinks.Loggly/Sinks/Loggly/HttpLogShipper.cs +++ b/src/Serilog.Sinks.Loggly/Sinks/Loggly/HttpLogShipper.cs @@ -34,7 +34,6 @@ namespace Serilog.Sinks.Loggly class HttpLogShipper : IDisposable { readonly JsonSerializer _serializer = JsonSerializer.Create(); - static readonly TimeSpan RequiredLevelCheckInterval = TimeSpan.FromMinutes(2); readonly int _batchPostingLimit; readonly long? _eventBodyLimitBytes; @@ -47,8 +46,7 @@ class HttpLogShipper : IDisposable readonly object _stateLock = new object(); readonly PortableTimer _timer; - ControlledLevelSwitch _controlledSwitch; - DateTime _nextRequiredLevelCheckUtc = DateTime.UtcNow.Add(RequiredLevelCheckInterval); + readonly ControlledLevelSwitch _controlledSwitch; volatile bool _unloading; readonly LogglyClient _logglyClient; @@ -66,9 +64,9 @@ public HttpLogShipper( _controlledSwitch = new ControlledLevelSwitch(levelControlSwitch); _connectionSchedule = new ExponentialBackoffConnectionSchedule(period); _retainedInvalidPayloadsLimitBytes = retainedInvalidPayloadsLimitBytes; - + _logglyClient = new LogglyClient(); //we'll use the loggly client instead of HTTP directly - + _bookmarkFilename = Path.GetFullPath(bufferBaseFilename + ".bookmark"); _logFolder = Path.GetDirectoryName(_bookmarkFilename); _candidateSearchPath = Path.GetFileName(bufferBaseFilename) + "*.json"; @@ -115,89 +113,91 @@ async Task OnTick() try { - int count; - do + // Locking the bookmark ensures that though there may be multiple instances of this + // class running, only one will ship logs at a time. + using (var bookmark = IOFile.Open(_bookmarkFilename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read)) { - count = 0; - - // Locking the bookmark ensures that though there may be multiple instances of this - // class running, only one will ship logs at a time. - - using (var bookmark = IOFile.Open(_bookmarkFilename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read)) + using (var bookmarkStreamReader = new StreamReader(bookmark, Encoding.UTF8, false, 128)) { - long nextLineBeginsAtOffset; - string currentFile; - - TryReadBookmark(bookmark, out nextLineBeginsAtOffset, out currentFile); - - var fileSet = GetFileSet(); - - if (currentFile == null || !IOFile.Exists(currentFile)) + using (var bookmarkStreamWriter = new StreamWriter(bookmark)) { - nextLineBeginsAtOffset = 0; - currentFile = fileSet.FirstOrDefault(); - } - - if (currentFile == null) - continue; - - //grab the list of pending LogglyEvents from the file - var payload = GetListOfEvents(currentFile, ref nextLineBeginsAtOffset, ref count); - - if (count > 0 || _controlledSwitch.IsActive && _nextRequiredLevelCheckUtc < DateTime.UtcNow) - { - lock (_stateLock) - { - _nextRequiredLevelCheckUtc = DateTime.UtcNow.Add(RequiredLevelCheckInterval); - } - - //sen the loggly events through the bulk API - var result = await _logglyClient.Log(payload).ConfigureAwait(false); - if (result.Code == ResponseCode.Success) - { - _connectionSchedule.MarkSuccess(); - WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFile); - } - else if (result.Code == ResponseCode.Error) + int count; + do { - // The connection attempt was successful - the payload we sent was the problem. - _connectionSchedule.MarkSuccess(); - - await DumpInvalidPayload(result, payload).ConfigureAwait(false); - WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFile); - } - else - { - _connectionSchedule.MarkFailure(); - SelfLog.WriteLine("Received failed HTTP shipping result {0}: {1}", result.Code, result.Message); - break; - } + count = 0; + + long nextLineBeginsAtOffset; + string currentFile; + + TryReadBookmark(bookmark, bookmarkStreamReader, out nextLineBeginsAtOffset, out currentFile); + + var fileSet = GetFileSet(); + + if (currentFile == null || !IOFile.Exists(currentFile)) + { + nextLineBeginsAtOffset = 0; + currentFile = fileSet.FirstOrDefault(); + } + + if (currentFile == null) + continue; + + //grab the list of pending LogglyEvents from the file + var payload = GetListOfEvents(currentFile, ref nextLineBeginsAtOffset, ref count); + + if (count > 0) + { + //sen the loggly events through the bulk API + var result = await _logglyClient.Log(payload).ConfigureAwait(false); + if (result.Code == ResponseCode.Success) + { + _connectionSchedule.MarkSuccess(); + WriteBookmark(bookmarkStreamWriter, nextLineBeginsAtOffset, currentFile); + } + else if (result.Code == ResponseCode.Error) + { + // The connection attempt was successful - the payload we sent was the problem. + _connectionSchedule.MarkSuccess(); + + DumpInvalidPayload(result, payload); + WriteBookmark(bookmarkStreamWriter, nextLineBeginsAtOffset, currentFile); + } + else + { + _connectionSchedule.MarkFailure(); + SelfLog.WriteLine("Received failed HTTP shipping result {0}: {1}", result.Code, + result.Message); + + break; + } + } + else + { + // For whatever reason, there's nothing waiting to send. This means we should try connecting again at the + // regular interval, so mark the attempt as successful. + _connectionSchedule.MarkSuccess(); + + // Only advance the bookmark if no other process has the + // current file locked, and its length is as we found it. + if (fileSet.Length == 2 && fileSet.First() == currentFile && + IsUnlockedAtLength(currentFile, nextLineBeginsAtOffset)) + { + WriteBookmark(bookmarkStreamWriter, 0, fileSet[1]); + } + + if (fileSet.Length > 2) + { + // Once there's a third file waiting to ship, we do our + // best to move on, though a lock on the current file + // will delay this. + + IOFile.Delete(fileSet[0]); + } + } + } while (count == _batchPostingLimit); } - else - { - // For whatever reason, there's nothing waiting to send. This means we should try connecting again at the - // regular interval, so mark the attempt as successful. - _connectionSchedule.MarkSuccess(); - - // Only advance the bookmark if no other process has the - // current file locked, and its length is as we found it. - if (fileSet.Length == 2 && fileSet.First() == currentFile && IsUnlockedAtLength(currentFile, nextLineBeginsAtOffset)) - { - WriteBookmark(bookmark, 0, fileSet[1]); - } - - if (fileSet.Length > 2) - { - // Once there's a third file waiting to ship, we do our - // best to move on, though a lock on the current file - // will delay this. - - IOFile.Delete(fileSet[0]); - } - } - } + } } - while (count == _batchPostingLimit); } catch (Exception ex) { @@ -217,7 +217,7 @@ async Task OnTick() } const string InvalidPayloadFilePrefix = "invalid-"; - async Task DumpInvalidPayload(LogResponse result, IEnumerable payload) + void DumpInvalidPayload(LogResponse result, IEnumerable payload) { var invalidPayloadFilename = $"{InvalidPayloadFilePrefix}{result.Code}-{Guid.NewGuid():n}.json"; var invalidPayloadFile = Path.Combine(_logFolder, invalidPayloadFilename); @@ -282,9 +282,9 @@ orderby candidateFileInfo.LastAccessTimeUtc descending } } - IEnumerable GetListOfEvents(string currentFile, ref long nextLineBeginsAtOffset, ref int count) + List GetListOfEvents(string currentFile, ref long nextLineBeginsAtOffset, ref int count) { - List events = new List(); + var events = new List(); using (var current = IOFile.Open(currentFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) { @@ -360,12 +360,10 @@ static bool IsUnlockedAtLength(string file, long maxLen) return false; } - static void WriteBookmark(FileStream bookmark, long nextLineBeginsAtOffset, string currentFile) + static void WriteBookmark(StreamWriter bookmarkStreamWriter, long nextLineBeginsAtOffset, string currentFile) { - using (var writer = new StreamWriter(bookmark)) - { - writer.WriteLine("{0}:::{1}", nextLineBeginsAtOffset, currentFile); - } + bookmarkStreamWriter.WriteLine("{0}:::{1}", nextLineBeginsAtOffset, currentFile); + bookmarkStreamWriter.Flush(); } // It would be ideal to chomp whitespace here, but not required. @@ -395,16 +393,15 @@ static bool TryReadLine(Stream current, ref long nextStart, out string nextLine) return true; } - static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, out string currentFile) + static void TryReadBookmark(Stream bookmark, StreamReader bookmarkStreamReader, out long nextLineBeginsAtOffset, out string currentFile) { nextLineBeginsAtOffset = 0; currentFile = null; if (bookmark.Length != 0) { - // Important not to dispose this StreamReader as the stream must remain open. - var reader = new StreamReader(bookmark, Encoding.UTF8, false, 128); - var current = reader.ReadLine(); + bookmarkStreamReader.BaseStream.Position = 0; + var current = bookmarkStreamReader.ReadLine(); if (current != null) { @@ -416,7 +413,6 @@ static void TryReadBookmark(Stream bookmark, out long nextLineBeginsAtOffset, ou currentFile = parts[1]; } } - } } diff --git a/src/Serilog.Sinks.Loggly/Sinks/Loggly/PortableTimer.cs b/src/Serilog.Sinks.Loggly/Sinks/Loggly/PortableTimer.cs index 29e05f2..6e3a670 100644 --- a/src/Serilog.Sinks.Loggly/Sinks/Loggly/PortableTimer.cs +++ b/src/Serilog.Sinks.Loggly/Sinks/Loggly/PortableTimer.cs @@ -50,13 +50,6 @@ public void Start(TimeSpan interval) throw new ObjectDisposedException(nameof(PortableTimer)); _timer.Change(interval, Timeout.InfiniteTimeSpan); - - Task.Delay(interval, _cancel.Token) - .ContinueWith( - _ => OnTick(), - CancellationToken.None, - TaskContinuationOptions.DenyChildAttach, - TaskScheduler.Default); } }