-
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
b7a4bf1
commit bec132e
Showing
3 changed files
with
230 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
104 changes: 104 additions & 0 deletions
104
app/Jobs/ReferendumBucuresti241124/Records/FetchRecordsJob.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace App\Jobs\ReferendumBucuresti241124\Records; | ||
|
||
use App\Jobs\DeleteTemporaryTableData; | ||
use App\Jobs\PersistTemporaryTableData; | ||
use App\Jobs\SchedulableJob; | ||
use App\Jobs\UpdateElectionRecordsTimestamp; | ||
use App\Models\County; | ||
use App\Models\Record; | ||
use App\Models\Vote; | ||
use Illuminate\Support\Facades\Bus; | ||
use Illuminate\Support\Facades\Process; | ||
use Illuminate\Support\Facades\Storage; | ||
use Spatie\TemporaryDirectory\TemporaryDirectory; | ||
|
||
class FetchRecordsJob extends SchedulableJob | ||
{ | ||
public static function name(): string | ||
{ | ||
return 'Referendum Bucuresti 24.11.2024 / Procese Verbale'; | ||
} | ||
|
||
public function execute(): void | ||
{ | ||
$temporaryDirectory = TemporaryDirectory::make() | ||
->deleteWhenDestroyed(); | ||
|
||
$cwd = $temporaryDirectory->path(); | ||
|
||
$tmpDisk = Storage::build([ | ||
'driver' => 'local', | ||
'root' => $cwd, | ||
]); | ||
|
||
$tmpDisk->put('turnout.csv', $this->scheduledJob->fetchSource()->resource()); | ||
|
||
// Split the CSV by county | ||
Process::path($cwd) | ||
->run([ | ||
config('import.awk_path'), | ||
'-F,', | ||
'FNR==1 {header = $0; next} !seen[$1]++ {print header > $1".csv"} {print > $1".csv"}', | ||
'turnout.csv', | ||
]); | ||
|
||
$tmpDisk->delete('turnout.csv'); | ||
|
||
collect($tmpDisk->allFiles()) | ||
->each(function (string $file) use ($tmpDisk) { | ||
$this->scheduledJob->disk() | ||
->writeStream( | ||
$this->scheduledJob->getSourcePath($file), | ||
$tmpDisk->readStream($file) | ||
); | ||
}); | ||
|
||
$electionName = $this->scheduledJob->election->getFilamentName(); | ||
$electionId = $this->scheduledJob->election_id; | ||
|
||
$sourceFiles = collect([ | ||
'42' => 'B', | ||
]); | ||
|
||
$jobs = $sourceFiles | ||
->map(fn (string $countyCode, string $filename) => new ImportRecordsJob($this->scheduledJob, $countyCode, $filename)); | ||
|
||
$time = now()->toDateTimeString(); | ||
|
||
$persistAndClean = fn () => Bus::chain([ | ||
new PersistTemporaryTableData(Record::class, $electionId), | ||
new DeleteTemporaryTableData(Record::class, $electionId), | ||
|
||
new PersistTemporaryTableData(Vote::class, $electionId), | ||
new DeleteTemporaryTableData(Vote::class, $electionId), | ||
])->dispatch(); | ||
|
||
Bus::batch($jobs) | ||
->catch($persistAndClean) | ||
->then($persistAndClean) | ||
->then(fn () => UpdateElectionRecordsTimestamp::dispatch($electionId)) | ||
->name("$electionName / Rezultate / $time") | ||
->allowFailures() | ||
->dispatch(); | ||
} | ||
|
||
/** | ||
* Get the tags that should be assigned to the job. | ||
* | ||
* @return array<int, string> | ||
*/ | ||
public function tags(): array | ||
{ | ||
return [ | ||
'import', | ||
'records', | ||
'scheduled_job:' . $this->scheduledJob->id, | ||
'election:' . $this->scheduledJob->election_id, | ||
static::name(), | ||
]; | ||
} | ||
} |
126 changes: 126 additions & 0 deletions
126
app/Jobs/ReferendumBucuresti241124/Records/ImportRecordsJob.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace App\Jobs\ReferendumBucuresti241124\Records; | ||
|
||
use App\Exceptions\MissingSourceFileException; | ||
use App\Models\County; | ||
use App\Models\Record; | ||
use App\Models\ScheduledJob; | ||
use App\Models\Vote; | ||
use App\Services\RecordService; | ||
use Illuminate\Bus\Batchable; | ||
use Illuminate\Bus\Queueable; | ||
use Illuminate\Contracts\Queue\ShouldQueue; | ||
use Illuminate\Foundation\Bus\Dispatchable; | ||
use Illuminate\Queue\InteractsWithQueue; | ||
use Illuminate\Queue\SerializesModels; | ||
use League\Csv\Reader; | ||
|
||
class ImportRecordsJob implements ShouldQueue | ||
{ | ||
use Batchable; | ||
use Dispatchable; | ||
use InteractsWithQueue; | ||
use Queueable; | ||
use SerializesModels; | ||
|
||
public ScheduledJob $scheduledJob; | ||
|
||
public County $county; | ||
private string $filename; | ||
|
||
|
||
public function __construct(ScheduledJob $scheduledJob, $countyCode, $filename) | ||
{ | ||
$this->scheduledJob = $scheduledJob; | ||
$this->county = County::where('code', $countyCode)->first(); | ||
$this->filename = $filename; | ||
} | ||
|
||
public function handle(): void | ||
{ | ||
$disk = $this->scheduledJob->disk();; | ||
$path = $this->scheduledJob->getSourcePath("{$this->filename}.csv"); | ||
|
||
if (! $disk->exists($path)) { | ||
throw new MissingSourceFileException($path); | ||
} | ||
|
||
$reader = Reader::createFromStream($disk->readStream($path)); | ||
$reader->setHeaderOffset(0); | ||
|
||
$records = collect(); | ||
|
||
$votables = RecordService::generateVotables( | ||
$reader->getHeader(), | ||
$this->scheduledJob->election_id | ||
); | ||
|
||
foreach ($reader->getRecords() as $row) { | ||
$part = RecordService::getPart($row['report_stage_code']); | ||
|
||
$records->push([ | ||
'election_id' => $this->scheduledJob->election_id, | ||
'county_id' => $this->county->id, | ||
'locality_id' => $row['uat_siruta'], | ||
'section' => $row['precinct_nr'], | ||
'part' => $part, | ||
|
||
'eligible_voters_permanent' => $row['a1'], | ||
'eligible_voters_special' => 0, | ||
|
||
'present_voters_permanent' => $row['a2'], | ||
'present_voters_special' => 0, | ||
'present_voters_supliment' => 0, | ||
'present_voters_mail' => 0, //$row['b4'], | ||
|
||
'votes_valid' => (int)$row['a5'] + (int)$row['a6'], | ||
'votes_null' => $row['a7'], | ||
|
||
'papers_received' => $row['a3'], | ||
'papers_unused' => $row['a4'], | ||
|
||
'has_issues' => false, | ||
]); | ||
|
||
$votes = collect(); | ||
|
||
foreach ($votables as $column => $votable) { | ||
$votes->push([ | ||
'election_id' => $this->scheduledJob->election_id, | ||
'county_id' => $this->county->id, | ||
'locality_id' => $row['uat_siruta'], | ||
'section' => $row['precinct_nr'], | ||
'part' => $part, | ||
|
||
'votable_type' => $votable['votable_type'], | ||
'votable_id' => $votable['votable_id'], | ||
|
||
'votes' => $row[$column], | ||
]); | ||
} | ||
|
||
Vote::saveToTemporaryTable($votes->all()); | ||
} | ||
|
||
Record::saveToTemporaryTable($records->all()); | ||
} | ||
|
||
/** | ||
* Get the tags that should be assigned to the job. | ||
* | ||
* @return array<int, string> | ||
*/ | ||
public function tags(): array | ||
{ | ||
return [ | ||
'import', | ||
'records', | ||
'scheduled_job:' . $this->scheduledJob->id, | ||
'election:' . $this->scheduledJob->election_id, | ||
'county:' . $this->county->code, | ||
]; | ||
} | ||
} |