diff --git a/integration_tests/datagen/ad_click/ad_click.go b/integration_tests/datagen/ad_click/ad_click.go index 9ce71ae3f36bc..27928d3694e26 100644 --- a/integration_tests/datagen/ad_click/ad_click.go +++ b/integration_tests/datagen/ad_click/ad_click.go @@ -54,8 +54,8 @@ func (g *adClickGen) Load(ctx context.Context, outCh chan<- sink.SinkRecord) { record := &clickEvent{ UserId: rand.Int63n(100000), AdId: rand.Int63n(10), - ClickTimestamp: now.Add(time.Duration(rand.Intn(1000)) * time.Millisecond).Format(gen.RwTimestampLayout), - ImpressionTimestamp: now.Format(gen.RwTimestampLayout), + ClickTimestamp: now.Add(time.Duration(rand.Intn(1000)) * time.Millisecond).Format(gen.RwTimestamptzLayout), + ImpressionTimestamp: now.Format(gen.RwTimestamptzLayout), } select { case <-ctx.Done(): diff --git a/integration_tests/datagen/ad_ctr/ad_ctr.go b/integration_tests/datagen/ad_ctr/ad_ctr.go index 1134ce4c1e895..cd3000e33407e 100644 --- a/integration_tests/datagen/ad_ctr/ad_ctr.go +++ b/integration_tests/datagen/ad_ctr/ad_ctr.go @@ -96,14 +96,14 @@ func (g *adCtrGen) generate() []sink.SinkRecord { &adImpressionEvent{ BidId: bidId, AdId: adId, - ImpressionTimestamp: time.Now().Format(gen.RwTimestampLayout), + ImpressionTimestamp: time.Now().Format(gen.RwTimestamptzLayout), }, } if g.hasClick(adId) { randomDelay := time.Duration(g.faker.IntRange(1, 10) * int(time.Second)) events = append(events, &adClickEvent{ BidId: bidId, - ClickTimestamp: time.Now().Add(randomDelay).Format(gen.RwTimestampLayout), + ClickTimestamp: time.Now().Add(randomDelay).Format(gen.RwTimestamptzLayout), }) } return events diff --git a/integration_tests/datagen/cdn_metrics/nics.go b/integration_tests/datagen/cdn_metrics/nics.go index 6aae95479ec9f..a95be61012115 100644 --- a/integration_tests/datagen/cdn_metrics/nics.go +++ b/integration_tests/datagen/cdn_metrics/nics.go @@ -109,7 +109,7 @@ func (impl *deviceNicsMonitor) newMetrics( MetricName: metricName, Aggregation: aggregation, NicName: "eth" + strconv.Itoa(NicId), - ReportTime: reportTime.Format(gen.RwTimestampLayout), + ReportTime: reportTime.Format(gen.RwTimestamptzLayout), Bandwidth: maxBandwidth, Value: float64(value), } diff --git a/integration_tests/datagen/cdn_metrics/tcp.go b/integration_tests/datagen/cdn_metrics/tcp.go index da7ce31d76dd3..f315a7572d4b6 100644 --- a/integration_tests/datagen/cdn_metrics/tcp.go +++ b/integration_tests/datagen/cdn_metrics/tcp.go @@ -90,7 +90,7 @@ func (m *deviceTcpMonitor) newMetrics(metricName string, reportTime time.Time, v return &tcpMetric{ DeviceId: m.deviceId, MetricName: metricName, - ReportTime: reportTime.Format(gen.RwTimestampLayout), + ReportTime: reportTime.Format(gen.RwTimestamptzLayout), Value: value, } } diff --git a/integration_tests/datagen/clickstream/clickstream.go b/integration_tests/datagen/clickstream/clickstream.go index c0e9350b3f2b1..201610a299283 100644 --- a/integration_tests/datagen/clickstream/clickstream.go +++ b/integration_tests/datagen/clickstream/clickstream.go @@ -138,7 +138,7 @@ func (g *clickStreamGen) generate() sink.SinkRecord { UserId: fmt.Sprint(userId), TargetId: string(target) + fmt.Sprint(targetId), TargetType: string(target), - EventTimestamp: time.Now().Format(gen.RwTimestampLayout), + EventTimestamp: time.Now().Format(gen.RwTimestamptzLayout), BehaviorType: behavior, ParentTargetType: parentTargetType, ParentTargetId: parentTargetId, diff --git a/integration_tests/datagen/delivery/delivery.go b/integration_tests/datagen/delivery/delivery.go index 0ca20dd689fea..d8e1133f71497 100644 --- a/integration_tests/datagen/delivery/delivery.go +++ b/integration_tests/datagen/delivery/delivery.go @@ -69,7 +69,7 @@ func (g *orderEventGen) Load(ctx context.Context, outCh chan<- sink.SinkRecord) OrderId: g.seqOrderId, RestaurantId: rand.Int63n(num_of_restaurants), OrderState: order_states[rand.Intn(len(order_states))], - OrderTimestamp: now.Add(time.Duration(rand.Intn(total_minutes)) * time.Minute).Format(gen.RwTimestampLayout), + OrderTimestamp: now.Add(time.Duration(rand.Intn(total_minutes)) * time.Minute).Format(gen.RwTimestampNaiveLayout), } g.seqOrderId++ select { diff --git a/integration_tests/datagen/ecommerce/ecommerce.go b/integration_tests/datagen/ecommerce/ecommerce.go index 18520c9b7eb60..34ee31cde6931 100644 --- a/integration_tests/datagen/ecommerce/ecommerce.go +++ b/integration_tests/datagen/ecommerce/ecommerce.go @@ -103,7 +103,7 @@ func (g *ecommerceGen) KafkaTopics() []string { } func (g *ecommerceGen) generate() []sink.SinkRecord { - ts := time.Now().Format(gen.RwTimestampLayout) + ts := time.Now().Format(gen.RwTimestampNaiveLayout) if g.faker.Bool() && g.seqShipId >= g.seqOrderId { // New order. diff --git a/integration_tests/datagen/gen/generator.go b/integration_tests/datagen/gen/generator.go index d519beec08c35..f84ffe3fcdea4 100644 --- a/integration_tests/datagen/gen/generator.go +++ b/integration_tests/datagen/gen/generator.go @@ -9,6 +9,7 @@ import ( "datagen/sink/postgres" "datagen/sink/pulsar" "datagen/sink/s3" + "time" "gonum.org/v1/gonum/stat/distuv" ) @@ -47,7 +48,8 @@ type LoadGenerator interface { Load(ctx context.Context, outCh chan<- sink.SinkRecord) } -const RwTimestampLayout = "2006-01-02 15:04:05.07+01:00" +const RwTimestampNaiveLayout = time.DateTime +const RwTimestamptzLayout = time.RFC3339 type RandDist interface { // Rand returns a random number ranging from [0, max]. diff --git a/integration_tests/datagen/twitter/twitter.go b/integration_tests/datagen/twitter/twitter.go index 06a235aaf7d02..1daf193c36e6f 100644 --- a/integration_tests/datagen/twitter/twitter.go +++ b/integration_tests/datagen/twitter/twitter.go @@ -120,7 +120,7 @@ func NewTwitterGen() gen.LoadGenerator { endTime, _ := time.Parse("2006-01-01", fmt.Sprintf("%d-01-01", endYear)) startTime, _ := time.Parse("2006-01-01", fmt.Sprintf("%d-01-01", startYear)) users[id] = &twitterUser{ - CreatedAt: faker.DateRange(startTime, endTime).Format(gen.RwTimestampLayout), + CreatedAt: faker.DateRange(startTime, endTime).Format(gen.RwTimestamptzLayout), Id: id, Name: fmt.Sprintf("%s %s", faker.Name(), faker.Adverb()), UserName: faker.Username(), @@ -152,7 +152,7 @@ func (t *twitterGen) generate() twitterEvent { return twitterEvent{ Data: tweetData{ Id: id, - CreatedAt: time.Now().Format(gen.RwTimestampLayout), + CreatedAt: time.Now().Format(gen.RwTimestamptzLayout), Text: sentence, Lang: gofakeit.Language(), }, diff --git a/integration_tests/twitter-pulsar/pb/create_mv.sql b/integration_tests/twitter-pulsar/pb/create_mv.sql index c08722bacdbb3..06d2eb14e4074 100644 --- a/integration_tests/twitter-pulsar/pb/create_mv.sql +++ b/integration_tests/twitter-pulsar/pb/create_mv.sql @@ -4,7 +4,7 @@ CREATE MATERIALIZED VIEW hot_hashtags AS WITH tags AS ( SELECT unnest(regexp_matches((data).text, '#\w+', 'g')) AS hashtag, - (data).created_at :: timestamp AS created_at + (data).created_at :: timestamptz AS created_at FROM twitter ) diff --git a/integration_tests/twitter/pb/create_mv.sql b/integration_tests/twitter/pb/create_mv.sql index c08722bacdbb3..06d2eb14e4074 100644 --- a/integration_tests/twitter/pb/create_mv.sql +++ b/integration_tests/twitter/pb/create_mv.sql @@ -4,7 +4,7 @@ CREATE MATERIALIZED VIEW hot_hashtags AS WITH tags AS ( SELECT unnest(regexp_matches((data).text, '#\w+', 'g')) AS hashtag, - (data).created_at :: timestamp AS created_at + (data).created_at :: timestamptz AS created_at FROM twitter )