Skip to content

Commit

Permalink
handle cookies for kafkaproxy
Browse files Browse the repository at this point in the history
  • Loading branch information
mavemuri committed Apr 12, 2021
1 parent f0d3aa6 commit b362e09
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 14 deletions.
25 changes: 23 additions & 2 deletions src/main/java/kafdrop/controller/AclController.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.swagger.annotations.ApiResponses;
import kafdrop.model.AclVO;
import kafdrop.service.KafkaMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
Expand All @@ -31,11 +33,16 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

@Controller
public final class AclController {
private final KafkaMonitor kafkaMonitor;
private static final Logger LOG= LoggerFactory.getLogger(AclController.class);

@Value("${kafkaproxy.URL}")
String kafkaProxyURL;
Expand All @@ -46,13 +53,27 @@ public final class AclController {
@Value("${kafkaproxy.useCookie}")
Boolean useCookie;

String kafkaProxyCookie="";

public AclController(KafkaMonitor kafkaMonitor) {
this.kafkaMonitor = kafkaMonitor;
}

@PostConstruct
public void init() {
if(this.useCookie) {
try {
this.kafkaProxyCookie= Files.readString(Path.of(this.kafkaProxyCookiePath)).replace('\n', ' ');
}
catch (IOException | SecurityException | OutOfMemoryError e) {
LOG.error(e.toString());
}
}
}

@RequestMapping("/acl")
public String acls(Model model) {
final var acls = kafkaMonitor.getAcls(kafkaProxyURL, useCookie? kafkaProxyCookiePath: "");
final var acls = kafkaMonitor.getAcls(kafkaProxyURL, kafkaProxyCookie);
model.addAttribute("acls", acls);

return "acl-overview";
Expand All @@ -64,6 +85,6 @@ public String acls(Model model) {
})
@RequestMapping(path = "/acl", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET)
public @ResponseBody List<AclVO> getAllTopics() {
return kafkaMonitor.getAcls(kafkaProxyURL, useCookie? kafkaProxyCookiePath: "");
return kafkaMonitor.getAcls(kafkaProxyURL, kafkaProxyCookie);
}
}
25 changes: 23 additions & 2 deletions src/main/java/kafdrop/controller/QuotaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.swagger.annotations.ApiResponses;
import kafdrop.model.KafkaQuotaVO;
import kafdrop.service.KafkaMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
Expand All @@ -13,11 +15,16 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

@Controller
public class QuotaController {
private final KafkaMonitor kafkaMonitor;
private static final Logger LOG= LoggerFactory.getLogger(QuotaController.class);

@Value("${kafkaproxy.URL}")
String kafkaProxyURL;
Expand All @@ -28,13 +35,27 @@ public class QuotaController {
@Value("${kafkaproxy.useCookie}")
Boolean useCookie;

String kafkaProxyCookie="";

public QuotaController(KafkaMonitor kafkaMonitor) {
this.kafkaMonitor = kafkaMonitor;
}

@PostConstruct
public void init() {
if(this.useCookie) {
try {
this.kafkaProxyCookie= Files.readString(Path.of(this.kafkaProxyCookiePath)).replace('\n', ' ');
}
catch (IOException | SecurityException | OutOfMemoryError e) {
LOG.error(e.toString());
}
}
}

@RequestMapping("/quotas")
public String quotas(Model model) {
final var quotas = kafkaMonitor.getQuotas(kafkaProxyURL, useCookie? kafkaProxyCookiePath: "");
final var quotas = kafkaMonitor.getQuotas(kafkaProxyURL, kafkaProxyCookie);
model.addAttribute("quotas", quotas);

return "quota-overview";
Expand All @@ -47,6 +68,6 @@ public String quotas(Model model) {
@RequestMapping(path = "/quotas", produces = MediaType.APPLICATION_JSON_VALUE, method = RequestMethod.GET)
public @ResponseBody
List<KafkaQuotaVO> getAllQuotas() {
return kafkaMonitor.getQuotas(kafkaProxyURL, useCookie? kafkaProxyCookiePath: "");
return kafkaMonitor.getQuotas(kafkaProxyURL, kafkaProxyCookie);
}
}
4 changes: 2 additions & 2 deletions src/main/java/kafdrop/service/KafkaMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ List<MessageVO> getMessages(TopicPartition topicPartition, long offset, int coun
*/
void deleteTopic(String topic);

List<AclVO> getAcls(String kafkaProxyURL, String kafkaProxyCookiePath);
List<AclVO> getAcls(String kafkaProxyURL, String kafkaProxyCookie);

List<KafkaQuotaVO> getQuotas(String kafkaProxyURL, String kafkaProxyCookiePath);
List<KafkaQuotaVO> getQuotas(String kafkaProxyURL, String kafkaProxyCookie);
}
28 changes: 20 additions & 8 deletions src/main/java/kafdrop/service/KafkaMonitorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@
import org.apache.kafka.common.*;
import org.apache.kafka.common.header.*;
import org.slf4j.*;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.*;
import org.springframework.web.client.RestTemplate;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.Map.*;
import java.util.function.*;
Expand Down Expand Up @@ -221,23 +227,29 @@ public void deleteTopic(String topic) {
}

@Override
public List<AclVO> getAcls(String kafkaProxyURL, String kafkaProxyCookiePath) {
public List<AclVO> getAcls(String kafkaProxyURL, String kafkaProxyCookie) {
HttpHeaders headers= new HttpHeaders();
if(kafkaProxyCookiePath.length()!=0) {
// handle for APIC
}
RestTemplate restTemplate= new RestTemplate();
if(kafkaProxyCookie.length()!=0) {
headers.add("Cookie", "KafkaProxy-cookie="+kafkaProxyCookie);
HttpEntity httpEntity= new HttpEntity(headers);
ResponseEntity<AclVO[]> responseEntity= restTemplate.exchange(kafkaProxyURL+"/acls", HttpMethod.GET, httpEntity, AclVO[].class);
return Arrays.asList(responseEntity.getBody());
}
ResponseEntity<AclVO[]> responseEntity= restTemplate.getForEntity(kafkaProxyURL+"/acls", AclVO[].class);
return Arrays.asList(responseEntity.getBody());
}

@Override
public List<KafkaQuotaVO> getQuotas(String kafkaProxyURL, String kafkaProxyCookiePath) {
public List<KafkaQuotaVO> getQuotas(String kafkaProxyURL, String kafkaProxyCookie) {
HttpHeaders headers= new HttpHeaders();
if(kafkaProxyCookiePath.length()!=0) {
// handle for APIC
}
RestTemplate restTemplate= new RestTemplate();
if(kafkaProxyCookie.length()!=0) {
headers.add("Cookie", "KafkaProxy-cookie="+kafkaProxyCookie);
HttpEntity httpEntity= new HttpEntity(headers);
ResponseEntity<KafkaQuotaVO[]> responseEntity= restTemplate.exchange(kafkaProxyURL+"/quotas", HttpMethod.GET, httpEntity, KafkaQuotaVO[].class);
return Arrays.asList(responseEntity.getBody());
}
ResponseEntity<KafkaQuotaVO[]> responseEntity= restTemplate.getForEntity(kafkaProxyURL+"/quotas", KafkaQuotaVO[].class);
return Arrays.asList(responseEntity.getBody());
}
Expand Down

0 comments on commit b362e09

Please sign in to comment.