Skip to content

Commit

Permalink
Blog post - data processing in php (#1413)
Browse files Browse the repository at this point in the history
* Data Processing in PHP - Blog Post

* Added generator
  • Loading branch information
norberttech authored Jan 27, 2025
1 parent 33571ce commit 22257dc
Show file tree
Hide file tree
Showing 15 changed files with 661 additions and 1 deletion.
8 changes: 8 additions & 0 deletions web/landing/assets/styles/app.css
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ code {
@apply text-blue-100 my-4 border-t-2 rounded;
}

/* Block quotes inside a blog post: 4-unit left border in orange-100, padding, blue-300 background and a bottom margin (Tailwind utilities). */
#blog-post blockquote {
@apply border-l-4 border-orange-100 p-2 bg-blue-300 mb-3;
}

/* Code elements inside a blog post are rendered with the orange-100 text colour. */
#blog-post code {
@apply text-orange-100;
}

/* Top-level headings of an example description: bold, 2xl text size. */
#example-description h1 {
@apply font-bold text-2xl;
}
Expand Down
7 changes: 6 additions & 1 deletion web/landing/src/Flow/Website/Blog/Posts.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ final class Posts
'description' => 'Scalar functions are one of the most important building blocks of Flow. Learn how to build and use custom scalar functions in Flow PHP.',
'date' => '2024-08-08',
'slug' => 'scalar-functions',

],
[
'title' => 'Data processing in PHP',
'description' => 'Processing datasets is a common problem in almost any software. Learn how to process datasets in PHP just like it\'s being done in other programming languages.',
'date' => '2025-01-25',
'slug' => 'data-processing-in-php',
],
];

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"id": "uuid",
"country": "string{2}",
"state": "string{2}",
"zip": "string{9}",
"city": "string{256}",
"address_1": "string{256}",
"address_2": "string{256}",
"address_3": "string{256}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_branch;

// Rows that fail validation are written to a timestamped CSV next to this script.
$invalidRowsPath = __DIR__ . '/invalid_rows_' . time() . '.csv';

// Branching loader: only rows whose `valid` entry is false reach the CSV writer.
$invalidRowsBranch = to_branch(
    ref('valid')->isFalse(),
    to_csv($invalidRowsPath),
);

// Read the import file, flag rows with Validation (presumably the transformation
// that adds the boolean `valid` entry — confirm), divert the invalid ones and
// continue with the valid rows only, in batches of 100.
$report = df()
    ->read(from_csv(__DIR__ . '/import.csv'))
    ->with(new Validation())
    ->write($invalidRowsBranch)
    ->filter(ref('valid')->isTrue())
    // the helper column is not needed past this point
    ->drop('valid')
    ->batchSize(100);
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

use Ramsey\Uuid\Uuid;
use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\join_on;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_branch;
use function Flow\ETL\DSL\when;

// Full import pipeline: read the CSV, divert invalid rows to a separate file,
// resolve user_id from user_email, generate missing address ids and insert the
// result into the `user_addresses` table. Running with `analyze: true` returns
// a report whose statistics are printed below.
$report = df()
    ->read(from_csv(__DIR__ . '/import.csv'))
    ->with(new Validation())
    ->write(
        to_branch(
            ref('valid')->isFalse(),
            to_csv(__DIR__ . '/invalid_rows_' . time() . '.csv'),
        )
    )
    ->filter(ref('valid')->isTrue())
    // at this point all invalid records are stored in another file
    ->drop('valid')
    // we need to look up user_id in the database based on user_email,
    // let's do it in batches of 100
    ->batchSize(100)
    ->joinEach(
        new UserIdJoinDataFrameFactory($connection),
        join_on(['user_email' => 'user_email'])
    )
    // user email is no longer needed
    ->drop('user_email')
    // defines the batch size for the insert operation
    ->batchSize(100)
    // When the import file does not provide an address id we need to generate one.
    // uuid_v4() is a scalar function, so it is evaluated separately for every row;
    // lit(Uuid::uuid4()->toString()) would be computed once while building the
    // pipeline and every row without an id would share the same UUID, colliding
    // on the `id` conflict column during insert.
    ->withEntry('id', when(ref('id')->isNull(), \Flow\ETL\DSL\uuid_v4(), ref('id')))
    ->write(
        to_dbal_table_insert(
            $connection,
            'user_addresses',
            [
                'conflict_columns' => ['id'],
            ]
        )
    )
    ->run(analyze: true);


