Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update decoders to parse float and return null on error #444

Merged
merged 13 commits into from
Feb 5, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ a local testing environment, as shown in the following steps:
use the local testing settings specified in `tests/config.json`, instead of
the CI settings
3. Run the tests manually by using the command\
`deno test --unstable -A`
`deno test -A`

## Deno compatibility

Expand Down
12 changes: 3 additions & 9 deletions client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ export abstract class QueryClient {

#assertOpenConnection() {
if (this.#terminated) {
throw new Error(
"Connection to the database has been terminated",
);
throw new Error("Connection to the database has been terminated");
}
}

Expand Down Expand Up @@ -243,9 +241,7 @@ export abstract class QueryClient {
async #executeQuery<T>(
_query: Query<ResultType.OBJECT>,
): Promise<QueryObjectResult<T>>;
async #executeQuery(
query: Query<ResultType>,
): Promise<QueryResult> {
async #executeQuery(query: Query<ResultType>): Promise<QueryResult> {
return await this.#connection.query(query);
}

Expand Down Expand Up @@ -397,9 +393,7 @@ export abstract class QueryClient {
query: TemplateStringsArray,
...args: unknown[]
): Promise<QueryObjectResult<T>>;
async queryObject<
T = Record<string, unknown>,
>(
async queryObject<T = Record<string, unknown>>(
query_template_or_config:
| string
| QueryObjectOptions
Expand Down
10 changes: 2 additions & 8 deletions client/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,8 @@ export class PostgresError extends Error {
}

export class TransactionError extends Error {
constructor(
transaction_name: string,
cause: PostgresError,
) {
super(
`The transaction "${transaction_name}" has been aborted`,
{ cause },
);
constructor(transaction_name: string, cause: PostgresError) {
super(`The transaction "${transaction_name}" has been aborted`, { cause });
this.name = "TransactionError";
}
}
98 changes: 32 additions & 66 deletions connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ function assertSuccessfulAuthentication(auth_message: Message) {
throw new PostgresError(parseNoticeMessage(auth_message));
}

if (
auth_message.type !== INCOMING_AUTHENTICATION_MESSAGES.AUTHENTICATION
) {
if (auth_message.type !== INCOMING_AUTHENTICATION_MESSAGES.AUTHENTICATION) {
throw new Error(`Unexpected auth response: ${auth_message.type}.`);
}

Expand Down Expand Up @@ -118,10 +116,7 @@ export class Connection {
#onDisconnection: () => Promise<void>;
#packetWriter = new PacketWriter();
#pid?: number;
#queryLock: DeferredStack<undefined> = new DeferredStack(
1,
[undefined],
);
#queryLock: DeferredStack<undefined> = new DeferredStack(1, [undefined]);
// TODO
// Find out what the secret key is for
#secretKey?: number;
Expand Down Expand Up @@ -180,10 +175,7 @@ export class Connection {
async #serverAcceptsTLS(): Promise<boolean> {
const writer = this.#packetWriter;
writer.clear();
writer
.addInt32(8)
.addInt32(80877103)
.join();
writer.addInt32(8).addInt32(80877103).join();

await this.#bufWriter.write(writer.flush());
await this.#bufWriter.flush();
Expand Down Expand Up @@ -216,16 +208,20 @@ export class Connection {
// TODO: recognize other parameters
writer.addCString("user").addCString(this.#connection_params.user);
writer.addCString("database").addCString(this.#connection_params.database);
writer.addCString("application_name").addCString(
this.#connection_params.applicationName,
);
writer
.addCString("application_name")
.addCString(this.#connection_params.applicationName);

const connection_options = Object.entries(this.#connection_params.options);
if (connection_options.length > 0) {
// The database expects options in the --key=value
writer.addCString("options").addCString(
connection_options.map(([key, value]) => `--${key}=${value}`).join(" "),
);
writer
.addCString("options")
.addCString(
connection_options
.map(([key, value]) => `--${key}=${value}`)
.join(" "),
);
}

// terminator after all parameters were writter
Expand All @@ -236,10 +232,7 @@ export class Connection {

writer.clear();

const finalBuffer = writer
.addInt32(bodyLength)
.add(bodyBuffer)
.join();
const finalBuffer = writer.addInt32(bodyLength).add(bodyBuffer).join();

await this.#bufWriter.write(finalBuffer);
await this.#bufWriter.flush();
Expand All @@ -248,7 +241,7 @@ export class Connection {
}

async #openConnection(options: ConnectOptions) {
// @ts-ignore This will throw in runtime if the options passed to it are socket related and deno is running
// @ts-expect-error This will throw in runtime if the options passed to it are socket related and deno is running
// on stable
this.#conn = await Deno.connect(options);
this.#bufWriter = new BufWriter(this.#conn);
Expand All @@ -257,9 +250,7 @@ export class Connection {

async #openSocketConnection(path: string, port: number) {
if (Deno.build.os === "windows") {
throw new Error(
"Socket connection is only available on UNIX systems",
);
throw new Error("Socket connection is only available on UNIX systems");
}
const socket = await Deno.stat(path);

Expand Down Expand Up @@ -296,10 +287,7 @@ export class Connection {
this.connected = false;
this.#packetWriter = new PacketWriter();
this.#pid = undefined;
this.#queryLock = new DeferredStack(
1,
[undefined],
);
this.#queryLock = new DeferredStack(1, [undefined]);
this.#secretKey = undefined;
this.#tls = undefined;
this.#transport = undefined;
Expand All @@ -319,14 +307,10 @@ export class Connection {
this.#closeConnection();

const {
hostname,
host_type,
hostname,
port,
tls: {
enabled: tls_enabled,
enforce: tls_enforced,
caCertificates,
},
tls: { caCertificates, enabled: tls_enabled, enforce: tls_enforced },
} = this.#connection_params;

if (host_type === "socket") {
Expand All @@ -341,12 +325,11 @@ export class Connection {

if (tls_enabled) {
// If TLS is disabled, we don't even try to connect.
const accepts_tls = await this.#serverAcceptsTLS()
.catch((e) => {
// Make sure to close the connection if the TLS validation throws
this.#closeConnection();
throw e;
});
const accepts_tls = await this.#serverAcceptsTLS().catch((e) => {
// Make sure to close the connection if the TLS validation throws
this.#closeConnection();
throw e;
});

// https://www.postgresql.org/docs/14/protocol-flow.html#id-1.10.5.7.11
if (accepts_tls) {
Expand Down Expand Up @@ -657,24 +640,18 @@ export class Connection {
`Unexpected message in SASL finalization: ${maybe_sasl_continue.type}`,
);
}
const sasl_final = utf8.decode(
maybe_sasl_final.reader.readAllBytes(),
);
const sasl_final = utf8.decode(maybe_sasl_final.reader.readAllBytes());
await client.receiveResponse(sasl_final);

// Return authentication result
return this.#readMessage();
}

async #simpleQuery(
query: Query<ResultType.ARRAY>,
): Promise<QueryArrayResult>;
async #simpleQuery(query: Query<ResultType.ARRAY>): Promise<QueryArrayResult>;
async #simpleQuery(
query: Query<ResultType.OBJECT>,
): Promise<QueryObjectResult>;
async #simpleQuery(
query: Query<ResultType>,
): Promise<QueryResult> {
async #simpleQuery(query: Query<ResultType>): Promise<QueryResult> {
this.#packetWriter.clear();

const buffer = this.#packetWriter.addCString(query.text).flush(0x51);
Expand Down Expand Up @@ -757,9 +734,7 @@ export class Connection {
await this.#bufWriter.write(buffer);
}

async #appendArgumentsToMessage<T extends ResultType>(
query: Query<T>,
) {
async #appendArgumentsToMessage<T extends ResultType>(query: Query<T>) {
this.#packetWriter.clear();

const hasBinaryArgs = query.args.some((arg) => arg instanceof Uint8Array);
Expand Down Expand Up @@ -830,10 +805,7 @@ export class Connection {

// TODO
// Rename process function to a more meaningful name and move out of class
async #processErrorUnsafe(
msg: Message,
recoverable = true,
) {
async #processErrorUnsafe(msg: Message, recoverable = true) {
const error = new PostgresError(parseNoticeMessage(msg));
if (recoverable) {
let maybe_ready_message = await this.#readMessage();
Expand Down Expand Up @@ -930,15 +902,9 @@ export class Connection {
return result;
}

async query(
query: Query<ResultType.ARRAY>,
): Promise<QueryArrayResult>;
async query(
query: Query<ResultType.OBJECT>,
): Promise<QueryObjectResult>;
async query(
query: Query<ResultType>,
): Promise<QueryResult> {
async query(query: Query<ResultType.ARRAY>): Promise<QueryArrayResult>;
async query(query: Query<ResultType.OBJECT>): Promise<QueryObjectResult>;
async query(query: Query<ResultType>): Promise<QueryResult> {
if (!this.connected) {
await this.startup(true);
}
Expand Down
Loading
Loading