Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async iterators #32

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"deno.enable": false
"deno.enable": true
}
7 changes: 6 additions & 1 deletion api/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ const jwksIssuer = newJwksIssuer({
export const withAuth = (): MiddlewareHandler<
Env,
"/namespaces/:namespace/*",
// deno-lint-ignore ban-types
{}
> => {
return async (ctx, next) => {
const credentials = ctx.req.headers.get("Authorization");
const credentials = ctx.req.header("Authorization") ??
"Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJ1cm46ZGVjbzpzaXRlOjphZG1pbjpkZXBsb3ltZW50L3RzdCIsInN1YiI6InVybjpkZWNvOnNpdGU6Ong6ZGVwbG95bWVudC90c3QiLCJzY29wZXMiOlsiaHR0cDovL2xvY2FsaG9zdDo4MDAwLyoiLCJ3czovL2xvY2FsaG9zdDo4MDAwLyoiXX0.awdXDppwF-Dn7BwMWLz3hHqlx16HfVBuPuoGP4mVBihkMxwqDvZYWi_1Dg27u6ajg9br9qL6xSTlN8nauo89AyELaQavUIPDnW5u1yZpVZ5XE1C7DyVc3ncGe8L_PjuRqkfkc24POCiPVALYqKpJ7uERkjoSSRT5BBbuPvuWYZQaeNpkw6CUKWzod9myg7evtbIBEuLHnNyhT2hKmdzLuJNzakS-cyZVIQ6Pm_JDTQhdH15QyDNviJ6tM6HrNARgti40QUOAwRpACLZ16LsEpAitaZPBx7KNDr456indBP_HqZF6crO3yUQEFSN5Yb323VLjtaX2SVSqIP0uOLn0yA";

const unauthorized = () => {
const res = new Response("Unauthorized", {
Expand Down Expand Up @@ -115,6 +117,9 @@ export const withAuth = (): MiddlewareHandler<

ctx.set("principal", payload);
ctx.set("checkIsAllowed", (ref) => {
if (!ref?.url) {
return;
}
const scopes = (payload.scopes ?? []).map((scope) =>
new URLPattern(scope)
);
Expand Down
8 changes: 4 additions & 4 deletions api/router.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { Hono } from "hono";
import { HTTPException } from "hono/http-exception";
import type { DB, WorkflowExecution } from "../backends/backend.ts";
import { wellKnownJWKSHandler } from "../security/identity.ts";
import { JwtIssuer } from "../security/jwt.ts";
import { withAuth } from "./auth.ts";
import { WorkflowService } from "./service.ts";
import { HTTPException } from "hono/http-exception";

export const getRouter = async (
export const getRouter = (
_app: Hono,
db: DB,
jwtIssuer: JwtIssuer,
Expand Down Expand Up @@ -67,7 +67,7 @@ export const getRouter = async (
const { id } = _req.param();
const url = new URL(req.url);
if (url.searchParams.has("stream")) {
return await service.executionHistoryStream(id, _req.signal);
return await service.executionHistoryStream(id, _req.raw.signal);
}
const page = url.searchParams.get("page");
const pageSize = url.searchParams.get("pageSize");
Expand All @@ -85,7 +85,7 @@ export const getRouter = async (
app.delete("/executions/:id", async (c) => {
const req = c.req.raw;
const { id } = c.req.param();
const reason = await req.json<{ reason: string }>().then((
const reason = await req.json().then((
resp: { reason: string },
) => resp.reason);
await service.cancelExecution(
Expand Down
4 changes: 2 additions & 2 deletions api/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export interface Pagination<T> {
export class WorkflowService {
constructor(
protected backend: DB,
protected jwtIssuer: JwtIssuer,
protected jwtIssuer?: JwtIssuer,
) {
}

Expand Down Expand Up @@ -124,7 +124,7 @@ export class WorkflowService {
*/
public getSignedToken(namespace: string) {
const iat = Date.now();
return this.jwtIssuer.issue({
return this.jwtIssuer!.issue({
sub: "urn:deco:service::workflows",
aud: `urn:deco:site::${namespace}:`,
iat,
Expand Down
9 changes: 2 additions & 7 deletions backends/durableObjects/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const parseOrThrow = <T>() => async (resp: Response) => {
const eventsFor = (
{ signal }: DBContext,
route: "/history" | "/pending",
// @ts-expect-error: cloudflare-provided types
durable: DurableObjectStub,
): Events => {
return {
Expand Down Expand Up @@ -61,15 +62,9 @@ const eventsFor = (

const executionFor = (
{ signal, ...rest }: DBContext,
// @ts-expect-error: cloudflare-provided types
durable: DurableObjectStub,
): Execution => {
const useMethod = (method: string) => (workflow: WorkflowExecution) => {
return durable.fetch(withOrigin("/"), {
signal,
method,
body: JSON.stringify(workflow),
}).then(parseOrThrow<void>());
};
return {
get: <
TArgs extends Arg = Arg,
Expand Down
8 changes: 8 additions & 0 deletions backends/durableObjects/db.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// deno-lint-ignore-file no-explicit-any
import { PromiseOrValue } from "../../promise.ts";
import { HistoryEvent } from "../../runtime/core/events.ts";
import {
Expand Down Expand Up @@ -28,6 +29,7 @@ export interface CollectionStorage<T extends readonly Identifiable[]> {
}
export const useSingleton = <T>(
key: string,
// @ts-expect-error: cloudflare-provided types
durable: DurableObjectTransaction | DurableObjectStorage,
{ allowUnconfirmed }: GateOptions = { allowUnconfirmed: false },
): SingletonStorage<T> => {
Expand All @@ -39,6 +41,7 @@ export const useSingleton = <T>(

export const useCollection = <T extends readonly Identifiable[]>(
prefix: string,
// @ts-expect-error: cloudflare-provided types
durable: DurableObjectTransaction | DurableObjectStorage,
{ allowUnconfirmed }: GateOptions = { allowUnconfirmed: false },
): CollectionStorage<T> => {
Expand Down Expand Up @@ -94,6 +97,7 @@ export const sortHistoryEventByDate = (
};

export const durableExecution = (
// @ts-ignore: cloudflare-provided types
db: DurableObjectTransaction | DurableObjectStorage,
gateOpts: GateOptions = { allowUnconfirmed: false },
) => {
Expand Down Expand Up @@ -156,12 +160,16 @@ export const durableExecution = (
};

const isDurableObjStorage = (
// @ts-expect-error: cloudflare-provided types
db: DurableObjectTransaction | DurableObjectStorage,
// @ts-expect-error: cloudflare-provided types
): db is DurableObjectStorage => {
// @ts-expect-error: cloudflare-provided types
return typeof (db as DurableObjectStorage)?.transaction === "function";
};

export const dbFor = (
// @ts-expect-error: cloudflare-provided types
db: DurableObjectTransaction | DurableObjectStorage,
): DB => {
return {
Expand Down
14 changes: 11 additions & 3 deletions backends/postgres/db.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { PoolClient, QueryResult, QueryResultRow } from "pg";
import { Metadata } from "../../context.ts";
import { HistoryEvent } from "../../runtime/core/events.ts";
import { Arg } from "../../types.ts";
import { apply } from "../../utils.ts";
import {
DB,
Expand Down Expand Up @@ -89,15 +91,21 @@ const executionsFor =
"history",
queryHistory(executionId),
),
get: () =>
get: <
TArgs extends Arg = Arg,
TResult = unknown,
TMetadata extends Metadata = Metadata,
>() =>
useClient(
queryObject<WorkflowExecution>(getExecution(executionId)),
).then(({ rows }) => {
if (rows.length === 0) {
return undefined;
}
// camel case is not working when selecting from db.
const result = rows[0] as WorkflowExecution & { completedat: Date };
const result = rows[0] as
& WorkflowExecution<TArgs, TResult, TMetadata>
& { completedat: Date };
const { completedat: _ignore, ...rest } = {
...result,
completedAt: result?.completedat,
Expand Down Expand Up @@ -140,7 +148,7 @@ function dbFor(useClient: UseClient): DB {
return useClient(
queryObject<{ id: string }>(pendingExecutions(lockTimeoutMS, limit)),
).then(({ rows }) =>
rows.map(({ id: execution }) => ({
rows.map(({ id: execution }: { id: string }) => ({
execution,
unlock: unlockWkflowExecution(execution),
}))
Expand Down
21 changes: 15 additions & 6 deletions backends/sqlite/db.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// deno-lint-ignore-file no-explicit-any
import { HistoryEvent } from "../../runtime/core/events.ts";
import {
DB,
Expand All @@ -14,17 +15,19 @@ import {
queryHistory,
toHistoryEvent,
} from "../postgres/events.ts";
import { queryPendingEvents } from "./events.ts";
import {
getExecution,
insertExecution,
unlockExecution,
updateExecution,
} from "../postgres/executions.ts";
import { queryPendingEvents } from "./events.ts";
import { pendingExecutionsSQLite } from "./executions.ts";

import schema from "./schema.ts";
import { DB as DBSqlite } from "https://deno.land/x/[email protected]/mod.ts";
import { Metadata } from "../../context.ts";
import { Arg } from "../../types.ts";
import schema from "./schema.ts";

const processJSON = (row: Record<string, any>) => {
const processRowAndColumn = (row: Record<string, any>, column: string) => {
Expand Down Expand Up @@ -85,15 +88,21 @@ const executionsFor = () => (executionId: string): Execution => {
"history",
queryHistory(executionId),
),
get: () => {
get: <
TArgs extends Arg = Arg,
TResult = unknown,
TMetadata extends Metadata = Metadata,
>() => {
const row = db.queryEntries(getExecution(executionId))?.[0];
if (!row) {
return Promise.resolve(undefined);
}
processJSON(row);
const result = row as unknown as WorkflowExecution & {
completedat: Date;
};
const result = row as unknown as
& WorkflowExecution<TArgs, TResult, TMetadata>
& {
completedat: Date;
};
const { completedat: _ignore, ...rest } = {
...result,

Expand Down
5 changes: 4 additions & 1 deletion context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ export class WorkflowContext<TMetadata extends Metadata = Metadata> {
/**
* Logs at least once with additional workflow information
*/
public log(message: any, ...optionalParams: any[]): LocalActivityCommand {
public log(
message: unknown,
...optionalParams: unknown[]
): LocalActivityCommand {
const executionId = this.execution.id;
const fn = function () {
console.log(
Expand Down
6 changes: 6 additions & 0 deletions deno.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"tasks": {
"start": "npm start",
"sample": "deno run -A samples/main.ts"
}
}
Loading