Skip to content

Commit

Permalink
Added execution time to pipeline report
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech committed Jan 16, 2025
1 parent 31e927e commit 3dc9196
Show file tree
Hide file tree
Showing 16 changed files with 250 additions and 30 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"monolog/monolog": "^2.0||^3.0",
"packaged/thrift": "^0.15.0",
"php-http/discovery": "^1.0",
"psr/clock": "^1.0",
"psr/http-client": "^1.0",
"psr/http-message": "^1.0 || ^2.0",
"psr/log": "^2.0 || ^3.0",
Expand Down
54 changes: 51 additions & 3 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/core/etl/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"php": "~8.2.0 || ~8.3.0 || ~8.4.0",
"ext-json": "*",
"ext-mbstring": "*",
"psr/clock": "^1.0",
"flow-php/array-dot": "^0.10.0 || 1.x-dev",
"flow-php/rdsl": "^0.10.0 || 1.x-dev",
"flow-php/filesystem": "^0.10.0 || 1.x-dev",
Expand Down
29 changes: 29 additions & 0 deletions src/core/etl/src/Flow/Clock/FakeClock.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Flow\Clock;

use Psr\Clock\ClockInterface;

final class FakeClock implements ClockInterface
{
public function __construct(private \DateTimeImmutable $dateTime = new \DateTimeImmutable('now'))
{
}

public function modify(string $modify) : void
{
$this->dateTime = $this->dateTime->modify($modify);
}

public function now() : \DateTimeImmutable
{
return $this->dateTime;
}

public function set(\DateTimeImmutable $dateTime) : void
{
$this->dateTime = $dateTime;
}
}
29 changes: 29 additions & 0 deletions src/core/etl/src/Flow/Clock/SystemClock.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Flow\Clock;

use Psr\Clock\ClockInterface;

final readonly class SystemClock implements ClockInterface
{
public function __construct(private \DateTimeZone $timezone)
{
}

public static function system() : self

Check warning on line 15 in src/core/etl/src/Flow/Clock/SystemClock.php

View check run for this annotation

Codecov / codecov/patch

src/core/etl/src/Flow/Clock/SystemClock.php#L15

Added line #L15 was not covered by tests
{
return new self(new \DateTimeZone(date_default_timezone_get()));

Check warning on line 17 in src/core/etl/src/Flow/Clock/SystemClock.php

View check run for this annotation

Codecov / codecov/patch

src/core/etl/src/Flow/Clock/SystemClock.php#L17

Added line #L17 was not covered by tests
}

public static function utc() : self
{
return new self(new \DateTimeZone('UTC'));
}

public function now() : \DateTimeImmutable
{
return new \DateTimeImmutable('now', $this->timezone);
}
}
7 changes: 7 additions & 0 deletions src/core/etl/src/Flow/ETL/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Flow\ETL\Row\EntryFactory;
use Flow\Filesystem\{FilesystemTable};
use Flow\Serializer\Serializer;
use Psr\Clock\ClockInterface;

/**
* Immutable configuration that can be used to initialize many contexts.
Expand All @@ -33,6 +34,7 @@
public function __construct(
private string $id,
private Serializer $serializer,
private ClockInterface $clock,
private FilesystemTable $filesystemTable,
private FilesystemStreams $filesystemStreams,
private Optimizer $optimizer,
Expand All @@ -59,6 +61,11 @@ public function caster() : Caster
return $this->caster;
}

public function clock() : ClockInterface
{
return $this->clock;
}

public function entryFactory() : EntryFactory
{
return $this->entryFactory;
Expand Down
15 changes: 14 additions & 1 deletion src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow\ETL\Config;

use function Flow\Filesystem\DSL\fstab;
use Flow\Clock\SystemClock;
use Flow\ETL\Config\Cache\CacheConfigBuilder;
use Flow\ETL\Config\Sort\SortConfigBuilder;
use Flow\ETL\Filesystem\FilesystemStreams;
Expand All @@ -15,6 +16,7 @@
use Flow\ETL\{Cache, Config, NativePHPRandomValueGenerator, RandomValueGenerator};
use Flow\Filesystem\{Filesystem, FilesystemTable};
use Flow\Serializer\{Base64Serializer, NativePHPSerializer, Serializer};
use Psr\Clock\ClockInterface;

final class ConfigBuilder
{
Expand All @@ -24,6 +26,8 @@ final class ConfigBuilder

private ?Caster $caster;

private ?ClockInterface $clock;

private ?FilesystemTable $fstab;

private ?string $id;
Expand All @@ -44,6 +48,7 @@ public function __construct()
$this->putInputIntoRows = false;
$this->optimizer = null;
$this->caster = null;
$this->clock = null;
$this->cache = new CacheConfigBuilder();
$this->sort = new SortConfigBuilder();
$this->randomValueGenerator = new NativePHPRandomValueGenerator();
Expand All @@ -54,7 +59,7 @@ public function build() : Config
$this->id ??= 'flow_php' . $this->randomValueGenerator->string(32);
$entryFactory = new NativeEntryFactory();
$this->serializer ??= new Base64Serializer(new NativePHPSerializer());

$this->clock ??= SystemClock::utc();
$this->optimizer ??= new Optimizer(
new Optimizer\LimitOptimization(),
new Optimizer\BatchSizeOptimization(batchSize: 1000)
Expand All @@ -65,6 +70,7 @@ public function build() : Config
return new Config(
$this->id,
$this->serializer,
$this->clock,
$this->fstab(),
new FilesystemStreams($this->fstab()),
$this->optimizer,
Expand All @@ -83,6 +89,13 @@ public function cache(Cache $cache) : self
return $this;
}

public function clock(ClockInterface $clocks) : self
{
$this->clock = $clocks;

return $this;
}

public function dontPutInputIntoRows() : self
{
$this->putInputIntoRows = false;
Expand Down
12 changes: 9 additions & 3 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ public function rows(Transformer|Transformation $transformer) : self
/**
* @trigger
*
* @param null|callable(Rows $rows): void $callback
* @param null|callable(Rows $rows, FlowContext $context): void $callback
* @param bool $analyze - when set to true, run will return Report
*/
#[DSLMethod(exclude: true)]
Expand All @@ -756,9 +756,13 @@ public function run(?callable $callback = null, bool $analyze = false) : ?Report
$totalRows = 0;
$schema = new Schema();

