diff --git a/src/main/java/kafdrop/controller/AclController.java b/src/main/java/kafdrop/controller/AclController.java index 2d8398cd..0e683b4d 100644 --- a/src/main/java/kafdrop/controller/AclController.java +++ b/src/main/java/kafdrop/controller/AclController.java @@ -23,6 +23,8 @@ import io.swagger.annotations.ApiResponses; import kafdrop.model.AclVO; import kafdrop.service.KafkaMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; @@ -31,11 +33,16 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; +import javax.annotation.PostConstruct; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.List; @Controller public final class AclController { private final KafkaMonitor kafkaMonitor; + private static final Logger LOG= LoggerFactory.getLogger(AclController.class); @Value("${kafkaproxy.URL}") String kafkaProxyURL; @@ -46,13 +53,27 @@ public final class AclController { @Value("${kafkaproxy.useCookie}") Boolean useCookie; + String kafkaProxyCookie=""; + public AclController(KafkaMonitor kafkaMonitor) { this.kafkaMonitor = kafkaMonitor; } + @PostConstruct + public void init() { + if(this.useCookie) { + try { + this.kafkaProxyCookie= Files.readString(Path.of(this.kafkaProxyCookiePath)).replace('\n', ' '); + } + catch (IOException | SecurityException | OutOfMemoryError e) { + LOG.error(e.toString()); + } + } + } + @RequestMapping("/acl") public String acls(Model model) { - final var acls = kafkaMonitor.getAcls(kafkaProxyURL, useCookie? kafkaProxyCookiePath: ""); + final var acls = kafkaMonitor.getAcls(kafkaProxyURL, kafkaProxyCookie); model.addAttribute("acls", acls); return "acl-overview"; @@ -64,6 +85,6 @@ public String acls(Model model) { }) @RequestMapping(path = "/acl", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody List getAllTopics() { - return kafkaMonitor.getAcls(kafkaProxyURL, useCookie? kafkaProxyCookiePath: ""); + return kafkaMonitor.getAcls(kafkaProxyURL, kafkaProxyCookie); } } diff --git a/src/main/java/kafdrop/controller/QuotaController.java b/src/main/java/kafdrop/controller/QuotaController.java index 08acf6f8..02393493 100644 --- a/src/main/java/kafdrop/controller/QuotaController.java +++ b/src/main/java/kafdrop/controller/QuotaController.java @@ -5,6 +5,8 @@ import io.swagger.annotations.ApiResponses; import kafdrop.model.KafkaQuotaVO; import kafdrop.service.KafkaMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; @@ -13,11 +15,16 @@ import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; +import javax.annotation.PostConstruct; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.List; @Controller public class QuotaController { private final KafkaMonitor kafkaMonitor; + private static final Logger LOG= LoggerFactory.getLogger(QuotaController.class); @Value("${kafkaproxy.URL}") String kafkaProxyURL; @@ -28,13 +35,27 @@ public class QuotaController { @Value("${kafkaproxy.useCookie}") Boolean useCookie; + String kafkaProxyCookie=""; + public QuotaController(KafkaMonitor kafkaMonitor) { this.kafkaMonitor = kafkaMonitor; } + @PostConstruct + public void init() { + if(this.useCookie) { + try { + this.kafkaProxyCookie= Files.readString(Path.of(this.kafkaProxyCookiePath)).replace('\n', ' '); + } + catch (IOException | SecurityException | OutOfMemoryError e) { + LOG.error(e.toString()); + } + } + } + @RequestMapping("/quotas") public String quotas(Model model) { - final var quotas = kafkaMonitor.getQuotas(kafkaProxyURL, useCookie? kafkaProxyCookiePath: ""); + final var quotas = kafkaMonitor.getQuotas(kafkaProxyURL, kafkaProxyCookie); model.addAttribute("quotas", quotas); return "quota-overview"; @@ -47,6 +68,6 @@ public String quotas(Model model) { @RequestMapping(path = "/quotas", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET) public @ResponseBody List getAllQuotas() { - return kafkaMonitor.getQuotas(kafkaProxyURL, useCookie? kafkaProxyCookiePath: ""); + return kafkaMonitor.getQuotas(kafkaProxyURL, kafkaProxyCookie); } } diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index effdf932..7dda54a8 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -58,7 +58,7 @@ List getMessages(TopicPartition topicPartition, long offset, int coun */ void deleteTopic(String topic); - List getAcls(String kafkaProxyURL, String kafkaProxyCookiePath); + List getAcls(String kafkaProxyURL, String kafkaProxyCookie); - List getQuotas(String kafkaProxyURL, String kafkaProxyCookiePath); + List getQuotas(String kafkaProxyURL, String kafkaProxyCookie); } diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 6ef4da9d..09af34d8 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -26,11 +26,17 @@ import org.apache.kafka.common.*; import org.apache.kafka.common.header.*; import org.slf4j.*; +import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.*; import org.springframework.web.client.RestTemplate; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.*; import java.util.Map.*; import java.util.function.*; @@ -221,23 +227,29 @@ public void deleteTopic(String topic) { } @Override - public List getAcls(String kafkaProxyURL, String kafkaProxyCookiePath) { + public List getAcls(String kafkaProxyURL, String kafkaProxyCookie) { HttpHeaders headers= new HttpHeaders(); - if(kafkaProxyCookiePath.length()!=0) { - // handle for APIC - } RestTemplate restTemplate= new RestTemplate(); + if(kafkaProxyCookie.length()!=0) { + headers.add("Cookie", "KafkaProxy-cookie="+kafkaProxyCookie); + HttpEntity httpEntity= new HttpEntity(headers); + ResponseEntity responseEntity= restTemplate.exchange(kafkaProxyURL+"/acls", HttpMethod.GET, httpEntity, AclVO[].class); + return Arrays.asList(responseEntity.getBody()); + } ResponseEntity responseEntity= restTemplate.getForEntity(kafkaProxyURL+"/acls", AclVO[].class); return Arrays.asList(responseEntity.getBody()); } @Override - public List getQuotas(String kafkaProxyURL, String kafkaProxyCookiePath) { + public List getQuotas(String kafkaProxyURL, String kafkaProxyCookie) { HttpHeaders headers= new HttpHeaders(); - if(kafkaProxyCookiePath.length()!=0) { - // handle for APIC - } RestTemplate restTemplate= new RestTemplate(); + if(kafkaProxyCookie.length()!=0) { + headers.add("Cookie", "KafkaProxy-cookie="+kafkaProxyCookie); + HttpEntity httpEntity= new HttpEntity(headers); + ResponseEntity responseEntity= restTemplate.exchange(kafkaProxyURL+"/quotas", HttpMethod.GET, httpEntity, KafkaQuotaVO[].class); + return Arrays.asList(responseEntity.getBody()); + } ResponseEntity responseEntity= restTemplate.getForEntity(kafkaProxyURL+"/quotas", KafkaQuotaVO[].class); return Arrays.asList(responseEntity.getBody()); }