echo 'Total rows: ' . $report->statistics()->totalRows() . PHP_EOL;
echo 'Execution time: ' . $report->statistics()->executionTime->highResolutionTime->toString() . PHP_EOL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
+----------------------+---------+-------+------------+-------------------+-----------------+-----------+-----------+----------------------+
| id | country | state | zip | city | address_1 | address_2 | address_3 | user_id |
+----------------------+---------+-------+------------+-------------------+-----------------+-----------+-----------+----------------------+
| 36a1192d-1c8e-31fc-9 | US | AR | 44389 | Keelingbury | Noemy Burgs | 359 | | 01a3e823-2a69-3432-9 |
| 3d763134-aec2-3a75-8 | US | FL | 71341-9244 | Krisside | Oren Road | 511 | | 01b0abd3-7217-332c-9 |
| b03c6357-8451-3320-9 | US | ID | 65146 | Bernardoview | Tevin Harbors | 4344 | | 01b1a371-1c70-320a-b |
| 476cf6ba-2aac-3b58-9 | US | LA | 64572-0907 | Lake Estel | Harber Ranch | 436 | Motorway | 01c45346-3fb8-3edf-8 |
| b37710e0-26dc-343c-9 | US | MS | 83824-7369 | North Carissafort | Bulah Dale | 8097 | | 01c45346-3fb8-3edf-8 |
| 03eaaf10-644d-3d92-9 | US | OH | 71386 | Cameronshire | Oren Drives | 3828 | | 01c64f97-9ec1-3b55-9 |
| a78970b7-4964-3f91-b | US | MA | 79319 | Port Rodgershire | Parker Mall | 873 | | 01c64f97-9ec1-3b55-9 |
| cc73778c-cb3a-305a-8 | US | NE | 63187 | Parkerfort | Cassandre Ways | 4633 | | 01c64f97-9ec1-3b55-9 |
| | US | NH | 96807 | West Catherine | Schmeler Valley | 9834 | Mills | 01ce6d03-6b8e-3b8a-9 |
| 2419b64c-db25-38af-9 | US | NM | 74053 | Carolstad | Rory Row | 8019 | Crossing | 01ce6d03-6b8e-3b8a-9 |
+----------------------+---------+-------+------------+-------------------+-----------------+-----------+-----------+----------------------+
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

use function Flow\ETL\Adapter\Doctrine\to_dbal_table_insert;
use function Flow\ETL\DSL\df;

// Options handed to the DBAL insert loader; `id` is declared as the conflict
// column (presumably so re-imported rows do not fail on duplicates — confirm
// against the Doctrine adapter documentation).
$insertOptions = [
    'conflict_columns' => ['id'],
];

// Write the data frame rows into the `user_addresses` table.
df()
    ->write(to_dbal_table_insert($connection, 'user_addresses', $insertOptions));
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\lit;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\type_null;
use function Flow\ETL\DSL\type_string;
use function Flow\ETL\DSL\type_uuid;
use function Flow\ETL\DSL\when;

// Row validation: `valid` starts as true and every rule below either keeps the
// current value (rule satisfied) or overwrites it with false (rule violated).
// Once a rule has failed, later rules can no longer turn the flag back to true.
df()
    ->withEntry('valid', lit(true))
    // id: must match the uuid type built with type_uuid(true)
    ->withEntry(
        'valid',
        when(
            ref('id')->isType(type_uuid(true)),
            ref('valid'),
            lit(false)
        )
    )
    // user_email: string with size between 1 and 256
    ->withEntry(
        'valid',
        when(
            ref('user_email')->isType(type_string())->and(ref('user_email')->size()->between(1, 256)),
            ref('valid'),
            lit(false)
        )
    )
    // country: string with size equal to 2
    ->withEntry(
        'valid',
        when(
            ref('country')->isType(type_string())->and(ref('country')->size()->equals(2)),
            ref('valid'),
            lit(false)
        )
    )
    // state: string with size equal to 2
    ->withEntry(
        'valid',
        when(
            ref('state')->isType(type_string())->and(ref('state')->size()->equals(2)),
            ref('valid'),
            lit(false)
        )
    )
    // city: string with size between 1 and 256
    ->withEntry(
        'valid',
        when(
            ref('city')->isType(type_string())->and(ref('city')->size()->between(1, 256)),
            ref('valid'),
            lit(false)
        )
    )
    // zip: string with size between 4 and 12
    ->withEntry(
        'valid',
        when(
            ref('zip')->isType(type_string())->and(ref('zip')->size()->between(4, 12)),
            ref('valid'),
            lit(false)
        )
    )
    // address_1: string with size between 1 and 256
    ->withEntry(
        'valid',
        when(
            ref('address_1')->isType(type_string())->and(ref('address_1')->size()->between(1, 256)),
            ref('valid'),
            lit(false)
        )
    )
    // address_2: optional — either null or size between 1 and 256
    ->withEntry(
        'valid',
        when(
            ref('address_2')->isType(type_null())->or(ref('address_2')->size()->between(1, 256)),
            ref('valid'),
            lit(false)
        )
    )
    // address_3: optional — either null or size between 1 and 256
    ->withEntry(
        'valid',
        when(
            ref('address_3')->isType(type_null())->or(ref('address_3')->size()->between(1, 256)),
            ref('valid'),
            lit(false)
        )
    );
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\df;
use function Flow\ETL\DSL\join_on;
use function Flow\ETL\DSL\ref;
use function Flow\ETL\DSL\to_branch;

// Loader that diverts rows flagged as invalid into a timestamped CSV file
// located next to this script.
$invalidRowsLoader = to_branch(
    ref('valid')->isFalse(),
    to_csv(__DIR__ . '/invalid_rows_' . time() . '.csv'),
);

// Join condition: match the import's `user_email` with the `user_email`
// entry of the data frames produced by the join factory.
$byUserEmail = join_on(['user_email' => 'user_email']);

// Validate the import, keep only valid rows and, in batches of 100, join each
// batch with the data frame built by UserIdJoinDataFrameFactory (presumably
// a lookup of user_id by e-mail — confirm against that class).
$report = df()
    ->read(from_csv(__DIR__ . '/import.csv'))
    ->with(new Validation())
    ->write($invalidRowsLoader)
    ->filter(ref('valid')->isTrue())
    ->drop('valid')
    ->batchSize(100)
    ->joinEach(new UserIdJoinDataFrameFactory($connection), $byUserEmail)
    // the e-mail was only needed for the join
    ->drop('user_email');
Loading

0 comments on commit 22257dc

Please sign in to comment.