if ($analyze) {
$startedAt = $this->context->config->clock()->now();
}

foreach ($clone->pipeline->process($clone->context) as $rows) {
if ($callback !== null) {
$callback($rows);
$callback($rows, $clone->context);
}

if ($analyze) {
Expand All @@ -768,7 +772,9 @@ public function run(?callable $callback = null, bool $analyze = false) : ?Report
}

if ($analyze) {
return new Report($schema, new Statistics($totalRows));
$endedAt = $this->context->config->clock()->now();

return new Report($schema, new Statistics($totalRows, new Statistics\ExecutionTime($startedAt, $endedAt)));
}

return null;
Expand Down
3 changes: 3 additions & 0 deletions src/core/etl/src/Flow/ETL/Dataset/Statistics.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Flow\ETL\Dataset;

use Flow\ETL\Dataset\Statistics\ExecutionTime;

final readonly class Statistics
{
public function __construct(
private int $totalRows,
public readonly ExecutionTime $executionTime,
) {
}

Expand Down
27 changes: 27 additions & 0 deletions src/core/etl/src/Flow/ETL/Dataset/Statistics/ExecutionTime.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Dataset\Statistics;

use Flow\ETL\Exception\InvalidArgumentException;

final readonly class ExecutionTime
{
public function __construct(public \DateTimeImmutable $startedAt, public \DateTimeImmutable $finishedAt)
{
if ($startedAt > $finishedAt) {
throw new InvalidArgumentException('Execution start date must be before finish date');

Check warning on line 14 in src/core/etl/src/Flow/ETL/Dataset/Statistics/ExecutionTime.php

View check run for this annotation

Codecov / codecov/patch

src/core/etl/src/Flow/ETL/Dataset/Statistics/ExecutionTime.php#L14

Added line #L14 was not covered by tests
}
}

public function duration() : \DateInterval

Check warning on line 18 in src/core/etl/src/Flow/ETL/Dataset/Statistics/ExecutionTime.php

View check run for this annotation

Codecov / codecov/patch

src/core/etl/src/Flow/ETL/Dataset/Statistics/ExecutionTime.php#L18

Added line #L18 was not covered by tests
{
return $this->startedAt->diff($this->finishedAt);

Check warning on line 20 in src/core/etl/src/Flow/ETL/Dataset/Statistics/ExecutionTime.php

View check run for this annotation

Codecov / codecov/patch

src/core/etl/src/Flow/ETL/Dataset/Statistics/ExecutionTime.php#L20

Added line #L20 was not covered by tests
}

public function inSeconds() : int
{
return $this->finishedAt->getTimestamp() - $this->startedAt->getTimestamp();
}
}
31 changes: 31 additions & 0 deletions src/core/etl/tests/Flow/Clock/SystemClockTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Flow\Clock;

use PHPUnit\Framework\TestCase;

final class SystemClockTest extends TestCase
{
public function test_now() : void
{
$clock = SystemClock::system();

self::assertInstanceOf(\DateTimeImmutable::class, $clock->now());
}

public function test_system_clock() : void
{
$clock = SystemClock::system();

self::assertInstanceOf(SystemClock::class, $clock);
}

public function test_utc_clock() : void
{
$clock = SystemClock::utc();

self::assertInstanceOf(SystemClock::class, $clock);
}
}
Loading

0 comments on commit 3dc9196

Please sign in to comment.