Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
andreiio committed Nov 21, 2024
1 parent fb6d3d8 commit 281f195
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 0 deletions.
97 changes: 97 additions & 0 deletions app/Jobs/Presidential241124/Turnouts/FetchTurnoutsJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?php

declare(strict_types=1);

namespace App\Jobs\Presidential241124\Turnouts;

use App\Jobs\DeleteTemporaryTableData;
use App\Jobs\PersistTemporaryTableData;
use App\Jobs\SchedulableJob;
use App\Models\County;
use App\Models\Turnout;
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Process;
use Illuminate\Support\Facades\Storage;
use Spatie\TemporaryDirectory\TemporaryDirectory;

class FetchTurnoutsJob extends SchedulableJob
{
public static function name(): string
{
return 'Prezidențiale 24.11.2024 / Prezență';
}

public function execute(): void
{
$temporaryDirectory = TemporaryDirectory::make()
->deleteWhenDestroyed();

$cwd = $temporaryDirectory->path();

$tmpDisk = Storage::build([
'driver' => 'local',
'root' => $cwd,
]);

$tmpDisk->put('turnout.csv', $this->scheduledJob->fetchSource()->resource());

// Split the CSV by county
Process::path($cwd)
->run([
config('import.awk_path'),
'-F,',
'FNR==1 {header = $0; next} !seen[$1]++ {print header > $1".csv"} {print > $1".csv"}',
'turnout.csv',
]);

$tmpDisk->delete('turnout.csv');

collect($tmpDisk->allFiles())
->each(function (string $file) use ($tmpDisk) {
$this->scheduledJob->disk()
->writeStream(
$this->scheduledJob->getSourcePath($file),
$tmpDisk->readStream($file)
);
});

$counties = County::all();

$electionName = $this->scheduledJob->election->getFilamentName();
$electionId = $this->scheduledJob->election_id;

$time = now()->toDateTimeString();

$jobs = $counties
->map(fn (County $county) => new ImportCountyTurnoutsJob($this->scheduledJob, $county))
->push(new ImportAbroadTurnoutsJob($this->scheduledJob));

$persistAndClean = fn () => Bus::chain([
new PersistTemporaryTableData(Turnout::class, $electionId),
new DeleteTemporaryTableData(Turnout::class, $electionId),
])->dispatch();

Bus::batch($jobs)
->catch($persistAndClean)
->then($persistAndClean)
->name("$electionName / Prezență / $time")
->allowFailures()
->dispatch();
}

/**
* Get the tags that should be assigned to the job.
*
* @return array<int, string>
*/
public function tags(): array
{
return [
'import',
'turnout',
'scheduled_job:' . $this->scheduledJob->id,
'election:' . $this->scheduledJob->election_id,
static::name(),
];
}
}
118 changes: 118 additions & 0 deletions app/Jobs/Presidential241124/Turnouts/ImportAbroadTurnoutsJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
<?php

declare(strict_types=1);

namespace App\Jobs\Presidential241124\Turnouts;

use App\Events\CountryCodeNotFound;
use App\Exceptions\CountryCodeNotFoundException;
use App\Exceptions\MissingSourceFileException;
use App\Models\Country;
use App\Models\ScheduledJob;
use App\Models\Turnout;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use League\Csv\Reader;

