Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE: Chore: Drop messages v1 #633

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_READ;

import io.kafbat.ui.api.MessagesApi;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.CreateTopicMessageDTO;
import io.kafbat.ui.model.MessageFilterIdDTO;
import io.kafbat.ui.model.MessageFilterRegistrationDTO;
import io.kafbat.ui.model.MessageFilterTypeDTO;
import io.kafbat.ui.model.PollingModeDTO;
import io.kafbat.ui.model.SeekDirectionDTO;
import io.kafbat.ui.model.SeekTypeDTO;
import io.kafbat.ui.model.SerdeUsageDTO;
import io.kafbat.ui.model.SmartFilterTestExecutionDTO;
import io.kafbat.ui.model.SmartFilterTestExecutionResultDTO;
Expand Down Expand Up @@ -73,25 +69,8 @@ public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilte
.map(ResponseEntity::ok);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
SeekTypeDTO seekType,
List<String> seekTo,
Integer limit,
String q,
MessageFilterTypeDTO filterQueryType,
SeekDirectionDTO seekDirection,
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
throw new ValidationException("Not supported");
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName, String topicName,
PollingModeDTO mode,
List<Integer> partitions,
Integer limit,
Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/io/kafbat/ui/KafkaConsumerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void shouldDeleteRecords() {
}

long count = webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages?mode=EARLIEST", LOCAL, topicName)
.accept(TEXT_EVENT_STREAM)
.exchange()
.expectStatus()
Expand All @@ -77,7 +77,7 @@ public void shouldDeleteRecords() {
.isOk();

count = webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages?mode=EARLIEST", LOCAL, topicName)
.exchange()
.expectStatus()
.isOk()
Expand Down
142 changes: 27 additions & 115 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -662,33 +662,45 @@ paths:
required: true
schema:
type: string
- name: seekType
- name: mode
in: query
description: Messages polling mode
schema:
$ref: "#/components/schemas/SeekType"
- name: seekTo
$ref: "#/components/schemas/PollingMode"
- name: partitions
in: query
schema:
type: array
description: List of target partitions (all partitions if not provided)
items:
type: string
description: The format is [partition]::[offset] for specifying offsets or [partition]::[timestamp in millis] for specifying timestamps
type: integer
- name: limit
in: query
description: Max number of messages can be returned
schema:
type: integer
- name: q
- name: stringFilter
in: query
description: query string to contains string filtration
schema:
type: string
- name: filterQueryType
- name: smartFilterId
in: query
description: filter id, that was registered beforehand
schema:
$ref: "#/components/schemas/MessageFilterType"
- name: seekDirection
type: string
- name: offset
in: query
description: message offset to read from / to
schema:
$ref: "#/components/schemas/SeekDirection"
type: integer
format: int64
- name: timestamp
in: query
description: timestamp (in ms) to read from / to
schema:
type: integer
format: int64
- name: keySerde
in: query
description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
Expand All @@ -699,6 +711,11 @@ paths:
description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
schema:
type: string
- name: cursor
in: query
description: "id of the cursor for pagination, if passed - all other query params ignored"
schema:
type: string
responses:
200:
description: OK
Expand Down Expand Up @@ -793,89 +810,6 @@ paths:
schema:
$ref: '#/components/schemas/MessageFilterId'


/api/clusters/{clusterName}/topics/{topicName}/messages/v2:
get:
tags:
- Messages
summary: getTopicMessagesV2
operationId: getTopicMessagesV2
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: topicName
in: path
required: true
schema:
type: string
- name: mode
in: query
description: Messages polling mode
schema:
$ref: "#/components/schemas/PollingMode"
- name: partitions
in: query
schema:
type: array
description: List of target partitions (all partitions if not provided)
items:
type: integer
- name: limit
in: query
description: Max number of messages can be returned
schema:
type: integer
- name: stringFilter
in: query
description: query string to contains string filtration
schema:
type: string
- name: smartFilterId
in: query
description: filter id, that was registered beforehand
schema:
type: string
- name: offset
in: query
description: message offset to read from / to
schema:
type: integer
format: int64
- name: timestamp
in: query
description: timestamp (in ms) to read from / to
schema:
type: integer
format: int64
- name: keySerde
in: query
description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
schema:
type: string
- name: valueSerde
in: query
description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
schema:
type: string
- name: cursor
in: query
description: "id of the cursor for pagination, if passed - all other query params ignored"
schema:
type: string
responses:
200:
description: OK
content:
text/event-stream:
schema:
type: array
items:
$ref: '#/components/schemas/TopicMessageEvent'


/api/clusters/{clusterName}/topics/{topicName}/activeproducers:
get:
tags:
Expand Down Expand Up @@ -3080,14 +3014,6 @@ components:
- offset
- timestamp

SeekType:
type: string
enum:
- BEGINNING
- OFFSET
- TIMESTAMP
- LATEST

MessageFilterRegistration:
type: object
properties:
Expand All @@ -3111,20 +3037,6 @@ components:
- EARLIEST
- TAILING

MessageFilterType:
type: string
enum:
- STRING_CONTAINS
- CEL_SCRIPT

SeekDirection:
type: string
enum:
- FORWARD
- BACKWARD
- TAILING
default: FORWARD

Partition:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/lib/hooks/api/topicMessages.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export const useTopicMessages = ({

const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
clusterName
)}/topics/${topicName}/messages/v2`;
)}/topics/${topicName}/messages`;

const requestParams = new URLSearchParams({
limit: searchParams.get(MessagesFilterKeys.limit) || MESSAGES_PER_PAGE,
Expand Down
Loading