Skip to content

Commit

Permalink
Merge pull request taosdata#205 from taosdata/3.0
Browse files Browse the repository at this point in the history
merge 3.0 to main
  • Loading branch information
huskar-t authored Oct 24, 2023
2 parents af22d62 + a3d62aa commit 22bd599
Show file tree
Hide file tree
Showing 36 changed files with 1,512 additions and 337 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
cd TDengine
mkdir debug
cd debug
cmake .. -DBUILD_JDBC=false -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9 -DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache
cmake .. -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9 -DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache
make -j 4
- name: package
Expand Down
71 changes: 70 additions & 1 deletion af/tmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event {
db := wrapper.TMQGetDBName(message)
resultType := wrapper.TMQGetResType(message)
offset := tmq.Offset(wrapper.TMQGetVgroupOffset(message))
vgID := wrapper.TMQGetVgroupID(message)
switch resultType {
case common.TMQ_RES_DATA:
result := &tmq.DataMessage{}
Expand All @@ -106,6 +107,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event {
}
result.SetData(data)
result.SetOffset(offset)
result.TopicPartition = tmq.TopicPartition{
Topic: &topic,
Partition: vgID,
Offset: offset,
}
wrapper.TaosFreeResult(message)
return result
case common.TMQ_RES_TABLE_META:
Expand All @@ -118,6 +124,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event {
}
result.SetMeta(meta)
result.SetOffset(offset)
result.TopicPartition = tmq.TopicPartition{
Topic: &topic,
Partition: vgID,
Offset: offset,
}
wrapper.TaosFreeResult(message)
return result
case common.TMQ_RES_METADATA:
Expand All @@ -137,6 +148,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event {
Meta: meta,
Data: data,
})
result.TopicPartition = tmq.TopicPartition{
Topic: &topic,
Partition: vgID,
Offset: offset,
}
wrapper.TaosFreeResult(message)
return result
default:
Expand Down Expand Up @@ -187,7 +203,16 @@ func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) {
}

func (c *Consumer) Commit() ([]tmq.TopicPartition, error) {
return c.doCommit(nil)
errCode := wrapper.TMQCommitSync(c.cConsumer, nil)
if errCode != taosError.SUCCESS {
errStr := wrapper.TMQErr2Str(errCode)
return nil, taosError.NewError(int(errCode), errStr)
}
partitions, err := c.Assignment()
if err != nil {
return nil, err
}
return c.Committed(partitions, 0)
}

func (c *Consumer) doCommit(message unsafe.Pointer) ([]tmq.TopicPartition, error) {
Expand Down Expand Up @@ -235,6 +260,50 @@ func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) erro
return nil
}

func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error) {
offsets = make([]tmq.TopicPartition, len(partitions))
for i := 0; i < len(partitions); i++ {
cOffset := wrapper.TMQCommitted(c.cConsumer, *partitions[i].Topic, partitions[i].Partition)
offset := tmq.Offset(cOffset)
if !offset.Valid() {
return nil, taosError.NewError(int(offset), wrapper.TMQErr2Str(int32(offset)))
}
offsets[i] = tmq.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: offset,
}
}
return
}

func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error) {
for i := 0; i < len(offsets); i++ {
errCode := wrapper.TMQCommitOffsetSync(c.cConsumer, *offsets[i].Topic, offsets[i].Partition, int64(offsets[i].Offset))
if errCode != taosError.SUCCESS {
errStr := wrapper.TMQErr2Str(errCode)
return nil, taosError.NewError(int(errCode), errStr)
}
}
return c.Committed(offsets, 0)
}

func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error) {
offsets = make([]tmq.TopicPartition, len(partitions))
for i := 0; i < len(partitions); i++ {
position := wrapper.TMQPosition(c.cConsumer, *partitions[i].Topic, partitions[i].Partition)
if position < 0 {
return nil, taosError.NewError(int(position), wrapper.TMQErr2Str(int32(position)))
}
offsets[i] = tmq.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: tmq.Offset(position),
}
}
return
}