class ImportAbroadTurnoutsJob implements ShouldQueue
{
use Batchable;
use Dispatchable;
use InteractsWithQueue;
use Queueable;
use SerializesModels;

public ScheduledJob $scheduledJob;

public function __construct(ScheduledJob $scheduledJob)
{
$this->scheduledJob = $scheduledJob;
}

public function handle(): void
{
$disk = $this->scheduledJob->disk();
$path = $this->scheduledJob->getSourcePath('SR.csv');

if (! $disk->exists($path)) {
throw new MissingSourceFileException($path);
}

$reader = Reader::createFromStream($disk->readStream($path));
$reader->setHeaderOffset(0);

$values = collect();

$segments = Turnout::segmentsMap();

foreach ($reader->getRecords() as $record) {
try {
$values->push([
'election_id' => $this->scheduledJob->election_id,
'country_id' => $this->getCountryId($record['UAT']),
'section' => $record['Nr sectie de votare'],

'initial_permanent' => $record['Înscriși pe liste permanente'],
'initial_complement' => 0,
'permanent' => $record['LP'],
'complement' => $record['LSC'],
'supplement' => $record['LS'],
'mobile' => $record['UM'],

'area' => $record['Mediu'],
'has_issues' => $this->determineIfHasIssues($record),

...$segments->map(fn (string $segment) => $record[$segment]),
]);
} catch (CountryCodeNotFoundException $th) {
CountryCodeNotFound::dispatch($record['UAT'], $this->scheduledJob->election);
}
}

Turnout::saveToTemporaryTable($values->all());
}

protected function determineIfHasIssues(array $record): bool
{
$computedTotal = collect(['LP', 'LSC', 'LS', 'UM'])
->map(fn (string $key) => $record[$key])
->sum();

if ($computedTotal !== $record['LT']) {
return true;
}

return false;
}

protected function getCountryId(string $name): string
{
$country = Country::search($name)->first();

if (! $country) {
throw new CountryCodeNotFoundException($name);
}

return $country->id;
}

/**
* Get the tags that should be assigned to the job.
*
* @return array<int, string>
*/
public function tags(): array
{
return [
'import',
'turnout',
'scheduled_job:' . $this->scheduledJob->id,
'election:' . $this->scheduledJob->election_id,
'abroad',
];
}
}
106 changes: 106 additions & 0 deletions app/Jobs/Presidential241124/Turnouts/ImportCountyTurnoutsJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

declare(strict_types=1);

namespace App\Jobs\Presidential241124\Turnouts;

use App\Exceptions\MissingSourceFileException;
use App\Models\County;
use App\Models\ScheduledJob;
use App\Models\Turnout;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use League\Csv\Reader;

class ImportCountyTurnoutsJob implements ShouldQueue
{
use Batchable;
use Dispatchable;
use InteractsWithQueue;
use Queueable;
use SerializesModels;

public ScheduledJob $scheduledJob;

public County $county;

public function __construct(ScheduledJob $scheduledJob, County $county)
{
$this->scheduledJob = $scheduledJob;
$this->county = $county;
}

public function handle(): void
{
$disk = $this->scheduledJob->disk();
$path = $this->scheduledJob->getSourcePath("{$this->county->code}.csv");

if (! $disk->exists($path)) {
throw new MissingSourceFileException($path);
}

$reader = Reader::createFromStream($disk->readStream($path));
$reader->setHeaderOffset(0);

$values = collect();

$segments = Turnout::segmentsMap();

$records = $reader->getRecords();
foreach ($records as $record) {
$values->push([
'election_id' => $this->scheduledJob->election_id,
'county_id' => $this->county->id,
'locality_id' => $record['Siruta'],
'section' => $record['Nr sectie de votare'],

'initial_permanent' => $record['Înscriși pe liste permanente'],
'initial_complement' => 0,
'permanent' => $record['LP'],
'complement' => $record['LSC'],
'supplement' => $record['LS'],
'mobile' => $record['UM'],

'area' => $record['Mediu'],
'has_issues' => $this->determineIfHasIssues($record),

...$segments->map(fn (string $segment) => $record[$segment]),
]);
}

Turnout::saveToTemporaryTable($values->all());
}

protected function determineIfHasIssues(array $record): bool
{
$computedTotal = collect(['LP', 'LSC', 'LS', 'UM'])
->map(fn (string $key) => $record[$key])
->sum();

if ($computedTotal !== $record['LT']) {
return true;
}

return false;
}

/**
* Get the tags that should be assigned to the job.
*
* @return array<int, string>
*/
public function tags(): array
{
return [
'import',
'turnout',
'scheduled_job:' . $this->scheduledJob->id,
'election:' . $this->scheduledJob->election_id,
'county:' . $this->county->code,
];
}
}

0 comments on commit 281f195

Please sign in to comment.