bug: wrong parsed timestamp from json #16097

Closed
MrCroxx opened this issue Apr 2, 2024 · 6 comments
Labels: type/bug (Something isn't working)

MrCroxx (Contributor) commented Apr 2, 2024

Describe the bug

When parsing a timestamp value from the following JSON message:

{
  "before": null,
  "after": {
    "id": 13,
    "example_timestamp_tz": 63072000000000000
  },
  "source": {
    "table": "pathmetadata"
  },
  "op": "c",
  "ts_ms": 1707225270445
}

The value 63072000000000000 should be parsed as 1972-01-01 00:00:00. However, RisingWave (RW) parses it as 3968-09-03 02:00:00.000 +0200, which is 63072000000 seconds since the Unix epoch.

Error message/log

No response

To Reproduce

Example JSON:

{
  "before": null,
  "after": {
    "id": 13,
    "example_timestamp_tz": 63072000000000000
  },
  "source": {
    "table": "pathmetadata"
  },
  "op": "c",
  "ts_ms": 1707225270445
}

Expected behavior

The value 63072000000000000 (nanoseconds since the Unix epoch) should be parsed as 1972-01-01 00:00:00.

How did you deploy RisingWave?

No response

The version of RisingWave

No response

Additional context

No response

MrCroxx added the type/bug (Something isn't working) label on Apr 2, 2024.
github-actions bot added this to the release-1.8 milestone on Apr 2, 2024.
tabVersion (Contributor) commented:

It seems the time-unit inference gets this wrong. cc @xiangjinwu

pub fn i64_to_timestamptz(t: i64) -> Result<Timestamptz> {
    const E11: u64 = 100_000_000_000;
    const E14: u64 = 100_000_000_000_000;
    const E17: u64 = 100_000_000_000_000_000;
    match t.abs_diff(0) {
        0..E11 => Ok(Timestamptz::from_secs(t).unwrap()), // s
        E11..E14 => Ok(Timestamptz::from_millis(t).unwrap()), // ms
        E14..E17 => Ok(Timestamptz::from_micros(t)), // us
        E17.. => Ok(Timestamptz::from_micros(t / 1000)), // ns
    }
}
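For illustration, the same magnitude-based bucketing can be reproduced without RisingWave's Timestamptz type. This is a minimal std-only sketch, not RisingWave's code: the Unit enum, infer_unit, and to_micros are hypothetical names introduced here. It shows that 63072000000000000 falls in the [1e14, 1e17) bucket and is therefore read as microseconds.

```rust
/// Hypothetical unit enum, for illustration only (not part of RisingWave).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Unit {
    Secs,
    Millis,
    Micros,
    Nanos,
}

const E11: u64 = 100_000_000_000;
const E14: u64 = 100_000_000_000_000;
const E17: u64 = 100_000_000_000_000_000;

/// Guess the unit from |t| alone, using the same thresholds as i64_to_timestamptz.
fn infer_unit(t: i64) -> Unit {
    let m = t.unsigned_abs();
    if m < E11 {
        Unit::Secs
    } else if m < E14 {
        Unit::Millis
    } else if m < E17 {
        Unit::Micros
    } else {
        Unit::Nanos
    }
}

/// Normalize a raw epoch value to microseconds according to the inferred unit.
fn to_micros(t: i64) -> i64 {
    match infer_unit(t) {
        Unit::Secs => t * 1_000_000,
        Unit::Millis => t * 1_000,
        Unit::Micros => t,
        Unit::Nanos => t / 1_000,
    }
}

fn main() {
    let raw: i64 = 63_072_000_000_000_000;
    // 6.3e16 lies in [1e14, 1e17), so the heuristic treats it as microseconds.
    println!("{:?} -> {} us", infer_unit(raw), to_micros(raw));
}
```

Because each bucket is bounded, the multiplications in to_micros cannot overflow: |t| < 1e11 seconds scales to less than 1e17 microseconds, and |t| < 1e14 milliseconds does as well.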

xiangjinwu (Contributor) commented Apr 3, 2024

To repeat myself once more, this behavior is by design and is documented here:

https://docs.risingwave.com/docs/current/sql-create-source/#debezium-json

Note that if you are ingesting data of type timestamp or timestamptz in RisingWave, the upstream value must be in the range of [1973-03-03 09:46:40, 5138-11-16 09:46:40] (UTC). The value may be parsed and ingested incorrectly without warning.
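Those bounds follow from the thresholds. A value is inferred correctly in every unit only if the instant, in seconds, is at least 1e8 (so its millisecond, microsecond, and nanosecond encodings reach 1e11, 1e14, and 1e17) and below 1e11 (so its seconds encoding is not mistaken for milliseconds). The std-only sketch below converts the two bounds to UTC dates using Howard Hinnant's civil_from_days algorithm; format_utc is a hypothetical helper, not a RisingWave function.

```rust
/// Days since 1970-01-01 -> (year, month, day) in the proleptic Gregorian
/// calendar, following Howard Hinnant's civil_from_days algorithm.
fn civil_from_days(days: i64) -> (i64, u32, u32) {
    let z = days + 719_468; // shift the epoch to 0000-03-01
    let era = (if z >= 0 { z } else { z - 146_096 }) / 146_097;
    let doe = z - era * 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // March-based month, [0, 11]
    let day = (doy - (153 * mp + 2) / 5 + 1) as u32;
    let month = (if mp < 10 { mp + 3 } else { mp - 9 }) as u32;
    // January and February belong to the next civil year.
    let year = yoe + era * 400 + (if month <= 2 { 1 } else { 0 });
    (year, month, day)
}

/// Format Unix seconds as "YYYY-MM-DD HH:MM:SS" in UTC.
fn format_utc(secs: i64) -> String {
    let (days, rem) = (secs.div_euclid(86_400), secs.rem_euclid(86_400));
    let (y, m, d) = civil_from_days(days);
    format!(
        "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
        y, m, d, rem / 3_600, rem % 3_600 / 60, rem % 60
    )
}

fn main() {
    // Lower bound: 1e8 s, i.e. 1e11 ms, 1e14 us, 1e17 ns.
    println!("{}", format_utc(100_000_000));
    // Upper bound: 1e11 s itself would already land in the millisecond bucket.
    println!("{}", format_utc(100_000_000_000));
}
```

The reported instant, 1972-01-01, is 63,072,000 seconds after the epoch, below the 1e8-second lower bound, which is why its nanosecond encoding lands in the microsecond bucket.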

@MrCroxx Could you provide more context on this specific use case? How is the source created and how do we know if the unit is s / ms / us / ns?

Quoting the original report: "Value 63072000000000000 should be parsed as 1970-01-01 00:00:00. But RW parses it as 3968-09-03 02:00:00.000 +0200, which is 63072007200."

This claim is problematic as well. How could a later date have a smaller value?

@MrCroxx Please correct the expected behavior before we can proceed. (Fixed; see the comment below.)

xiangjinwu assigned MrCroxx and unassigned tabVersion on Apr 3, 2024.
xiangjinwu (Contributor) commented:
Synced offline:

In this case, 63072000000000000 is expected to be 1972-01-01T00:00:00Z (read as nanoseconds; not 1970 as first stated), but it is parsed as 3968-09-03T00:00:00Z (read as microseconds).
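The arithmetic behind the two readings can be checked with a few lines of std-only Rust (RAW and to_secs are illustrative names). Both results divide evenly into whole days: 730 days after 1970-01-01 is 1972-01-01, and 730,000 days after it is 3968-09-03.

```rust
const RAW: i64 = 63_072_000_000_000_000;
const SECS_PER_DAY: i64 = 86_400;

/// Convert a raw epoch value to whole seconds, given how many raw units make one second.
fn to_secs(raw: i64, units_per_sec: i64) -> i64 {
    raw / units_per_sec
}

fn main() {
    // Intended reading: nanoseconds since the Unix epoch.
    let ns = to_secs(RAW, 1_000_000_000);
    // Reading chosen by the magnitude heuristic: microseconds.
    let us = to_secs(RAW, 1_000_000);
    println!("as ns: {} s = {} days", ns, ns / SECS_PER_DAY); // 63072000 s = 730 days
    println!("as us: {} s = {} days", us, us / SECS_PER_DAY); // 63072000000 s = 730000 days
}
```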

fuyufjh (Member) commented Apr 3, 2024

Replying to @tabVersion's comment above about the time-unit inference in i64_to_timestamptz:

This solution looks acceptable, but it is too general for some cases, for example:

  1. If the source uses direct CDC, the format is guaranteed to be milliseconds (after #16119, fix(direct cdc): use time.precision.mode=connect).
  2. If users specify the parsing behavior explicitly via WITH options, we should follow that.
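One way to encode these two exceptions is a resolution order: an explicit user setting first, then the known unit of direct CDC, and only then the magnitude heuristic. This is a minimal sketch under those assumptions; SourceConfig, its fields, and resolve_unit are hypothetical and do not correspond to actual RisingWave options.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Unit {
    Secs,
    Millis,
    Micros,
    Nanos,
}

/// Hypothetical per-source settings; not real RisingWave options.
struct SourceConfig {
    /// Unit pinned explicitly by the user (e.g. through a WITH option), if any.
    user_unit: Option<Unit>,
    /// True for direct CDC sources, which emit milliseconds after #16119.
    is_direct_cdc: bool,
}

/// Magnitude-based fallback with the same thresholds as i64_to_timestamptz.
fn infer_by_magnitude(t: i64) -> Unit {
    match t.unsigned_abs() {
        m if m < 100_000_000_000 => Unit::Secs,
        m if m < 100_000_000_000_000 => Unit::Millis,
        m if m < 100_000_000_000_000_000 => Unit::Micros,
        _ => Unit::Nanos,
    }
}

fn resolve_unit(cfg: &SourceConfig, raw: i64) -> Unit {
    if let Some(unit) = cfg.user_unit {
        return unit; // case 2: an explicit user setting wins
    }
    if cfg.is_direct_cdc {
        return Unit::Millis; // case 1: direct CDC is known to use milliseconds
    }
    infer_by_magnitude(raw) // otherwise, guess from the magnitude
}

fn main() {
    let raw = 63_072_000_000_000_000;
    let pinned = SourceConfig { user_unit: Some(Unit::Nanos), is_direct_cdc: false };
    let plain = SourceConfig { user_unit: None, is_direct_cdc: false };
    println!("{:?} vs {:?}", resolve_unit(&pinned, raw), resolve_unit(&plain, raw));
}
```

Putting the user setting first means even a direct CDC source can be overridden; whether that ordering is desirable is a design choice the thread does not settle.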

MrCroxx assigned tabVersion and unassigned MrCroxx on Apr 3, 2024.
xiangjinwu (Contributor) commented Apr 3, 2024

Update:

When consuming from Debezium, the unit information is actually available as part of the schema.

We will focus on the second case here. Below is a message from the Debezium 2.4 MySQL connector:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": true,
            "field": "bid"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "io.debezium.time.MicroTimestamp",
            "version": 1,
            "field": "at"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.bar.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": true,
            "field": "bid"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "io.debezium.time.MicroTimestamp",
            "version": 1,
            "field": "at"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.bar.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.bar.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "bid": 8,
      "at": 1712132700123457
    },
    "source": {
      "version": "2.4.2.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1712132719000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "bar",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 3181,
      "row": 0,
      "thread": 8,
      "query": null
    },
    "op": "c",
    "ts_ms": 1712132719617,
    "transaction": null
  }
}

The relevant part is here:

      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": true,
            "field": "bid"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "io.debezium.time.MicroTimestamp",
            "version": 1,
            "field": "at"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.bar.Value",
        "field": "after"
      },

and

    "after": {
      "bid": 8,
      "at": 1712132700123457
    },

Note that the field after.at has "type": "int64" and "name": "io.debezium.time.MicroTimestamp".
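Given that schema, the unit no longer has to be guessed. This is a minimal sketch, assuming the message carries the schema envelope shown above: it maps the semantic type name of an int64 field to its unit, using Debezium's io.debezium.time.Timestamp (ms), io.debezium.time.MicroTimestamp (us), io.debezium.time.NanoTimestamp (ns), and Kafka Connect's org.apache.kafka.connect.data.Timestamp (ms). The function names are illustrative, not RisingWave's API; unknown names return None so a caller could fall back to magnitude-based inference.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Unit {
    Millis,
    Micros,
    Nanos,
}

/// Map the schema "name" of an int64 timestamp field to its epoch unit.
/// Anything else (e.g. the string-encoded io.debezium.time.ZonedTimestamp) yields None.
fn unit_from_schema_name(name: &str) -> Option<Unit> {
    match name {
        "io.debezium.time.Timestamp" => Some(Unit::Millis),
        "io.debezium.time.MicroTimestamp" => Some(Unit::Micros),
        "io.debezium.time.NanoTimestamp" => Some(Unit::Nanos),
        "org.apache.kafka.connect.data.Timestamp" => Some(Unit::Millis),
        _ => None,
    }
}

/// Normalize to microseconds since the epoch; None if the result would overflow i64.
fn to_micros(raw: i64, unit: Unit) -> Option<i64> {
    match unit {
        Unit::Millis => raw.checked_mul(1_000),
        Unit::Micros => Some(raw),
        Unit::Nanos => Some(raw / 1_000), // truncates toward zero
    }
}

fn main() {
    // after.at in the message above: schema name MicroTimestamp, value 1712132700123457.
    let unit = unit_from_schema_name("io.debezium.time.MicroTimestamp").expect("known type");
    println!("{:?}: {:?}", unit, to_micros(1_712_132_700_123_457, unit));
}
```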

The structure of this schema field comes from the Java class org.apache.kafka.connect.json.JsonConverter:
https://github.com/apache/kafka/blob/3.7.0/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L313

tabVersion (Contributor) commented:

Closing this issue as completed.


5 participants