// Close release consumer
func (c *Consumer) Close() error {
errCode := wrapper.TMQConsumerClose(c.cConsumer)
Expand Down
55 changes: 40 additions & 15 deletions af/tmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestTmq(t *testing.T) {
return
}
sqls := []string{
"drop topic if exists test_tmq_common",
"drop database if exists af_test_tmq",
"create database if not exists af_test_tmq vgroups 2 WAL_RETENTION_PERIOD 86400",
"use af_test_tmq",
Expand All @@ -43,8 +44,8 @@ func TestTmq(t *testing.T) {
") tags(t1 int)",
"create table if not exists ct0 using all_type tags(1000)",
"create table if not exists ct1 using all_type tags(2000)",
"create table if not exists ct3 using all_type tags(3000)",
"create topic if not exists test_tmq_common as select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from ct1",
"create table if not exists ct2 using all_type tags(3000)",
"create topic if not exists test_tmq_common as select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from all_type",
}

defer func() {
Expand All @@ -59,20 +60,24 @@ func TestTmq(t *testing.T) {
assert.NoError(t, err)
}()
now := time.Now()
err = execWithoutResult(conn, fmt.Sprintf("insert into ct0 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano)))
assert.NoError(t, err)
err = execWithoutResult(conn, fmt.Sprintf("insert into ct1 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano)))
assert.NoError(t, err)
err = execWithoutResult(conn, fmt.Sprintf("insert into ct2 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano)))
assert.NoError(t, err)

consumer, err := NewConsumer(&tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
//"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
})
if err != nil {
t.Error(err)
Expand All @@ -83,6 +88,10 @@ func TestTmq(t *testing.T) {
t.Error(err)
return
}
ass, err := consumer.Assignment()
t.Log(ass)
position, _ := consumer.Position(ass)
t.Log(position)
haveMessage := false
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
Expand All @@ -108,8 +117,23 @@ func TestTmq(t *testing.T) {
assert.Equal(t, float64(11), row1[11].(float64))
assert.Equal(t, "1", row1[12].(string))
assert.Equal(t, "2", row1[13].(string))
_, err = consumer.Commit()
t.Log(e.Offset())
ass, err := consumer.Assignment()
t.Log(ass)
committed, err := consumer.Committed(ass, 0)
t.Log(committed)
position, _ := consumer.Position(ass)
t.Log(position)
offsets, err := consumer.Position([]tmq.TopicPartition{e.TopicPartition})
assert.NoError(t, err)
_, err = consumer.CommitOffsets(offsets)
assert.NoError(t, err)
ass, err = consumer.Assignment()
t.Log(ass)
committed, err = consumer.Committed(ass, 0)
t.Log(committed)
position, _ = consumer.Position(ass)
t.Log(position)
err = consumer.Unsubscribe()
assert.NoError(t, err)
err = consumer.Close()
Expand Down Expand Up @@ -208,9 +232,10 @@ func TestSeek(t *testing.T) {
for _, datum := range data {
dataCount += len(datum.Data)
}
time.Sleep(time.Second * 2)
_, err = consumer.Commit()
assert.NoError(t, err)
}
_, err = consumer.Commit()
assert.NoError(t, err)
}
assert.Equal(t, record, dataCount)

Expand Down
2 changes: 2 additions & 0 deletions common/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
NullTime = reflect.TypeOf(types.NullTime{})
NullBool = reflect.TypeOf(types.NullBool{})
NullString = reflect.TypeOf(types.NullString{})
Bytes = reflect.TypeOf([]byte{})
NullJson = reflect.TypeOf(types.NullJson{})
UnknownType = reflect.TypeOf(new(interface{})).Elem()
)
Expand All @@ -40,4 +41,5 @@ var ColumnTypeMap = map[int]reflect.Type{
TSDB_DATA_TYPE_NCHAR: NullString,
TSDB_DATA_TYPE_TIMESTAMP: NullTime,
TSDB_DATA_TYPE_JSON: NullJson,
TSDB_DATA_TYPE_VARBINARY: Bytes,
}
13 changes: 12 additions & 1 deletion common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const (
TSDB_DATA_TYPE_UINT_Str = "INT UNSIGNED"
TSDB_DATA_TYPE_UBIGINT_Str = "BIGINT UNSIGNED"
TSDB_DATA_TYPE_JSON_Str = "JSON"
TSDB_DATA_TYPE_VARBINARY_Str = "VARBINARY"
)

var TypeNameMap = map[int]string{
Expand All @@ -83,6 +84,7 @@ var TypeNameMap = map[int]string{
TSDB_DATA_TYPE_UINT: TSDB_DATA_TYPE_UINT_Str,
TSDB_DATA_TYPE_UBIGINT: TSDB_DATA_TYPE_UBIGINT_Str,
TSDB_DATA_TYPE_JSON: TSDB_DATA_TYPE_JSON_Str,
TSDB_DATA_TYPE_VARBINARY: TSDB_DATA_TYPE_VARBINARY_Str,
}

var NameTypeMap = map[string]int{
Expand All @@ -102,6 +104,7 @@ var NameTypeMap = map[string]int{
TSDB_DATA_TYPE_UINT_Str: TSDB_DATA_TYPE_UINT,
TSDB_DATA_TYPE_UBIGINT_Str: TSDB_DATA_TYPE_UBIGINT,
TSDB_DATA_TYPE_JSON_Str: TSDB_DATA_TYPE_JSON,
TSDB_DATA_TYPE_VARBINARY_Str: TSDB_DATA_TYPE_VARBINARY,
}

const (
Expand Down Expand Up @@ -142,4 +145,12 @@ const (

const ReqIDKey = "taos_req_id"

const TAOS_NOTIFY_PASSVER = 0
const (
TAOS_NOTIFY_PASSVER = 0
TAOS_NOTIFY_WHITELIST_VER = 1
TAOS_NOTIFY_USER_DROPPED = 2
)

const (
TAOS_CONN_MODE_BI = 0
)
12 changes: 12 additions & 0 deletions common/param/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ func (c *ColumnType) AddBinary(strMaxLen int) *ColumnType {
return c
}

func (c *ColumnType) AddVarBinary(strMaxLen int) *ColumnType {
if c.column >= c.size {
return c
}
c.value[c.column] = &types.ColumnType{
Type: types.TaosVarBinaryType,
MaxLen: strMaxLen,
}
c.column += 1
return c
}

func (c *ColumnType) AddNchar(strMaxLen int) *ColumnType {
if c.column >= c.size {
return c
Expand Down
16 changes: 16 additions & 0 deletions common/param/param.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (p *Param) SetBinary(offset int, value []byte) {
p.value[offset] = taosTypes.TaosBinary(value)
}

func (p *Param) SetVarBinary(offset int, value []byte) {
if offset >= p.size {
return
}
p.value[offset] = taosTypes.TaosVarBinary(value)
}

func (p *Param) SetNchar(offset int, value string) {
if offset >= p.size {
return
Expand Down Expand Up @@ -252,6 +259,15 @@ func (p *Param) AddBinary(value []byte) *Param {
return p
}

func (p *Param) AddVarBinary(value []byte) *Param {
if p.offset >= p.size {
return p
}
p.value[p.offset] = taosTypes.TaosVarBinary(value)
p.offset += 1
return p
}

func (p *Param) AddNchar(value string) *Param {
if p.offset >= p.size {
return p
Expand Down
Loading

0 comments on commit 22bd599

Please sign in to comment.