Skip to content

Commit

Permalink
feat: deps, native support for runQueryCount
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgroshkov committed Apr 6, 2024
1 parent 8c04f29 commit 3471ef1
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 303 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"version": "3.16.4",
"description": "Opinionated library to work with Google Datastore",
"scripts": {
"prepare": "husky install"
"prepare": "husky"
},
"dependencies": {
"@google-cloud/datastore": "^8.0.0",
Expand Down
19 changes: 6 additions & 13 deletions src/datastore.db.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Transform } from 'node:stream'
import { PropertyFilter, Transaction } from '@google-cloud/datastore'
import type { Datastore, Key, Query } from '@google-cloud/datastore'
import {
Expand Down Expand Up @@ -228,8 +227,9 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {
_opt?: DatastoreDBOptions,
): Promise<number> {
const q = dbQueryToDatastoreQuery(dbQuery.select([]), this.ds().createQuery(dbQuery.table))
const [entities] = await this.ds().runQuery(q)
return entities.length
const aq = this.ds().createAggregationQuery(q).count('count')
const [entities] = await this.ds().runAggregationQuery(aq)
return entities[0]?.count
}

async runDatastoreQuery<ROW extends ObjectWithId>(q: Query): Promise<RunQueryResult<ROW>> {
Expand All @@ -254,22 +254,15 @@ export class DatastoreDB extends BaseCommonDB implements CommonDB {

const stream: ReadableTyped<ROW> = (
opt.experimentalCursorStream
? new DatastoreStreamReadable(
? new DatastoreStreamReadable<ROW>(
q,
opt,
commonLoggerMinLevel(this.cfg.logger, opt.debug ? 'log' : 'warn'),
)
: this.ds().runQueryStream(q)
)
.on('error', err => stream.emit('error', err))
.pipe(
new Transform({
objectMode: true,
transform: (chunk, _enc, cb) => {
cb(null, this.mapId(chunk))
},
}),
)
// .on('error', err => stream.emit('error', err))
.map(chunk => this.mapId(chunk))

return stream
}
Expand Down
57 changes: 22 additions & 35 deletions src/datastoreKeyValueDB.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { CommonKeyValueDB, DBQuery, KeyValueDBTuple } from '@naturalcycles/db-lib'
import { ErrorMode, ObjectWithId } from '@naturalcycles/js-lib'
import { ReadableTyped, transformMapSimple } from '@naturalcycles/nodejs-lib'
import { ReadableTyped } from '@naturalcycles/nodejs-lib'
import { DatastoreDB } from './datastore.db'
import { DatastoreDBCfg } from './datastore.model'

Expand Down Expand Up @@ -49,51 +48,39 @@ export class DatastoreKeyValueDB implements CommonKeyValueDB {
.select(['id'])
.limit(limit || 0)

const stream: ReadableTyped<string> = this.db
.streamQuery<KVObject>(q)
.on('error', err => stream.emit('error', err))
.pipe(
transformMapSimple<ObjectWithId, string>(objectWithId => objectWithId.id, {
errorMode: ErrorMode.SUPPRESS, // cause .pipe() cannot propagate errors
}),
)

return stream
return (
this.db
.streamQuery(q)
// .on('error', err => stream.emit('error', err))
.map(r => r.id)
)
}

streamValues(table: string, limit?: number): ReadableTyped<Buffer> {
// `select v` doesn't work for some reason
const q = DBQuery.create<KVObject>(table).limit(limit || 0)

const stream: ReadableTyped<string> = this.db
.streamQuery<KVObject>(q)
.on('error', err => stream.emit('error', err))
.pipe(
transformMapSimple<{ v: Buffer }, Buffer>(obj => obj.v, {
errorMode: ErrorMode.SUPPRESS, // cause .pipe() cannot propagate errors
}),
)

return stream
return (
this.db
.streamQuery(q)
// .on('error', err => stream.emit('error', err))
.map(r => r.v)
)
}

streamEntries(table: string, limit?: number): ReadableTyped<KeyValueDBTuple> {
const q = DBQuery.create<KVObject>(table).limit(limit || 0)

const stream: ReadableTyped<string> = this.db
.streamQuery<KVObject>(q)
.on('error', err => stream.emit('error', err))
.pipe(
transformMapSimple<KVObject, KeyValueDBTuple>(obj => [obj.id, obj.v], {
errorMode: ErrorMode.SUPPRESS, // cause .pipe() cannot propagate errors
}),
)

return stream
return (
this.db
.streamQuery(q)
// .on('error', err => stream.emit('error', err))
.map(r => [r.id, r.v] as KeyValueDBTuple)
)
}

async count(_table: string): Promise<number> {
this.db.cfg.logger.warn(`DatastoreKeyValueDB.count is not supported`)
return 0
async count(table: string): Promise<number> {
const q = DBQuery.create<KVObject>(table)
return await this.db.runQueryCount(q)
}
}
Loading

0 comments on commit 3471ef1

Please sign in to comment.