forked from ClickHouse/clickhouse-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclickhouse_receive_data.go
65 lines (63 loc) · 1.86 KB
/
clickhouse_receive_data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package clickhouse
import "fmt"
func (ch *clickhouse) receiveData() (*rows, error) {
var rows rows
for {
packet, err := readUvarint(ch.conn)
if err != nil {
return nil, err
}
switch packet {
case ServerExceptionPacket:
ch.log("[receive packet] <- exception")
return nil, ch.exception()
case ServerProgressPacket:
progress, err := ch.progress()
if err != nil {
return nil, err
}
ch.log("[receive packet] <- progress: rows=%d, bytes=%d, total rows=%d",
progress.bytes,
progress.rows,
progress.totalRows,
)
case ServerDataPacket:
var block block
if err := block.read(ch.serverRevision, ch.conn); err != nil {
ch.log("[receive packet] err: %v", err)
return nil, err
}
if block.numRows > 0 {
rows.append(&block)
}
ch.log("[receive packet] <- data: columns=%d, rows=%d", block.numColumns, block.numRows)
case ServerExtremesPacket:
var block block
if err := block.read(ch.serverRevision, ch.conn); err != nil {
return nil, err
}
ch.log("[receive packet] <- extremes: columns=%d, rows=%d", block.numColumns, block.numRows)
case ServerTotalsPacket:
var block block
if err := block.read(ch.serverRevision, ch.conn); err != nil {
return nil, err
}
if block.numRows > 0 {
rows.append(&block)
}
ch.log("[receive packet] <- totals: columns=%d, rows=%d", block.numColumns, block.numRows)
case ServerProfileInfoPacket:
profileInfo, err := ch.profileInfo()
if err != nil {
return nil, err
}
ch.log("[receive packet] <- profiling: rows=%d, bytes=%d, blocks=%d", profileInfo.rows, profileInfo.bytes, profileInfo.blocks)
case ServerEndOfStreamPacket:
ch.log("[receive packet] <- end of stream")
return &rows, nil
default:
ch.log("[receive packet] unexpected packet [%d]", packet)
return nil, fmt.Errorf("unexpected packet [%d] from server", packet)
}
}
}