Skip to content

Commit

Permalink
Merge pull request #19 from embedded-insurance/effect-schema-spark
Browse files Browse the repository at this point in the history
feat: add effect-schema-spark
  • Loading branch information
alex-dixon authored Jul 13, 2024
2 parents f9a689f + b78f817 commit 9c8372e
Show file tree
Hide file tree
Showing 9 changed files with 1,274 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/eight-coins-attack.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@diachronic/effect-schema-spark": patch
---

feat: add effect-schema-spark
103 changes: 103 additions & 0 deletions packages/effect-schema-spark/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# @diachronic/effect-schema-spark

> Generate Apache Spark schema from `@effect/schema` type definitions
## Usage

```ts
import * as S from '@effect/schema/Schema'
import { sparkSchemaFor } from '@diachronic/effect-schema-spark'

const GetUserActivityOutput = S.Struct({
id: S.Int,
name: S.String,
roles: S.optional(S.Array(S.String)),
})

sparkSchemaFor(GetUserActivityOutput)

// {
// "type": "struct",
// "fields": [
// {
// "type": "string",
// "name": "name",
// "nullable": false,
// "metadata": {}
// },
// {
// "type": "integer",
// "name": "id",
// "nullable": false,
// "metadata": {}
// },
// {
// "type": {
// "type": "array",
// "elementType": "string",
// "containsNull": false
// },
// "name": "roles",
// "nullable": true,
// "metadata": {}
// }
// ]
// }
```

## Usage in Spark
If you have CDC hooked up to Temporal (see https://github.com/embedded-insurance/diachronic/tree/main/packages/cdc) you can easily derive schemas for all types used in Temporal workflows (signals, activity input/output/error, workflow input/output/error).


```python
get_user_output = """{
"type": "struct",
"fields": [
{
"type": "string",
"name": "name",
"nullable": false,
"metadata": {}
},
{
"type": "integer",
"name": "id",
"nullable": false,
"metadata": {}
},
{
"type": {
"type": "array",
"elementType": "string",
"containsNull": false
},
"name": "roles",
"nullable": true,
"metadata": {}
}
]
}"""

schemas = {'getUser': {'output': get_user_output}}

from pyspark.sql.functions import *

df = (
spark.table("temporal.activity_calls")
.where(col("activity_type") == 'getUser')
.withColumn("output", from_json("output", schemas['getUser']['output']))
)

```

## Overview

The package aims to be as permissive as possible.

If a type can be translated from Effect Schema, the goal is to do it even if the type is very general with respect to
the original.

Warnings are printed when types cannot be generated. Errors are thrown in only when a schema cannot be produced at all.

This is in part because Spark Schema is much less specific than Effect Schema, and because in our experience it is
better to have some description of the data than none at all.
10 changes: 10 additions & 0 deletions packages/effect-schema-spark/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const config = {
verbose: true,
testEnvironment: 'node',
preset: 'ts-jest',
testMatch: ['<rootDir>/test/**/*.ts'],
testPathIgnorePatterns: ['test/lib', 'test/manual'],
passWithNoTests: true,
}

export default config
49 changes: 49 additions & 0 deletions packages/effect-schema-spark/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{
"name": "@diachronic/effect-schema-spark",
"version": "0.0.0",
"description": "Generate Apache Spark schema from `@effect/schema` type definitions",
"main": "src/index.ts",
"exports": {
".": "./src/index.ts",
"./*": "./src/*.ts"
},
"scripts": {
"clean": "rimraf node_modules & rimraf dist & rimraf .turbo",
"test": "jest",
"format": "prettier --write .",
"typecheck": "tsc --noEmit"
},
"keywords": [],
"author": {
"name": "Embedded Insurance",
"email": "[email protected]",
"url": "https://embeddedinsurance.com"
},
"license": "MIT",
"dependencies": {
"@effect/schema": "0.36.3",
"effect": "^2.0.0-next.36",
"ramda": "^0.29.1"
},
"devDependencies": {
"@types/jest": "^29.5.3",
"@types/node": "18",
"@types/ramda": "^0.29.9",
"jest": "29.5.0",
"prettier": "2.8.8",
"ts-jest": "29.1.0",
"ts-node": "^10.9.1",
"typescript": "5.2.2"
},
"engines": {
"node": ">=18"
},
"publishConfig": {
"access": "public"
},
"repository": {
"directory": "packages/effect-schema-spark",
"type": "git",
"url": "https://github.com/embedded-insurance/diachronic.git"
}
}
Loading

0 comments on commit 9c8372e

Please sign in to comment.