diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index a8113f84..e7d37c73 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -18,7 +18,11 @@ package kafdrop.controller; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -28,9 +32,16 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import com.google.common.net.HttpHeaders; +import kafdrop.service.KafkaMonitorImpl; import kafdrop.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; +import org.springframework.stereotype.Repository; import org.springframework.ui.Model; import org.springframework.validation.BindingResult; import org.springframework.web.bind.annotation.GetMapping; @@ -64,6 +75,9 @@ @Controller public final class MessageController { + + private static final Logger LOG = LoggerFactory.getLogger(MessageController.class); + private final KafkaMonitor kafkaMonitor; private final MessageInspector messageInspector; @@ -208,6 +222,34 @@ private MessageFormat getSelectedMessageFormat(String format) { + @ApiOperation(value = "getMessages") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = ResponseEntity.class), + @ApiResponse(code = 404, message = "Invalid topic name") + }) + @RequestMapping(method = RequestMethod.GET, value = "/topic/{name:.+}/messages", produces = MediaType.APPLICATION_JSON_VALUE, params="download") + public ResponseEntity getMessages( + @PathVariable("name") String topicName, + @RequestParam(name = "partition", required = false) Integer partition, + @RequestParam(name = "offset", required = false) Long offset, + @RequestParam(name = "count", required = false) Integer count, + @RequestParam(name = "format", required = false) String format, + @RequestParam(name = "keyFormat", required = false) String keyFormat, + @RequestParam(name = "descFile", required = false) String descFile, + @RequestParam(name = "msgTypeName", required = false) String msgTypeName + ) { + if(count>10000) { + return ResponseEntity.badRequest().body("Maximum number for download is 10000- please reduce count or change offset".getBytes()); + } + try { + List messagesList= (List) (Object)getPartitionOrMessages(topicName, partition, offset, count, format, keyFormat, descFile, msgTypeName); + return ResponseEntity.ok().contentType(MediaType.APPLICATION_JSON). + header(HttpHeaders.CONTENT_DISPOSITION, String.format("attachment; filename=%1$s-%2$s-messages.json", topicName, partition)).body(messagesList.toString().getBytes()); + } catch (Exception e) { + e.printStackTrace(); + return ResponseEntity.badRequest().body("Error processing- Please check input".getBytes()); + } + } /** * Return a JSON list of all partition offset info for the given topic. If specific partition * and offset parameters are given, then this returns actual kafka messages from that partition diff --git a/src/main/java/kafdrop/model/MessageVO.java b/src/main/java/kafdrop/model/MessageVO.java index e0cd80d0..3b137c2d 100644 --- a/src/main/java/kafdrop/model/MessageVO.java +++ b/src/main/java/kafdrop/model/MessageVO.java @@ -18,6 +18,8 @@ package kafdrop.model; +import org.hibernate.validator.internal.IgnoreForbiddenApisErrors; + import java.util.*; import java.util.stream.*; @@ -76,4 +78,9 @@ public Date getTimestamp() { public void setTimestamp(Date timestamp) { this.timestamp = timestamp; } + + @Override + public String toString() { + return "{ \"Partition\": "+partition+", \"Offset\": "+offset+", \"Message\": \""+message.replace("\"", "\\\"").replace("\\\\\"", "\\\"")+"\", \"Key\": \""+key+"\", \"Headers\": \""+headers.toString()+"\", \"Timestamp\": \""+timestamp+"\" }"; + } } diff --git a/src/main/resources/templates/message-inspector.ftl b/src/main/resources/templates/message-inspector.ftl index 2911f5a0..e0b63b2f 100644 --- a/src/main/resources/templates/message-inspector.ftl +++ b/src/main/resources/templates/message-inspector.ftl @@ -150,6 +150,8 @@    +    +