Fix web impl streaming line breaks with large rows (#333)
slvrtrn authored Oct 5, 2024
1 parent 5c08c5e commit 1c9b28b
Showing 5 changed files with 174 additions and 57 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# 1.7.0 (Common, Node.js, Web)

## Bug fixes

- (Web only) Fixed an issue where streaming large datasets could provide corrupted results. See [#333](https://github.com/ClickHouse/clickhouse-js/pull/333) (PR) for more details; a sketch of the affected streaming pattern is shown below.

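A minimal sketch of the streaming pattern this fix affects; the client configuration and the `large_table` table are illustrative assumptions, not taken from this repository:

```ts
import { createClient } from '@clickhouse/client-web'

// Illustrative setup; the URL and the table are assumptions.
const client = createClient({ url: 'http://localhost:8123' })

const rs = await client.query({
  query: 'SELECT * FROM large_table ORDER BY id ASC',
  format: 'JSONEachRow',
})

// The web ResultSet streams batches of rows (Row[]), one batch per decoded
// response chunk; rows split across chunk boundaries are the case this fix repairs.
const reader = rs.stream().getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done || value === undefined) break
  for (const row of value) {
    console.log(row.json())
  }
}
await client.close()
```
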
## New features

- (Experimental) Exposed the `parseColumnType` function that takes a string representation of a ClickHouse type (e.g., `FixedString(16)`, `Nullable(Int32)`, etc.) and returns an AST-like object that represents the type. For example:

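A minimal, hypothetical sketch of calling the function; the import path and the exact shape of the returned object are assumptions rather than documented library output:

```ts
import { parseColumnType } from '@clickhouse/client-common'

// Returns an AST-like description of the ClickHouse type; the concrete shape
// of the result is an assumption here, so inspect it rather than relying on it.
const ast = parseColumnType('Nullable(Int32)')
console.log(JSON.stringify(ast, null, 2))
```
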
37 changes: 37 additions & 0 deletions packages/client-common/__tests__/utils/datasets.ts
@@ -0,0 +1,37 @@
import type { ClickHouseClient } from '@clickhouse/client-common'
import { fakerRU } from '@faker-js/faker'
import { createTableWithFields } from '@test/fixtures/table_with_fields'

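/**
 * Creates a test table and inserts `rows` rows, each with a random Russian lorem sentence
 * of `words` words and a timestamp; returns the table name and the inserted values
 * so tests can assert on them.
 */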
export async function genLargeStringsDataset<Stream = unknown>(
client: ClickHouseClient<Stream>,
{
rows,
words,
}: {
rows: number
words: number
},
): Promise<{
table: string
values: { id: number; sentence: string; timestamp: string }[]
}> {
const table = await createTableWithFields(
client as ClickHouseClient,
`sentence String, timestamp String`,
)
const values = [...new Array(rows)].map((_, id) => ({
id,
// non-ASCII symbols seem to make it easier to trigger the incorrect behavior
sentence: fakerRU.lorem.sentence(words),
timestamp: new Date().toISOString(),
}))
await client.insert({
table,
values,
format: 'JSONEachRow',
})
return {
table,
values,
}
}
@@ -3,10 +3,9 @@ import {
type ClickHouseClient,
type ClickHouseSettings,
} from '@clickhouse/client-common'
import { fakerRU } from '@faker-js/faker'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { createTableWithFields } from '@test/fixtures/table_with_fields'
import { createTestClient, guid } from '@test/utils'
import { genLargeStringsDataset } from '@test/utils/datasets'
import { tableFromIPC } from 'apache-arrow'
import { Buffer } from 'buffer'
import Fs from 'fs'
@@ -152,40 +151,9 @@ describe('[Node.js] streaming e2e', () => {
// Here we generate a large enough dataset to break into multiple chunks while streaming,
// effectively testing the implementation of incomplete rows handling
describe('should correctly process multiple chunks', () => {
async function generateData({
rows,
words,
}: {
rows: number
words: number
}): Promise<{
table: string
values: { id: number; sentence: string; timestamp: string }[]
}> {
const table = await createTableWithFields(
client as ClickHouseClient,
`sentence String, timestamp String`,
)
const values = [...new Array(rows)].map((_, id) => ({
id,
// it seems that it is easier to trigger an incorrect behavior with non-ASCII symbols
sentence: fakerRU.lorem.sentence(words),
timestamp: new Date().toISOString(),
}))
await client.insert({
table,
values,
format: 'JSONEachRow',
})
return {
table,
values,
}
}

describe('large amount of rows', () => {
it('should work with .json()', async () => {
const { table, values } = await generateData({
const { table, values } = await genLargeStringsDataset(client, {
rows: 10000,
words: 10,
})
@@ -199,7 +167,7 @@ })
})

it('should work with .stream()', async () => {
const { table, values } = await generateData({
const { table, values } = await genLargeStringsDataset(client, {
rows: 10000,
words: 10,
})
@@ -222,7 +190,7 @@

describe("rows that don't fit into a single chunk", () => {
it('should work with .json()', async () => {
const { table, values } = await generateData({
const { table, values } = await genLargeStringsDataset(client, {
rows: 5,
words: 10000,
})
@@ -236,7 +204,7 @@ })
})

it('should work with .stream()', async () => {
const { table, values } = await generateData({
const { table, values } = await genLargeStringsDataset(client, {
rows: 5,
words: 10000,
})
@@ -1,13 +1,21 @@
import type { ClickHouseClient, Row } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import { genLargeStringsDataset } from '@test/utils/datasets'

describe('[Web] SELECT streaming', () => {
let client: ClickHouseClient<ReadableStream<Row[]>>
afterEach(async () => {
await client.close()
})
beforeEach(async () => {
client = createTestClient()
client = createTestClient({
// It is required to disable keep-alive to allow for larger inserts
// https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
// If contentLength is non-null and httpRequest’s keepalive is true, then:
// <...>
// If the sum of contentLength and inflightKeepaliveBytes is greater than 64 kibibytes, then return a network error.
keep_alive: { enabled: false },
})
})

describe('consume the response only once', () => {
@@ -199,6 +207,75 @@ describe('[Web] SELECT streaming', () => {
])
})
})

// See https://github.com/ClickHouse/clickhouse-js/issues/171 for more details
// Here we generate a large enough dataset to break into multiple chunks while streaming,
// effectively testing the implementation of incomplete rows handling
describe('should correctly process multiple chunks', () => {
describe('large amount of rows', () => {
it('should work with .json()', async () => {
const { table, values } = await genLargeStringsDataset(client, {
rows: 10000,
words: 10,
})
const result = await client
.query({
query: `SELECT * FROM ${table} ORDER BY id ASC`,
format: 'JSONEachRow',
})
.then((r) => r.json())
expect(result).toEqual(values)
})

it('should work with .stream()', async () => {
const { table, values } = await genLargeStringsDataset(client, {
rows: 10000,
words: 10,
})
const stream = await client
.query({
query: `SELECT * FROM ${table} ORDER BY id ASC`,
format: 'JSONEachRow',
})
.then((r) => r.stream())

const result = await rowsJsonValues(stream)
expect(result).toEqual(values)
})
})

describe("rows that don't fit into a single chunk", () => {
it('should work with .json()', async () => {
const { table, values } = await genLargeStringsDataset(client, {
rows: 5,
words: 10000,
})
const result = await client
.query({
query: `SELECT * FROM ${table} ORDER BY id ASC`,
format: 'JSONEachRow',
})
.then((r) => r.json())
expect(result).toEqual(values)
})

it('should work with .stream()', async () => {
const { table, values } = await genLargeStringsDataset(client, {
rows: 5,
words: 10000,
})
const stream = await client
.query({
query: `SELECT * FROM ${table} ORDER BY id ASC`,
format: 'JSONEachRow',
})
.then((r) => r.stream())

const result = await rowsJsonValues(stream)
expect(result).toEqual(values)
})
})
})
})

async function rowsJsonValues<T = unknown>(
67 changes: 48 additions & 19 deletions packages/client-web/src/result_set.ts
@@ -9,10 +9,12 @@ import type {
import {
isNotStreamableJSONFamily,
isStreamableJSONFamily,
validateStreamFormat,
} from '@clickhouse/client-common'
import { validateStreamFormat } from '@clickhouse/client-common'
import { getAsText } from './utils'

const NEWLINE = 0x0a as const

export class ResultSet<Format extends DataFormat | unknown>
implements BaseResultSet<ReadableStream<Row[]>, Format>
{
@@ -67,40 +69,67 @@ export class ResultSet<Format extends DataFormat | unknown>
this.markAsConsumed()
validateStreamFormat(this.format)

let decodedChunk = ''
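// bytes of a row that was split across chunks are buffered here until its terminating newline arrives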
let incompleteChunks: Uint8Array[] = []
let totalIncompleteLength = 0
const decoder = new TextDecoder('utf-8')
const transform = new TransformStream({
start() {
//
},
transform: (chunk, controller) => {
transform: (chunk: Uint8Array, controller) => {
if (chunk === null) {
controller.terminate()
}
decodedChunk += decoder.decode(chunk)
const rows: Row[] = []
// eslint-disable-next-line no-constant-condition
while (true) {
const idx = decodedChunk.indexOf('\n')
if (idx !== -1) {
const text = decodedChunk.slice(0, idx)
decodedChunk = decodedChunk.slice(idx + 1)
let idx: number
let lastIdx = 0
do {
// an unescaped newline character denotes the end of a row
idx = chunk.indexOf(NEWLINE, lastIdx)
// there is no complete row in the rest of the current chunk
if (idx === -1) {
// to be processed during the next transform iteration
const incompleteChunk = chunk.slice(lastIdx)
incompleteChunks.push(incompleteChunk)
totalIncompleteLength += incompleteChunk.length
// send the extracted rows to the consumer, if any
if (rows.length > 0) {
controller.enqueue(rows)
}
} else {
let text: string
if (incompleteChunks.length > 0) {
const completeRowBytes = new Uint8Array(
totalIncompleteLength + idx,
)

// using the incomplete chunks from the previous iterations
let offset = 0
incompleteChunks.forEach((incompleteChunk) => {
completeRowBytes.set(incompleteChunk, offset)
offset += incompleteChunk.length
})
// finalize the row with the current chunk slice that ends with a newline
const finalChunk = chunk.slice(0, idx)
completeRowBytes.set(finalChunk, offset)

// reset the incomplete chunks
incompleteChunks = []
totalIncompleteLength = 0

text = decoder.decode(completeRowBytes)
} else {
text = decoder.decode(chunk.slice(lastIdx, idx))
}
rows.push({
text,
json<T>(): T {
return JSON.parse(text)
},
})
} else {
if (rows.length) {
controller.enqueue(rows)
}
break
lastIdx = idx + 1 // skipping newline character
}
}
},
flush() {
decodedChunk = ''
} while (idx !== -1)
},
})

